Giriş
Şu satırı dahil ederiz
import org.springframework.kafka.listener.ConsumerSeekAware;
Topicleri sürekli en baştan okumak için 2 tane yöntem var
1. enable-auto-commit=false Yapmak
Kafka'ya bağlandıktan sonra @KafkaListener anotasyonlu sınıflar çağrılmaya başlanır.
Eğer listener'lar topic'lerin her zaman en başından başlasın istersek şöyle yaparız.
Eğer listener'lar topic'lerin her zaman en başından başlasın istersek şöyle yaparız.
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=false
2. ConsumerSeekAware Kullanmak
Şu satırı dahil ederiz
import org.springframework.kafka.listener.ConsumerSeekAware;
onPartitionsAssigned metodu
Açıklaması şöyle
This method is called when Kafka topic partitions are assigned to the consumer group. It provides a map of assigned partitions along with their current offsets. The ConsumerSeekCallback parameter allows you to manually control the seeking behavior of the consumer.
Örnek - seekToBeginning()
Listener'lar topic'lerin her zaman en başından başlasın istersek kullanılabilir. Şöyle yaparız
@Component @Slf4j public class FromBeginningKafkaMessageConsumer implements ConsumerSeekAware { @Override public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) { // Seek to the beginning of each assigned partition assignments.keySet().forEach( topicPartition -> callback.seekToBeginning(topicPartition.topic(), topicPartition.partition())); } @KafkaListener(topics = "...") public void listen(ConsumerRecord<String, String> record) { String key = record.key(); String value = record.value(); log.info("Received key : {} value : {}", key, value); // Process the message as per your requirement } }
Örnek - seek() İle Belirtilen Offset'e Gitmek
Şöyle yaparız
@Service public class ConsumerSeekAwareImpl implements ConsumerSeekAware { long offset = 777l; //your offset number String topic = "TopicName"; //your topic name String listenerId = "listenerId"; //your listener id //id of this listener has to be remembered for further //use in method getListenerContainer of KafkaListenerEndpointRegistry @KafkaListener(id = listenerId, groupId = "groupName", topics = topic) public void listenServiceCall(@Payload String message, @Header(KafkaHeaders.OFFSET) Long offset) { ... } @Override public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) { assignments.keySet().forEach(partition -> callback.seek(this.topic, partition, this.offset)); } } }
Hiç yorum yok:
Yorum Gönder