Giriş
Açıklaması şöyle
Açıklaması şöyle
To create a parallel consumer for Apache Kafka in a Java Spring Boot application, you can utilize the Spring Kafka library and leverage the ConcurrentKafkaListenerContainerFactory provided by Spring Kafka.
Yani Kafka'ya bağlanmak için KafkaListenerContainerFactory arayüzünü gerçekleştiren ConcurrentKafkaListenerContainerFactory sınıfını bean olarak yaratmak gerekir.
setConcurrency metodu
setConcurrency metodu yazısına taşıdım.
setConsumerFactory metodu - ConsumerFactory
ConsumerFactory arayüzünü DefaultKafkaConsumerFactory gerçekleştirir.
Örnek
Şöyle yaparız.
setErrorHandler metodu
Örnek
Şöyle yaparız.
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap_servers);
// .. Misc other properties related to serialisation etc ..
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
return new DefaultKafkaConsumerFactory<>(props);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String>
kafkaListenerContainerFactory(RetryTemplate retryTemplate) {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
...
return factory;
}
Örnek
Elimizde şöyle bir ConsumerFactory olsun
@Configurationpublic class KafkaConsumerConfig {@Autowiredprivate KafkaProperties kafkaProperties;@Beanpublic ConsumerFactory<String, Object> consumerFactory() {JsonDeserializer<Object> jsonDeserializer = new JsonDeserializer<>();jsonDeserializer.addTrustedPackages("*");return new DefaultKafkaConsumerFactory<>(kafkaProperties.buildConsumerProperties(),
new StringDeserializer(), jsonDeserializer);}}
Şöyle yaparız
@Configurationpublic class KafkaConsumerConfig {@Value("${kafka.topics.bookingConfirmation.consumerThreads}")private Integer bookingConfirmationConsumerThreads;@Value("${kafka.topics.userReviews.consumerThreads}")private Integer userReviewConsumerThreads;@Beanpublic ConcurrentKafkaListenerContainerFactory<String, Object>
kafkaListenerBookingConfirmationFactory() {ConcurrentKafkaListenerContainerFactory<String, Object> factory =new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory());factory.setConcurrency(bookingConfirmationConsumerThreads);return factory;}@Beanpublic ConcurrentKafkaListenerContainerFactory<String, Object>
kafkaListenerUserReviewFactory() {ConcurrentKafkaListenerContainerFactory<String, Object> factory =new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory());factory.setConcurrency(userReviewConsumerThreads);return factory;}}
setErrorHandler metodu yazısına taşıdım.
setRecordFilterStrategy metodu
Filtreyi geçemeyen mesajları listener göremez.
setRecordFilterStrategy yazısına taşıdım
setRetryTemplate metodu
Örnek
Şöyle yaparız
@BeanConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory( ConcurrentKafkaListenerContainerFactoryConfigurer configurer, ConsumerFactory<Object, Object> kafkaConsumerFactory) { ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>(); configurer.configure(factory, kafkaConsumerFactory); factory.setErrorHandler(((exception, data) -> { log.error("Error in process: Record key/id is {} with Exception {} ", data.key(), exception.getMessage()); // maybe send to dead-letter topic })); factory.setRetryTemplate(retryTemplate()); return factory; } private RetryTemplate retryTemplate() { RetryTemplate retryTemplate = new RetryTemplate(); retryTemplate.setRetryPolicy(getSimpleRetryPolicy()); return retryTemplate; } private SimpleRetryPolicy getSimpleRetryPolicy() { Map<Class<? extends Throwable>, Boolean> exceptionMap = new HashMap<>(); //exceptionMap.put(IllegalArgumentException.class, false); exceptionMap.put(IllegalStateException.class, true); return new SimpleRetryPolicy(3, exceptionMap, true); } }
Örnek
Şöyle yaparız
@Beanpublic ConcurrentKafkaListenerContainerFactory<String, Foo>
kafkaRetryListenerContainerFactory(KafkaTemplate<String, Foo> kafkaTemplate,ObjectMapper objectMapper) {ConcurrentKafkaListenerContainerFactory<String, Foo> factory =
new ConcurrentKafkaListenerContainerFactory<>();factory.setMessageConverter(new StringJsonMessageConverter());factory.setRetryTemplate(retryTemplate());factory.setRecoveryCallback(retryContext -> {ConsumerRecord consumerRecord = (ConsumerRecord) retryContext
.getAttribute(CONTEXT_RECORD);kafkaTemplate.send("application-events", objectMapper.readValue(consumerRecord.value()
.toString(), Foo.class));return Optional.empty();});return factory;}@Beanpublic RetryTemplate retryTemplate() {RetryTemplate retryTemplate = new RetryTemplate();FixedBackOffPolicy fixedBackOffPolicy = new FixedBackOffPolicy();fixedBackOffPolicy.setBackOffPeriod(3000);retryTemplate.setBackOffPolicy(fixedBackOffPolicy);SimpleRetryPolicy simpleRetryPolicy = new SimpleRetryPolicy();simpleRetryPolicy.setMaxAttempts(3);retryTemplate.setRetryPolicy(simpleRetryPolicy);return retryTemplate;}
Hiç yorum yok:
Yorum Gönder