Monday, 29 March 2021

SpringKafka Consumer: the ConcurrentKafkaListenerContainerFactory.setErrorHandler() method

Introduction
Our options for handling errors are as follows
When you build your spring boot application and make use of Kafka in order to create some consumer, Spring provides on its own a listener container for asynchronous execution of POJO listeners. The provided listener container has three ways to handle a potential exception:

1. Ignores it and moves to the next record.
2. It can retry to process the same item from the listed topics/partitions of that listener.
3. It can send the item to a dead letter topic.
The explanation is as follows. In other words, if we do nothing, the default behavior is to ignore errors: the failed record is logged and skipped.
By default, records that fail are simply logged, and we move on to the next one. We can, however, configure an error handler in the listener container to perform some other action. 
Retry
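There are two options for retry
1. Using the Kafka client library. This approach is known as stateless retry: retries run inside a single consumer poll, and in Spring Kafka it is configured through setRetryTemplate.
2. Using Spring classes. This approach is known as stateful retry and is configured through setErrorHandler.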

setErrorHandler method - For Stateful Retry
SeekToCurrentErrorHandler can be used.
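The post names SeekToCurrentErrorHandler but shows no code for it, so here is a minimal sketch. It assumes a Spring Kafka 2.x version (2.3 or later) in which setErrorHandler and SeekToCurrentErrorHandler are available, and a KafkaTemplate bean that already exists in the application. The class name, bean name and back-off values are hypothetical and chosen only for illustration.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.kafka.listener.SeekToCurrentErrorHandler;
import org.springframework.util.backoff.FixedBackOff;

@Configuration
public class StatefulRetryConfig { // hypothetical class name

  // Stateful retry listener (hypothetical bean name).
  @Bean
  public ConcurrentKafkaListenerContainerFactory<String, String>
      kafkaStatefulRetryListenerContainerFactory(
          ConsumerFactory<String, String> consumerFactory,
          KafkaTemplate<String, String> kafkaTemplate) { // assumed to exist as a bean

    ConcurrentKafkaListenerContainerFactory<String, String> factory =
        new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory);

    // Once retries are exhausted, publish the failed record to a dead letter topic
    // (by default the original topic name with ".DLT" appended).
    DeadLetterPublishingRecoverer recoverer =
        new DeadLetterPublishingRecoverer(kafkaTemplate);

    // Illustrative values: wait 4000 ms between deliveries and allow
    // up to 4 retries after the first failed delivery.
    factory.setErrorHandler(
        new SeekToCurrentErrorHandler(recoverer, new FixedBackOff(4000L, 4L)));
    return factory;
  }
}
```

With this handler, a failed record is sought back to and redelivered by a later poll, so the retry delay is not spent inside one long-running poll.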


setRetryTemplate method - For Stateless Retry
The explanation is as follows
The Java Kafka client library offers stateless retry, with the Kafka consumer retrying a retryable exception as part of the consumer poll.
- Retries happen within the consumer poll for the batch.
- Consumer poll must complete before poll timeout, containing all retries, and total processing time (including REST calls & DB calls), retry delay and backoff, for all records in the batch.
- Default poll time is 5 minutes for 500 records in the batch. This only averages to 600ms per event.
- If poll time is exceeded this results in event duplication.
- Calculation of retries/time possible, but total retry duration will have to be short.
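The arithmetic behind the points above can be sketched in plain Java. The class and method names are hypothetical. The 5 minute and 500 record figures come from the quote, and the 100 ms processing time per attempt is an assumed value. The worst case assumes that a backoff is applied between attempts but not after the last one.

```java
public class PollBudget { // hypothetical helper, not part of any library

  // Average time available per record before the poll interval expires.
  public static long averageBudgetMs(long maxPollIntervalMs, int maxPollRecords) {
    return maxPollIntervalMs / maxPollRecords;
  }

  // Worst-case time spent on one record that fails every attempt:
  // each attempt costs processingMs, and one backoff separates consecutive attempts.
  public static long worstCaseRecordMs(int maxAttempts, long backoffMs, long processingMs) {
    return maxAttempts * processingMs + (maxAttempts - 1) * backoffMs;
  }

  // Number of always-failing records that fit into a single poll interval.
  public static long failingRecordsThatFit(long maxPollIntervalMs, long worstCaseRecordMs) {
    return maxPollIntervalMs / worstCaseRecordMs;
  }

  public static void main(String[] args) {
    long pollIntervalMs = 300_000L; // 5 minutes, as in the quote
    int pollRecords = 500;          // batch size, as in the quote

    long average = averageBudgetMs(pollIntervalMs, pollRecords);
    // 5 attempts with a 4000 ms backoff, assuming 100 ms of processing per attempt
    long worstCase = worstCaseRecordMs(5, 4_000L, 100L);
    long fit = failingRecordsThatFit(pollIntervalMs, worstCase);

    System.out.println("Average budget per record (ms): " + average);
    System.out.println("Worst case for one failing record (ms): " + worstCase);
    System.out.println("Failing records that fit in one poll: " + fit);
  }
}
```

Under these assumptions only a small part of a 500 record batch can exhaust its retries before the poll interval is exceeded, which is why the total retry duration has to stay short.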
Example
We do it like this
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.retry.support.RetryTemplate;

// These beans belong in a @Configuration class. "log" is assumed to be an SLF4J
// logger declared on that class (for example via Lombok's @Slf4j).

// Stateless retry listener.
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String>
  kafkaStatelessRetryListenerContainerFactory(
    ConsumerFactory<String, String> consumerFactory, final RetryTemplate retryTemplate) {

  ConcurrentKafkaListenerContainerFactory<String, String> factory =
    new ConcurrentKafkaListenerContainerFactory<>();
  factory.setConsumerFactory(consumerFactory);
  // Retries run inside the listener invocation, i.e. within the consumer poll.
  factory.setRetryTemplate(retryTemplate);
  // Called once all attempts have failed.
  factory.setRecoveryCallback(context -> {
    log.warn("**** Retries exhausted - error class: {} - error message: {}",
             context.getLastThrowable().getClass().getName(),
             context.getLastThrowable().getMessage());
    // Return null to mark processing complete.
    return null;
  });
  return factory;
}

// At most 5 attempts, with a fixed 4000 ms wait between attempts.
@Bean
public RetryTemplate retryTemplate() {
  return RetryTemplate.builder()
    .fixedBackoff(4000)
    .maxAttempts(5)
    .build();
}
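
To make the behaviour of this configuration easier to see, here is a plain Java sketch of a stateless retry loop. It is not Spring Retry's implementation. The class and method names are hypothetical and only model the idea: attempts, backoff and recovery all run on the calling thread before control returns.

```java
import java.util.concurrent.Callable;
import java.util.function.Function;

public class StatelessRetrySketch { // hypothetical name, illustration only

  // Runs the task up to maxAttempts times on the calling thread, sleeping
  // backoffMs between attempts. If every attempt fails, the recovery function
  // receives the last exception and its result is returned instead.
  public static <T> T execute(Callable<T> task, int maxAttempts, long backoffMs,
                              Function<Exception, T> recovery) {
    if (maxAttempts < 1) {
      throw new IllegalArgumentException("maxAttempts must be at least 1");
    }
    Exception last = null;
    for (int attempt = 1; attempt <= maxAttempts; attempt++) {
      try {
        return task.call();
      } catch (Exception e) {
        last = e;
        if (attempt < maxAttempts) {
          sleep(backoffMs); // no backoff after the final attempt
        }
      }
    }
    return recovery.apply(last);
  }

  private static void sleep(long millis) {
    try {
      Thread.sleep(millis);
    } catch (InterruptedException ie) {
      Thread.currentThread().interrupt(); // preserve the interrupt flag
      throw new IllegalStateException("Interrupted during backoff", ie);
    }
  }

  public static void main(String[] args) {
    // A task that always fails, retried 3 times with a 10 ms backoff.
    String result = execute(
        () -> { throw new IllegalStateException("boom"); },
        3, 10L,
        e -> "recovered: " + e.getMessage());
    System.out.println(result);
  }
}
```

Because the loop blocks the calling thread, every backoff counts against the poll interval, which is the limitation described in the quote above.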
