12 Mayıs 2023 Cuma

SpringKafka Consumer Topicleri Sürekli En Baştan Okumak

Şu satırı dahil ederiz
import org.springframework.kafka.listener.ConsumerSeekAware;
Topicleri sürekli en baştan okumak için 2 tane yöntem var

1. enable-auto-commit=false Yapmak
Kafka'ya bağlandıktan sonra @KafkaListener anotasyonlu sınıflar çağrılmaya başlanır.
Eğer listener'lar topic'lerin her zaman en başından başlasın istersek şöyle yaparız
2. ConsumerSeekAware  Kullanmak
Şu satırı dahil ederiz
onPartitionsAssigned metodu
Açıklaması şöyle
This method is called when Kafka topic partitions are assigned to the consumer group. It provides a map of assigned partitions along with their current offsets. The ConsumerSeekCallback parameter allows you to manually control the seeking behavior of the consumer.

Örnek - seekToBeginning()
Listener'lar topic'lerin her zaman en başından başlasın istersek kullanılabilir. Şöyle yaparız
public class FromBeginningKafkaMessageConsumer implements ConsumerSeekAware {
  public void onPartitionsAssigned(Map<TopicPartition, Long> assignments,
    ConsumerSeekCallback callback) {
    // Seek to the beginning of each assigned partition
      topicPartition -> callback.seekToBeginning(topicPartition.topic(),


  @KafkaListener(topics = "...")
  public void listen(ConsumerRecord<String, String> record) {
    String key = record.key();
    String value = record.value();
    log.info("Received key : {} value : {}", key, value);
    // Process the message as per your requirement
Örnek - seek() İle Belirtilen Offset'e Gitmek
Şöyle yaparız
public class ConsumerSeekAwareImpl implements ConsumerSeekAware {
  long offset = 777l; //your offset number
  String topic = "TopicName"; //your topic name
  String listenerId = "listenerId"; //your listener id

  //id of this listener has to be remembered for further
  //use in method getListenerContainer of KafkaListenerEndpointRegistry
  @KafkaListener(id = listenerId,
                 groupId = "groupName",
                 topics = topic)
  public void listenServiceCall(@Payload String message,
                                @Header(KafkaHeaders.OFFSET) Long offset) {

  public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, 
                                   ConsumerSeekCallback callback) {
    assignments.keySet().forEach(partition ->
      callback.seek(this.topic, partition, this.offset));

