12 Mayıs 2023 Cuma

SpringKafka Consumer Topicleri Sürekli En Baştan Okumak

Giriş
Şu satırı dahil ederiz
import org.springframework.kafka.listener.ConsumerSeekAware;
Topicleri sürekli en baştan okumak için 2 tane yöntem var

1. enable-auto-commit=false Yapmak
Kafka'ya bağlandıktan sonra @KafkaListener anotasyonlu sınıflar çağrılmaya başlanır.
Eğer listener'lar topic'lerin her zaman en başından başlasın istersek şöyle yaparız
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=false
2. ConsumerSeekAware  Kullanmak
Şu satırı dahil ederiz
import org.springframework.kafka.listener.ConsumerSeekAware;
onPartitionsAssigned metodu
Açıklaması şöyle
This method is called when Kafka topic partitions are assigned to the consumer group. It provides a map of assigned partitions along with their current offsets. The ConsumerSeekCallback parameter allows you to manually control the seeking behavior of the consumer.

Örnek - seekToBeginning()
Listener'lar topic'lerin her zaman en başından başlasın istersek kullanılabilir. Şöyle yaparız
@Component
@Slf4j
public class FromBeginningKafkaMessageConsumer implements ConsumerSeekAware {
  @Override
  public void onPartitionsAssigned(Map<TopicPartition, Long> assignments,
    ConsumerSeekCallback callback) {
    // Seek to the beginning of each assigned partition
    assignments.keySet().forEach(
      topicPartition -> callback.seekToBeginning(topicPartition.topic(),
                                                 topicPartition.partition()));

  }

  @KafkaListener(topics = "...")
  public void listen(ConsumerRecord<String, String> record) {
    String key = record.key();
    String value = record.value();
    log.info("Received key : {} value : {}", key, value);
    // Process the message as per your requirement
  }
}
Örnek - seek() İle Belirtilen Offset'e Gitmek
Şöyle yaparız
@Service
public class ConsumerSeekAwareImpl implements ConsumerSeekAware {
  long offset = 777l; //your offset number
  String topic = "TopicName"; //your topic name
  String listenerId = "listenerId"; //your listener id

  //id of this listener has to be remembered for further
  //use in method getListenerContainer of KafkaListenerEndpointRegistry
  @KafkaListener(id = listenerId,
                 groupId = "groupName",
                 topics = topic)
  public void listenServiceCall(@Payload String message,
                                @Header(KafkaHeaders.OFFSET) Long offset) {
    ...
  }

  @Override
  public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, 
                                   ConsumerSeekCallback callback) {
    assignments.keySet().forEach(partition ->
      callback.seek(this.topic, partition, this.offset));
    }
  }
}


Hiç yorum yok:

Yorum Gönder