Wednesday, 15 March 2023

Spring Kafka Consumer @KafkaListener Annotation - Manual Commit

Manual Commit
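Example
Suppose application.properties looks like this. Auto-commit is turned off and the listener ack mode is set to manual; without a manual ack mode the container cannot supply the Acknowledgment argument to the listener.
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=my-group
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.listener.ack-mode=manual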
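We do it like this
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;

@KafkaListener(topics = "my-topic")
public void listen(ConsumerRecord<String, String> record, Acknowledgment ack) {
  try {
    // Process the message
    System.out.println("Received message: " + record.value());
    // Manually commit the offset
    ack.acknowledge();
  } catch (Exception e) {
    // Handle any exceptions; the offset is not acknowledged on this path
  }
}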
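The reason ack.acknowledge() comes after the processing step can be illustrated with a small simulation. This is a plain-Java toy model, not Spring Kafka code: the InMemoryPartition class and its pollAndProcess method are hypothetical names invented for this sketch, and the second poll stands in for what happens after a consumer restart or rebalance, when delivery resumes from the last committed offset.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Toy model of manual offset commits; hypothetical, not the Spring Kafka API. */
public class ManualCommitSketch {

    /** Stands in for one topic partition plus the group's committed offset. */
    static final class InMemoryPartition {
        private final List<String> records = new ArrayList<>();
        private int committedOffset = 0; // next offset to deliver after a restart

        void append(String value) { records.add(value); }

        int committedOffset() { return committedOffset; }

        /**
         * Delivers records starting at the committed offset. The offset advances
         * only after the handler returns normally (the "acknowledge" step).
         * Returns the values the handler was invoked with during this poll.
         */
        List<String> pollAndProcess(Consumer<String> handler) {
            List<String> delivered = new ArrayList<>();
            for (int offset = committedOffset; offset < records.size(); offset++) {
                String value = records.get(offset);
                delivered.add(value);
                try {
                    handler.accept(value);
                    committedOffset = offset + 1; // acknowledge: commit the next offset
                } catch (RuntimeException e) {
                    break; // no acknowledge: the offset stays put
                }
            }
            return delivered;
        }
    }

    public static void main(String[] args) {
        InMemoryPartition partition = new InMemoryPartition();
        partition.append("a");
        partition.append("b");
        partition.append("c");

        // First attempt: processing "b" fails, so only "a" is committed.
        List<String> first = partition.pollAndProcess(v -> {
            if (v.equals("b")) throw new IllegalStateException("boom");
        });
        System.out.println(first + " committed=" + partition.committedOffset());
        // prints: [a, b] committed=1

        // Second attempt (simulated restart): delivery resumes at "b".
        List<String> second = partition.pollAndProcess(v -> { });
        System.out.println(second + " committed=" + partition.committedOffset());
        // prints: [b, c] committed=3
    }
}
```

Because the failed record is delivered again rather than skipped, this ordering gives at-least-once processing. Acknowledging before processing would instead risk losing the record when processing fails.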
Batch Listener
Batch listening is enabled by calling setBatchListener(true) on the ConcurrentKafkaListenerContainerFactory object.

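Example
We do it like this. Because this factory is built by hand, the manual ack mode is also set on it explicitly.
import java.util.List;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.support.Acknowledgment;

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> batchListenerContainerFactory() {
  ConcurrentKafkaListenerContainerFactory<String, String> factory =
      new ConcurrentKafkaListenerContainerFactory<>();
  // consumerFactory() is assumed to be defined elsewhere in this configuration class
  factory.setConsumerFactory(consumerFactory());
  factory.setBatchListener(true);
  // Required so that the Acknowledgment argument is supplied to the listener
  factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
  return factory;
}

@KafkaListener(topics = "my-topic", containerFactory = "batchListenerContainerFactory")
public void listen(List<ConsumerRecord<String, String>> records, Acknowledgment ack) {
  try {
    for (ConsumerRecord<String, String> record : records) {
      // Process the message
      System.out.println("Received message: " + record.value());
    }
    // Manually commit the offsets for the whole batch
    ack.acknowledge();
  } catch (Exception e) {
    // Handle any exceptions; the batch is not acknowledged on this path
  }
}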
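One consequence of acknowledging once at the end of a batch is that a failure partway through leaves the whole batch uncommitted, so records that were already processed may be processed again. The sketch below models this in plain Java. It is a hypothetical toy, not Spring Kafka code: InMemoryPartition and pollBatch are invented names, and the second call stands in for redelivery after a restart or rebalance.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Toy model of a single acknowledge per batch; hypothetical, not the Spring Kafka API. */
public class BatchCommitSketch {

    static final class InMemoryPartition {
        private final List<String> records;
        private int committedOffset = 0;

        InMemoryPartition(List<String> records) { this.records = List.copyOf(records); }

        int committedOffset() { return committedOffset; }

        /**
         * Hands all uncommitted records to the handler as one batch and commits
         * only if the handler returns normally. Returns true when committed.
         */
        boolean pollBatch(Consumer<List<String>> batchHandler) {
            List<String> batch = records.subList(committedOffset, records.size());
            try {
                batchHandler.accept(batch);
                committedOffset = records.size(); // one acknowledge covers the whole batch
                return true;
            } catch (RuntimeException e) {
                return false; // nothing is committed for this batch
            }
        }
    }

    public static void main(String[] args) {
        InMemoryPartition partition = new InMemoryPartition(List.of("a", "b", "c"));
        List<String> processed = new ArrayList<>();
        boolean[] failOnce = {true}; // makes "c" fail on the first attempt only

        Consumer<List<String>> handler = batch -> {
            for (String value : batch) {
                if (value.equals("c") && failOnce[0]) {
                    failOnce[0] = false;
                    throw new IllegalStateException("boom");
                }
                processed.add(value);
            }
        };

        partition.pollBatch(handler);
        System.out.println(processed + " committed=" + partition.committedOffset());
        // prints: [a, b] committed=0

        partition.pollBatch(handler);
        System.out.println(processed + " committed=" + partition.committedOffset());
        // prints: [a, b, a, b, c] committed=3
    }
}
```

In this model "a" and "b" are handled twice, which suggests that batch handlers written this way should tolerate duplicates, for example by being idempotent.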
