Monday, 16 December 2019

SpringKafka Consumer ConcurrentKafkaListenerContainerFactory Class - Used to Connect to Kafka

Introduction
The explanation is as follows:
To create a parallel consumer for Apache Kafka in a Java Spring Boot application, you can utilize the Spring Kafka library and leverage the ConcurrentKafkaListenerContainerFactory provided by Spring Kafka.
In other words, to connect to Kafka we need to create a bean of the ConcurrentKafkaListenerContainerFactory class, which implements the KafkaListenerContainerFactory interface.

To give this class consumers that take care of deserialization, a DefaultKafkaConsumerFactory object is created.

setConcurrency method
I moved this to the setConcurrency method post.

setConsumerFactory method - ConsumerFactory
A ConsumerFactory is plugged into the ConcurrentKafkaListenerContainerFactory object.
The ConsumerFactory interface is implemented by DefaultKafkaConsumerFactory.
Example
We do it like this.
@Bean
public ConsumerFactory<String, String> consumerFactory() {
  Map<String, Object> props = new HashMap<>();
  props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap_servers);
  // .. Misc other properties related to serialisation etc ..
  props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
  return new DefaultKafkaConsumerFactory<>(props);
}


@Bean
public ConcurrentKafkaListenerContainerFactory<String, String>
 kafkaListenerContainerFactory(RetryTemplate retryTemplate) {
  ConcurrentKafkaListenerContainerFactory<String, String> factory =
    new ConcurrentKafkaListenerContainerFactory<>();
  factory.setConsumerFactory(consumerFactory());
  ...
  return factory;
}
Example
Suppose we have a ConsumerFactory like this:
@Configuration
public class KafkaConsumerConfig {

  @Autowired
  private KafkaProperties kafkaProperties;

  @Bean
  public ConsumerFactory<String, Object> consumerFactory() {
    JsonDeserializer<Object> jsonDeserializer = new JsonDeserializer<>();
    jsonDeserializer.addTrustedPackages("*");
    return new DefaultKafkaConsumerFactory<>(
      kafkaProperties.buildConsumerProperties(),
      new StringDeserializer(), jsonDeserializer
    );
  }
}
We then do it like this, with one container factory per topic so that each can have its own thread count:
@Configuration
public class KafkaConsumerConfig {

  @Value("${kafka.topics.bookingConfirmation.consumerThreads}")
  private Integer bookingConfirmationConsumerThreads;

  @Value("${kafka.topics.userReviews.consumerThreads}")
  private Integer userReviewConsumerThreads;
 
  @Bean
  public ConcurrentKafkaListenerContainerFactory<String, Object>
      kafkaListenerBookingConfirmationFactory() {
    ConcurrentKafkaListenerContainerFactory<String, Object> factory =
      new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setConcurrency(bookingConfirmationConsumerThreads);
    return factory;
  }

  @Bean
  public ConcurrentKafkaListenerContainerFactory<String, Object>
      kafkaListenerUserReviewFactory() {
    ConcurrentKafkaListenerContainerFactory<String, Object> factory =
      new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setConcurrency(userReviewConsumerThreads);
    return factory;
  }
}
setErrorHandler method
I moved this to the setErrorHandler method post.

setRecordFilterStrategy method
Messages that do not pass the filter are never seen by the listener.
I moved this to the setRecordFilterStrategy post.
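
The filtering rule can be sketched in plain Java, without Spring. In Spring Kafka, RecordFilterStrategy.filter returns true for a record that should be discarded, and the sketch below mirrors that contract with an ordinary Predicate. The class name RecordFilterSketch, the deliver helper, and the sample values are all hypothetical and only illustrate the semantics; they are not part of the library.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

public class RecordFilterSketch {

    // Hypothetical helper: returns the records a listener would receive.
    // As with RecordFilterStrategy#filter, a "true" result means "discard".
    static <T> List<T> deliver(List<T> records, Predicate<T> discard) {
        List<T> seenByListener = new ArrayList<>();
        for (T record : records) {
            if (!discard.test(record)) {
                seenByListener.add(record);
            }
        }
        return seenByListener;
    }

    public static void main(String[] args) {
        List<String> records = List.of("order-1", "test-2", "order-3");
        // Discard every value that starts with "test-".
        List<String> seen = deliver(records, value -> value.startsWith("test-"));
        System.out.println(seen); // prints [order-1, order-3]
    }
}
```

Note that the predicate describes what to drop, not what to keep, which is easy to get backwards when writing a real filter.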

setRetryTemplate method
Example
We do it like this.
@Bean
ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
    ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
    ConsumerFactory<Object, Object> kafkaConsumerFactory) {
  ConcurrentKafkaListenerContainerFactory<Object, Object> factory =
    new ConcurrentKafkaListenerContainerFactory<>();
  configurer.configure(factory, kafkaConsumerFactory);
  factory.setErrorHandler(((exception, data) -> {
    log.error("Error in process: Record key/id is {} with Exception {} ",
      data.key(), exception.getMessage());
    // maybe send to dead-letter topic
  }));
  factory.setRetryTemplate(retryTemplate());
  return factory;
}

private RetryTemplate retryTemplate() {
  RetryTemplate retryTemplate = new RetryTemplate();
  retryTemplate.setRetryPolicy(getSimpleRetryPolicy());
  return retryTemplate;
}

private SimpleRetryPolicy getSimpleRetryPolicy() {
  Map<Class<? extends Throwable>, Boolean> exceptionMap = new HashMap<>();
  //exceptionMap.put(IllegalArgumentException.class, false);
  exceptionMap.put(IllegalStateException.class, true);
  return new SimpleRetryPolicy(3, exceptionMap, true);
}
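Example
We do it like this. When the retries are exhausted, the recovery callback forwards the failed record to another topic.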
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Foo>
kafkaRetryListenerContainerFactory(
  KafkaTemplate<String, Foo> kafkaTemplate, 
  ObjectMapper objectMapper) {
  
  ConcurrentKafkaListenerContainerFactory<String, Foo> factory =
    new ConcurrentKafkaListenerContainerFactory<>();
  factory.setMessageConverter(new StringJsonMessageConverter());
  factory.setRetryTemplate(retryTemplate());
  factory.setRecoveryCallback(retryContext -> {
    ConsumerRecord consumerRecord = (ConsumerRecord) retryContext
      .getAttribute(CONTEXT_RECORD);
    kafkaTemplate.send("application-events",
      objectMapper.readValue(consumerRecord.value().toString(), Foo.class));
    return Optional.empty();
  });
  return factory;
}

@Bean
public RetryTemplate retryTemplate() {
  RetryTemplate retryTemplate = new RetryTemplate();

  FixedBackOffPolicy fixedBackOffPolicy = new FixedBackOffPolicy();
  fixedBackOffPolicy.setBackOffPeriod(3000);
  retryTemplate.setBackOffPolicy(fixedBackOffPolicy);

  SimpleRetryPolicy simpleRetryPolicy = new SimpleRetryPolicy();
  simpleRetryPolicy.setMaxAttempts(3);
  retryTemplate.setRetryPolicy(simpleRetryPolicy);

  return retryTemplate;
}
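
To make the interplay of the retry policy, the fixed back-off, and the recovery callback easier to follow, here is a minimal plain-Java sketch of the same control flow. It does not use Spring Retry: the class RetrySketch and the method executeWithRetry are hypothetical names, and the sketch assumes, as SimpleRetryPolicy does, that the maximum attempt count includes the first call.

```java
import java.util.concurrent.Callable;
import java.util.function.Function;

public class RetrySketch {

    // Hypothetical helper: calls the action up to maxAttempts times, waits
    // backOffMillis between attempts, and hands the last exception to the
    // recovery function once every attempt has failed.
    static <T> T executeWithRetry(Callable<T> action,
                                  Function<Exception, T> recovery,
                                  int maxAttempts,
                                  long backOffMillis) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return action.call();
            } catch (Exception e) {
                last = e;
                if (attempt < maxAttempts) {
                    sleep(backOffMillis); // fixed back-off between attempts
                }
            }
        }
        return recovery.apply(last); // retries exhausted
    }

    private static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted during back-off", ie);
        }
    }

    public static void main(String[] args) {
        int[] calls = {0};
        String result = RetrySketch.<String>executeWithRetry(
            () -> { calls[0]++; throw new IllegalStateException("boom"); },
            e -> "recovered: " + e.getMessage(),
            3, 10L);
        System.out.println(calls[0] + " attempts, " + result);
        // prints "3 attempts, recovered: boom"
    }
}
```

With the values from the example above (three attempts, 3000 ms back-off), a record that keeps failing is tried three times with two pauses in between, and only then does the recovery step run.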
