Giriş
Şu satırı dahil ederiz
import org.springframework.kafka.listener.ConsumerSeekAware;
Topicleri sürekli en baştan okumak için 2 tane yöntem var
1. enable-auto-commit=false Yapmak
Kafka'ya bağlandıktan sonra @KafkaListener anotasyonlu sınıflar çağrılmaya başlanır.
Eğer listener'lar topic'lerin her zaman en başından başlasın istersek şöyle yaparız.
Eğer listener'lar topic'lerin her zaman en başından başlasın istersek şöyle yaparız.
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=false
2. ConsumerSeekAware Kullanmak
Şu satırı dahil ederiz
import org.springframework.kafka.listener.ConsumerSeekAware;
onPartitionsAssigned metodu
Açıklaması şöyle
This method is called when Kafka topic partitions are assigned to the consumer group. It provides a map of assigned partitions along with their current offsets. The ConsumerSeekCallback parameter allows you to manually control the seeking behavior of the consumer.
Örnek - seekToBeginning()
Listener'lar topic'lerin her zaman en başından başlasın istersek kullanılabilir. Şöyle yaparız
@Component @Slf4j public class FromBeginningKafkaMessageConsumer implements ConsumerSeekAware { @Override public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) { // Seek to the beginning of each assigned partition assignments.keySet().forEach( topicPartition -> callback.seekToBeginning(topicPartition.topic(), topicPartition.partition())); } @KafkaListener(topics = "...") public void listen(ConsumerRecord<String, String> record) { String key = record.key(); String value = record.value(); log.info("Received key : {} value : {}", key, value); // Process the message as per your requirement } }
Örnek - seek() İle Belirtilen Offset'e Gitmek
Şöyle yaparız
@Service
public class ConsumerSeekAwareImpl implements ConsumerSeekAware {
long offset = 777l; //your offset number
String topic = "TopicName"; //your topic name
String listenerId = "listenerId"; //your listener id
//id of this listener has to be remembered for further
//use in method getListenerContainer of KafkaListenerEndpointRegistry
@KafkaListener(id = listenerId,
groupId = "groupName",
topics = topic)
public void listenServiceCall(@Payload String message,
@Header(KafkaHeaders.OFFSET) Long offset) {
...
}
@Override
public void onPartitionsAssigned(Map<TopicPartition, Long> assignments,
ConsumerSeekCallback callback) {
assignments.keySet().forEach(partition ->
callback.seek(this.topic, partition, this.offset));
}
}
}
Hiç yorum yok:
Yorum Gönder