13 Şubat 2022 Pazar

SpringKafka Consumer SeekToCurrentErrorHandler

Giriş
Şu satırı dahil ederiz
import org.springframework.kafka.listener.SeekToCurrentErrorHandler;
Stateful Retry İçindir. Açıklaması şöyle
Stateful retry is not offered by the Java Apache client by itself, but is available as a configuration option out of the box with Spring Kafka, using the SeekToCurrentErrorHandler.
Açıklaması şöyle
An additional feature of the SeekToCurrentErrorHandler is that those events within the batch that have successfully been processed prior to the event that results in a RetryableException being thrown are still able to be successfully marked as consumed, so are not themselves re-delivered too in the next poll.
constructor - Custom ConsumerRecord + BackOff 
Kullanmayın
Örnek - FixedBackOff
Şöyle yaparız
//Stateful retry listener.
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> 
  kafkaStatefulRetryListenerContainerFactory
    ConsumerFactory<String, String> consumerFactory) {

  SeekToCurrentErrorHandler errorHandler = new SeekToCurrentErrorHandler(
    (record, exception) -> {}, 
                           // 4 seconds pause, 4 retries.
                           new FixedBackOff(4000L, 4L));

  ConcurrentKafkaListenerContainerFactory<String, String> factory = 
    new ConcurrentKafkaListenerContainerFactory();
  factory.setConsumerFactory(consumerFactory);
  factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.RECORD);
  factory.setErrorHandler(errorHandler);
  return factory;
}
Örnek - ExponentialBackOff 
Elimizde şöyle bir kod olsun. SeekToCurrentErrorHandler kullanımı ile ilgili bir örnek burada ve burada.
@Configuration
@EnableKafka
public class SpringConfiguration {

  @Bean
  public SeekToCurrentErrorHandler eh() {
    long initialMillis = 500;
    long factor = 2;
    long maxElapsedTimeSecs = 60;
    ExponentialBackOff backoff = new ExponentialBackOff(initialMillis, factor);
    backoff.setMaxElapsedTime(maxElapsedTimeSecs*1000);

    BiConsumer<ConsumerRecord<?, ?>, Exception> recoverer = (rec, exc) ->  {
      // TODO In the final app do something more useful here
      logger.error("* Maximum retry policy has been reached {} - acknowledging and
        proceeding *", rec);
    };

    SeekToCurrentErrorHandler eh = new SeekToCurrentErrorHandler(recoverer, backoff);
    eh.setCommitRecovered(true);
    return eh;
  }
  ...
}
Örnek - FixedBackOff 
Şöyle yaparız
//Stateful retry listener.
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> 
  kafkaStatefulRetryListenerContainerFactory
    ConsumerFactory<String, String> consumerFactory) {

  SeekToCurrentErrorHandler errorHandler = new SeekToCurrentErrorHandler(
    (record, exception) -> {}, 
                           // 4 seconds pause, 4 retries.
                           new FixedBackOff(4000L, 4L));

  ConcurrentKafkaListenerContainerFactory<String, String> factory = 
    new ConcurrentKafkaListenerContainerFactory();
  factory.setConsumerFactory(consumerFactory);
  factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.RECORD);
  factory.setErrorHandler(errorHandler);
  return factory;
}
Örnek - ExponentialBackOff 
Elimizde şöyle bir kod olsun. SeekToCurrentErrorHandler kullanımı ile ilgili bir örnek burada ve burada.
@Configuration
@EnableKafka
public class SpringConfiguration {

  @Bean
  public SeekToCurrentErrorHandler eh() {
    long initialMillis = 500;
    long factor = 2;
    long maxElapsedTimeSecs = 60;
    ExponentialBackOff backoff = new ExponentialBackOff(initialMillis, factor);
    backoff.setMaxElapsedTime(maxElapsedTimeSecs*1000);

    BiConsumer<ConsumerRecord<?, ?>, Exception> recoverer = (rec, exc) ->  {
      // TODO In the final app do something more useful here
      logger.error("* Maximum retry policy has been reached {} - acknowledging and
        proceeding *", rec);
    };

    SeekToCurrentErrorHandler eh = new SeekToCurrentErrorHandler(recoverer, backoff);
    eh.setCommitRecovered(true);
    return eh;
  }
  ...
}
Şöyle yaparız.
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String>
  kafkaListenerContainerFactory(RetryTemplate retryTemplate) {
  ConcurrentKafkaListenerContainerFactory<String, String> factory =
    new ConcurrentKafkaListenerContainerFactory<>();
  factory.setConsumerFactory(consumerFactory());
  factory.setMissingTopicsFatal(missingTopicsFatal);//True in prod, false otherwise

  factory.getContainerProperties().setAckMode
    (ContainerProperties.AckMode.MANUAL_IMMEDIATE);
  factory.getContainerProperties().setSyncCommitTimeout(Duration.ofSeconds(60));
  factory.setStatefulRetry(true);
  factory.setErrorHandler(eh());
  return factory;
}

constructor - Sadece BackOff 
Örnek - FixedBackOff
Şöyle yaparız. Eğer hata varsa tekrar işlemeye çalışır. Her hata arasında 60 saniye bekler.
@Bean
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<byte[], byte[]>>
kafkaListenerContainerFactory() {
  ConcurrentKafkaListenerContainerFactory<byte[], byte[]> factory =
new ConcurrentKafkaListenerContainerFactory<>();
  factory.setErrorHandler(new SeekToCurrentErrorHandler(
new FixedBackOff(60000L))); // per listener
  factory.setConsumerFactory(consumerFactory());
  return factory;
}

Şöyle yaparız.
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String>
  kafkaListenerContainerFactory(RetryTemplate retryTemplate) {
  ConcurrentKafkaListenerContainerFactory<String, String> factory =
    new ConcurrentKafkaListenerContainerFactory<>();
  factory.setConsumerFactory(consumerFactory());
  factory.setMissingTopicsFatal(missingTopicsFatal);//True in prod, false otherwise

  factory.getContainerProperties().setAckMode
    (ContainerProperties.AckMode.MANUAL_IMMEDIATE);
  factory.getContainerProperties().setSyncCommitTimeout(Duration.ofSeconds(60));
  factory.setStatefulRetry(true);
  factory.setErrorHandler(eh());
  return factory;
}
Örnek FixedBackOff
Şöyle yaparız. Eğer hata varsa tekrar işlemeye çalışır. Her hata arasında 60 saniye bekler.
@Bean
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<byte[], byte[]>>
kafkaListenerContainerFactory() {
  ConcurrentKafkaListenerContainerFactory<byte[], byte[]> factory =
new ConcurrentKafkaListenerContainerFactory<>();
  factory.setErrorHandler(new SeekToCurrentErrorHandler(
new FixedBackOff(60000L))); // per listener
  factory.setConsumerFactory(consumerFactory());
  return factory;
}

Hiç yorum yok:

Yorum Gönder