5 Haziran 2023 Pazartesi

SpringKafka Consumer ConcurrentKafkaListenerContainerFactory.setConcurrency metodu

Giriş
Kaç thread kullanılacağını belirtir. Varsayılan değer 1. Açıklaması şöyle
By default, if you do not explicitly set the concurrency value using setConcurrency on the ConcurrentKafkaListenerContainerFactory, the default value is 1. This means that by default, only a single thread will be used to process messages from a particular Kafka topic.
ConcurrentKafkaListenerContainerFactory nesnesini yaratmak zorunda değiliz. setConcurrency() işlevini şöyle de yapabiliriz
@KafkaListener(
        topics = "word-processor", 
        concurrency = "3",
        groupId = "parallel-consumer")
public void consume(ConsumerRecord<String, String> consumerRecord) {
  log.info("Partition : {}, Msg: {}", 
    consumerRecord.partition(), consumerRecord.value());
}

@KafkaListener(
        topics = {"word-processor", "word-processor-two"},
        concurrency = "2",
        groupId = "multi-topic-parallel-consumer")
public void consume(ConsumerRecord<String, String> consumerRecord) {
  log.info("Partition : {}, Msg: {}",
    consumerRecord.partition(), consumerRecord.value());
}
Örnek
Şöyle yaparız.
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Long, LogDay>>
  onlineKafkaListenerContainerFactory() {
  Map<String, Object> propMap = ...;
  ConcurrentKafkaListenerContainerFactory<Long, LogDay> factory = 
    new ConcurrentKafkaListenerContainerFactory<>();
  factory.setConcurrency(5);
  factory.getContainerProperties().setPollTimeout(1_000l);
  factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(propMap));
  return factory;
}
Örnek
Şöyle yaparız
@Configuration
@EnableKafka
public class KafkaConsumerConfig {

  // Configure Kafka consumer properties
  @Bean
  public ConsumerFactory<String, String> consumerFactory() {
   Map<String, Object> props = new HashMap<>();
   props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "your-kafka-bootstrap-servers");
   props.put(ConsumerConfig.GROUP_ID_CONFIG, "your-consumer-group-id");
   props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
     "org.apache.kafka.common.serialization.StringDeserializer");
   props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
     "org.apache.kafka.common.serialization.StringDeserializer");
   // Additional consumer properties

   return new DefaultKafkaConsumerFactory<>(props);
  }

  // Configure the listener container factory for parallel consumption
  @Bean
  public ConcurrentKafkaListenerContainerFactory<String, String> 
  kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
      new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setConcurrency(3); // Set the number of consumer threads

    return factory;
  }
}

Hiç yorum yok:

Yorum Gönder