Introduction
Specifies how many threads are used. The default value is 1. The explanation is as follows:
By default, if you do not explicitly set the concurrency value using setConcurrency on the ConcurrentKafkaListenerContainerFactory, the default value is 1. This means that by default, only a single thread will be used to process messages from a particular Kafka topic.
We do not have to create a ConcurrentKafkaListenerContainerFactory object ourselves. What setConcurrency() does can also be done through the concurrency attribute of @KafkaListener, like this:
@KafkaListener(
    topics = "word-processor",
    concurrency = "3",
    groupId = "parallel-consumer")
public void consume(ConsumerRecord<String, String> consumerRecord) {
  log.info("Partition : {}, Msg: {}",
    consumerRecord.partition(), consumerRecord.value());
}
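// One listener can also subscribe to several topics.
// The method name differs from the one above so both can live in the same class.
@KafkaListener(
    topics = {"word-processor", "word-processor-two"},
    concurrency = "2",
    groupId = "multi-topic-parallel-consumer")
public void consumeMultiTopic(ConsumerRecord<String, String> consumerRecord) {
  log.info("Partition : {}, Msg: {}",
    consumerRecord.partition(), consumerRecord.value());
}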
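The listeners above log the partition of each record, so it helps to have a rough idea of how many partitions each listener thread ends up with. The following is only a minimal sketch in plain Java, not Spring Kafka code: PartitionSplit and partitionsPerThread are hypothetical names, the partition counts are made up, and it assumes a range-style assignment per topic (as Kafka's RangeAssignor does). The real assignment depends on the configured assignment strategy.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Spring Kafka: estimates how many partitions
// of one topic each listener thread receives under a range-style assignment.
final class PartitionSplit {

    // Returns a list whose i-th element is the partition count for thread i.
    static List<Integer> partitionsPerThread(int partitions, int concurrency) {
        if (partitions < 0 || concurrency < 1) {
            throw new IllegalArgumentException(
                "partitions must be >= 0 and concurrency must be >= 1");
        }
        int base = partitions / concurrency;   // every thread gets at least this many
        int extra = partitions % concurrency;  // the first 'extra' threads get one more
        List<Integer> counts = new ArrayList<>();
        for (int i = 0; i < concurrency; i++) {
            counts.add(i < extra ? base + 1 : base);
        }
        return counts;
    }

    public static void main(String[] args) {
        // Assumed partition counts, chosen only for illustration.
        System.out.println(partitionsPerThread(6, 3)); // [2, 2, 2]
        System.out.println(partitionsPerThread(2, 3)); // [1, 1, 0]
        System.out.println(partitionsPerThread(3, 2)); // [2, 1]
    }
}
```

The second call shows the case worth remembering: when concurrency is higher than the number of partitions, the extra threads receive no partitions and stay idle.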
Example
We do it like this.
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Long, LogDay>>
  onlineKafkaListenerContainerFactory() {
  Map<String, Object> propMap = ...;
  ConcurrentKafkaListenerContainerFactory<Long, LogDay> factory =
    new ConcurrentKafkaListenerContainerFactory<>();
  factory.setConcurrency(5);
  factory.getContainerProperties().setPollTimeout(1_000L);
  factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(propMap));
  return factory;
}
Example
We do it like this.
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

@Configuration
@EnableKafka
public class KafkaConsumerConfig {
  // Configure Kafka consumer properties
  @Bean
  public ConsumerFactory<String, String> consumerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "your-kafka-bootstrap-servers");
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "your-consumer-group-id");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringDeserializer");
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringDeserializer");
    // Additional consumer properties
    return new DefaultKafkaConsumerFactory<>(props);
  }

  // Configure the listener container factory for parallel consumption
  @Bean
  public ConcurrentKafkaListenerContainerFactory<String, String>
    kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
      new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setConcurrency(3); // Set the number of consumer threads
    return factory;
  }
}