28 Mart 2023 Salı

SpringKafka Consumer MessageListenerContainer Arayüzü

Giriş
Şu satırı dahil ederiz
import org.springframework.kafka.listener.MessageListenerContainer;
stop metodu
Açıklaması şöyle. Yani amaç Kafka Consumer thread'i durdurmadan ve rebalance işlemini tetiklemeden poll() çağrısına devam etmek ama 0 kayıt çekmek.
But let’s say you trigger a non-deterministic batch job via Kafka. Usually, a Kafka consumer has its processing thread and it doesn’t allow us to use reactive or parallel programming techniques. That means every blocking call you do in consumer, will block the consumer’s thread since that will only block its thread that is not a big deal. But that will cause us to throw away reactive programming’s advantages. For example, what will we do if we want to use Kotlin coroutines in Kafka consumers?

Well, thanks to Spring Kafka we can pause the MessageListenerContainer, process the message, then resume the MessageListenerContainer:

When we paused the container, Spring continues to make poll request in the background. Simply it will poll zero records. Since we did make polling, rebalance won’t be triggered
Örnek
Şöyle yaparız.
@KafkaListener(id="assigned_listener_id", autoStartup = "false",
topics = "topic-to-listen-to")
public void listen(Message message){
  // interesting message processing logic
}

@Autowired
private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;

//invoke this method to start the listener
public void startListener(){
  kafkaListenerEndpointRegistry.getListenerContainer("assigned_listener_id").start();
}

//invoke this method to stop the listener
public void stopListener(){
  kafkaListenerEndpointRegistry.getListenerContainer("assigned_listener_id").stop(()->{
    log.info("Listener Stopped.");
  });
}

Hiç yorum yok:

Yorum Gönder