Introduction
We include the following line
import org.springframework.kafka.listener.SeekToCurrentErrorHandler;
SeekToCurrentErrorHandler is used for stateful retry. The explanation is as follows:
Stateful retry is not offered by the Java Apache client by itself, but is available as a configuration option out of the box with Spring Kafka, using the SeekToCurrentErrorHandler.
A further explanation is as follows:
An additional feature of the SeekToCurrentErrorHandler is that those events within the batch that have successfully been processed prior to the event that results in a RetryableException being thrown are still able to be successfully marked as consumed, so are not themselves re-delivered too in the next poll.
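The batch behavior described above can be illustrated with a small plain-Java model. This is only a sketch and does not use Spring Kafka: the class `SeekToCurrentSketch`, its `handle` method, and the `BatchResult` type are hypothetical names introduced for illustration, and the real handler works on consumer records and partitions rather than bare offsets.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

/** Toy model of seek-to-current handling for one polled batch (not Spring Kafka code). */
final class SeekToCurrentSketch {

    /** Outcome of one batch: offsets marked as consumed, and the offset to seek back to (-1 if none). */
    static final class BatchResult {
        final List<Long> committed;
        final long seekTo;

        BatchResult(List<Long> committed, long seekTo) {
            this.committed = committed;
            this.seekTo = seekTo;
        }
    }

    /**
     * Processes the offsets in order. Offsets before the first failure are marked as consumed.
     * The failing offset becomes the seek position, so it and every later offset
     * would be returned again by the next poll.
     */
    static BatchResult handle(List<Long> batchOffsets, Predicate<Long> fails) {
        List<Long> committed = new ArrayList<>();
        for (long offset : batchOffsets) {
            if (fails.test(offset)) {
                return new BatchResult(committed, offset);
            }
            committed.add(offset);
        }
        return new BatchResult(committed, -1L);
    }
}
```

For a batch with offsets 10 to 13 in which offset 12 fails, the sketch marks 10 and 11 as consumed and reports 12 as the seek position, so 12 and 13 are the only records delivered again.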
constructor - Custom ConsumerRecord + BackOff
Do not use this.
Example - FixedBackOff
We do it as follows
// Stateful retry listener.
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaStatefulRetryListenerContainerFactory(
    ConsumerFactory<String, String> consumerFactory) {
  SeekToCurrentErrorHandler errorHandler = new SeekToCurrentErrorHandler(
    (record, exception) -> {
      // Recoverer: called once the retries are exhausted; intentionally does nothing here.
    },
    // 4 seconds pause, 4 retries.
    new FixedBackOff(4000L, 4L));
  ConcurrentKafkaListenerContainerFactory<String, String> factory =
    new ConcurrentKafkaListenerContainerFactory<>();
  factory.setConsumerFactory(consumerFactory);
  // Commit the offset after each record is processed.
  factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.RECORD);
  factory.setErrorHandler(errorHandler);
  return factory;
}
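To make the effect of "4 seconds pause, 4 retries" concrete, here is a minimal plain-Java sketch of a fixed back-off loop. It does not call Spring: `FixedRetrySketch`, `Outcome`, and `run` are hypothetical names, and the model assumes the first delivery is not counted as a retry and that no wait happens after the last failed retry.

```java
import java.util.function.IntPredicate;

/** Toy model of retrying one record with a fixed pause (not Spring code). */
final class FixedRetrySketch {

    /** What happened to the record: deliveries made, total pause, and whether recovery was needed. */
    static final class Outcome {
        final int deliveries;
        final long waitedMillis;
        final boolean recovered;

        Outcome(int deliveries, long waitedMillis, boolean recovered) {
            this.deliveries = deliveries;
            this.waitedMillis = waitedMillis;
            this.recovered = recovered;
        }
    }

    /**
     * Delivers the record up to maxRetries + 1 times. The predicate receives the
     * 1-based delivery number and returns true when that delivery succeeds.
     */
    static Outcome run(long intervalMillis, int maxRetries, IntPredicate succeedsOnDelivery) {
        long waited = 0;
        for (int delivery = 1; delivery <= maxRetries + 1; delivery++) {
            if (succeedsOnDelivery.test(delivery)) {
                return new Outcome(delivery, waited, false);
            }
            if (delivery <= maxRetries) {
                waited += intervalMillis; // pause before the next delivery
            }
        }
        // Retries exhausted: in the real handler the recoverer would take over here.
        return new Outcome(maxRetries + 1, waited, true);
    }
}
```

Under these assumptions, `run(4000L, 4, d -> false)` models a record that never succeeds: five deliveries, 16 seconds of pauses in total, and then recovery.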
Example - ExponentialBackOff
Suppose we have the following code. Examples of SeekToCurrentErrorHandler usage are here and here.
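import java.util.function.BiConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.util.backoff.ExponentialBackOff;

@Configuration
@EnableKafka
public class SpringConfiguration {
  @Bean
  public SeekToCurrentErrorHandler eh() {
    long initialMillis = 500;
    double factor = 2.0;
    long maxElapsedTimeSecs = 60;
    // Start at 500 ms and double the pause after each failed attempt.
    ExponentialBackOff backoff = new ExponentialBackOff(initialMillis, factor);
    // Stop retrying once the accumulated back-off time reaches 60 seconds.
    backoff.setMaxElapsedTime(maxElapsedTimeSecs * 1000);
    // Called when the back-off is exhausted.
    BiConsumer<ConsumerRecord<?, ?>, Exception> recoverer = (rec, exc) -> {
      // TODO In the final app do something more useful here
      logger.error("* Maximum retry policy has been reached {} - acknowledging and proceeding *", rec);
    };
    SeekToCurrentErrorHandler eh = new SeekToCurrentErrorHandler(recoverer, backoff);
    // Commit the offset of the recovered record so it is not delivered again.
    eh.setCommitRecovered(true);
    return eh;
  }
  ...
}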
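To see what pauses the values 500 ms, factor 2, and 60 seconds would produce, here is a plain-Java sketch of the schedule. It is a simplified model, not Spring's implementation: `ExponentialScheduleSketch` and `delays` are hypothetical names, the 30-second cap on a single pause is passed in explicitly as an assumption (we believe it matches the default maximum interval of ExponentialBackOff), and elapsed time is counted as the sum of the pauses only.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of an exponential back-off schedule (not Spring code). */
final class ExponentialScheduleSketch {

    /**
     * Returns the pauses, in milliseconds, handed out until the accumulated pause time
     * reaches maxElapsedMillis. Each pause is the previous one times the multiplier,
     * capped at maxIntervalMillis.
     */
    static List<Long> delays(long initialMillis, double multiplier,
                             long maxIntervalMillis, long maxElapsedMillis) {
        List<Long> delays = new ArrayList<>();
        long interval = Math.min(initialMillis, maxIntervalMillis);
        long elapsed = 0;
        while (elapsed < maxElapsedMillis) {
            delays.add(interval);
            elapsed += interval;
            interval = Math.min((long) (interval * multiplier), maxIntervalMillis);
        }
        return delays;
    }
}
```

With these assumptions, `delays(500, 2.0, 30_000, 60_000)` yields seven pauses: 500, 1000, 2000, 4000, 8000, 16000, and 30000 ms. The last pause is capped at 30 seconds, and the total of 61.5 seconds is the first point at which the 60-second limit is reached.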
We do it as follows.
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String>
    kafkaListenerContainerFactory(RetryTemplate retryTemplate) {
  ConcurrentKafkaListenerContainerFactory<String, String> factory =
    new ConcurrentKafkaListenerContainerFactory<>();
  factory.setConsumerFactory(consumerFactory());
  factory.setMissingTopicsFatal(missingTopicsFatal); // True in prod, false otherwise
  factory.getContainerProperties().setAckMode(
    ContainerProperties.AckMode.MANUAL_IMMEDIATE);
  factory.getContainerProperties().setSyncCommitTimeout(Duration.ofSeconds(60));
  factory.setStatefulRetry(true);
  factory.setErrorHandler(eh());
  return factory;
}
constructor - BackOff only
Example - FixedBackOff
We do it as follows. If an error occurs, the record is processed again, with a 60-second wait between attempts.
@Bean
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<byte[], byte[]>>
    kafkaListenerContainerFactory() {
  ConcurrentKafkaListenerContainerFactory<byte[], byte[]> factory =
    new ConcurrentKafkaListenerContainerFactory<>();
  // per listener; 60 seconds between attempts, with no limit on the number of attempts
  factory.setErrorHandler(new SeekToCurrentErrorHandler(
    new FixedBackOff(60000L, FixedBackOff.UNLIMITED_ATTEMPTS)));
  factory.setConsumerFactory(consumerFactory());
  return factory;
}