Tuesday, August 1, 2023

SpringKafka Consumer KafkaListenerEndpoint Interface - For Defining Listeners Dynamically

We include the following line. KafkaListenerEndpointRegistry is the class that endpoints are registered with at runtime
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
The description of KafkaListenerEndpoint is as follows
KafkaListenerEndpoint stores the information needed to define a Kafka consumer: the consumer id, the topics listened to, the consumer group id, the consumer class, the methods used to process messages, and so on. Because KafkaListenerEndpoint is an interface, we use one of its implementation classes, such as MethodKafkaListenerEndpoint. MethodKafkaListenerEndpoint is also what defines Kafka consumers when we use the @KafkaListener annotation.

MethodKafkaListenerEndpoint Class
The inheritance hierarchy is as follows
KafkaListenerEndpoint
  AbstractKafkaListenerEndpoint
    MethodKafkaListenerEndpoint

Example
Suppose we have the following code
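import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.listener.MessageListener;
import org.springframework.stereotype.Service;

@Service
public class MyKafkaTemplateListener implements MessageListener<String, String> {

  // Called once for every record polled from the subscribed topic
  @Override
  public void onMessage(ConsumerRecord<String, String> record) {
    System.out.println("RECORD PROCESSING: " + record);
  }
}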
We do it as follows
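import java.util.concurrent.atomic.AtomicLong;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerEndpoint;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.config.MethodKafkaListenerEndpoint;
import org.springframework.messaging.handler.annotation.support
  .DefaultMessageHandlerMethodFactory;
import org.springframework.stereotype.Service;

@Service
public class KafkaListenerCreator {
  private static final AtomicLong endpointIdIndex = new AtomicLong(1);

  private final String kafkaGroupId = "kafkaGroupId";
  private final String kafkaListenerId = "kafkaListenerId-";

  // Assumes both beans are available for injection, for example from
  // Spring Boot's Kafka auto-configuration
  private final KafkaListenerEndpointRegistry registry;
  private final KafkaListenerContainerFactory<?> containerFactory;

  public KafkaListenerCreator(KafkaListenerEndpointRegistry registry,
    KafkaListenerContainerFactory<?> containerFactory) {
    this.registry = registry;
    this.containerFactory = containerFactory;
  }

  // Builds an endpoint for the topic, registers it and starts it immediately
  public void createAndRegisterListener(String topic) {
    registry.registerListenerContainer(createKafkaListenerEndpoint(topic),
      containerFactory, true);
  }

  private KafkaListenerEndpoint createKafkaListenerEndpoint(String topic) {
    MethodKafkaListenerEndpoint<String, String> kafkaListenerEndpoint =
      createDefaultMethodKafkaListenerEndpoint(topic);
    kafkaListenerEndpoint.setBean(new MyKafkaTemplateListener());
    try {
      // The method must be looked up on the class of the bean set above
      kafkaListenerEndpoint.setMethod(MyKafkaTemplateListener.class.getMethod(
        "onMessage", ConsumerRecord.class));
    } catch (NoSuchMethodException e) {
      throw new IllegalStateException("Attempt to call a non-existent method", e);
    }
    return kafkaListenerEndpoint;
  }

  private MethodKafkaListenerEndpoint<String, String>
    createDefaultMethodKafkaListenerEndpoint(String topic) {
    MethodKafkaListenerEndpoint<String, String> kafkaListenerEndpoint =
      new MethodKafkaListenerEndpoint<>();
    kafkaListenerEndpoint.setId(generateListenerId());
    kafkaListenerEndpoint.setGroupId(kafkaGroupId);
    kafkaListenerEndpoint.setAutoStartup(true);
    kafkaListenerEndpoint.setTopics(topic);
    kafkaListenerEndpoint.setMessageHandlerMethodFactory(
      new DefaultMessageHandlerMethodFactory());
    return kafkaListenerEndpoint;
  }

  // Each endpoint gets its own id: prefix plus a running counter
  private String generateListenerId() {
    return kafkaListenerId + endpointIdIndex.getAndIncrement();
  }
}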
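The id generation used by generateListenerId can be tried out in isolation. Each endpoint needs its own id, since the registry keys listener containers by endpoint id. The following is only a minimal sketch in plain Java, without Spring: ListenerIdGenerator, ListenerIdDemo and the topic names are hypothetical and do not come from Spring Kafka.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical helper, not part of Spring Kafka: hands out unique listener ids.
final class ListenerIdGenerator {
  private final String prefix;
  private final AtomicLong nextIndex = new AtomicLong(1);

  ListenerIdGenerator(String prefix) {
    this.prefix = prefix;
  }

  // getAndIncrement() is atomic, so concurrent callers never receive the same id.
  String nextId() {
    return prefix + nextIndex.getAndIncrement();
  }
}

class ListenerIdDemo {
  public static void main(String[] args) {
    ListenerIdGenerator generator = new ListenerIdGenerator("kafkaListenerId-");
    List<String> topics = List.of("orders", "payments"); // hypothetical topic names
    for (String topic : topics) {
      // One fresh id per dynamically created listener
      System.out.println(topic + " -> " + generator.nextId());
    }
  }
}
```

Using AtomicLong rather than a plain long keeps the ids distinct even when listeners are created from several threads at once.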
Example
We do it as follows
import org.springframework.kafka.config.MethodKafkaListenerEndpoint;
import org.springframework.messaging.handler.annotation.support
  .DefaultMessageHandlerMethodFactory;
import org.apache.kafka.clients.consumer.ConsumerRecord;

MethodKafkaListenerEndpoint<String, String> kafkaListenerEndpoint =
  new MethodKafkaListenerEndpoint<>();
kafkaListenerEndpoint.setId("...");
kafkaListenerEndpoint.setGroupId("...");
kafkaListenerEndpoint.setAutoStartup(true);
kafkaListenerEndpoint.setTopics("...");
kafkaListenerEndpoint.setMessageHandlerMethodFactory(
  new DefaultMessageHandlerMethodFactory());
// The bean is the object that receives the records
kafkaListenerEndpoint.setBean(new MyMessageListener());
// getMethod throws the checked NoSuchMethodException, so the enclosing
// method must catch or declare it
kafkaListenerEndpoint.setMethod(MyMessageListener.class.getMethod("onMessage",
  ConsumerRecord.class));

import org.springframework.kafka.listener.MessageListener;
class MyMessageListener implements MessageListener<String, String> {

  @Override
  public void onMessage(ConsumerRecord<String, String> record) {
    ...
  }
}
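
The setBean and setMethod calls above rely on an ordinary reflective lookup: getMethod finds a public method by name and exact parameter type, and fails with NoSuchMethodException if either does not match. The following is only a sketch of that lookup in plain Java. FakeRecord, FakeListener and HandlerLookupDemo are hypothetical stand-ins that mimic the shape of ConsumerRecord and a listener class; this is not how Spring Kafka itself dispatches records.

```java
import java.lang.reflect.Method;

// Hypothetical stand-in for ConsumerRecord
final class FakeRecord {
  final String value;

  FakeRecord(String value) {
    this.value = value;
  }
}

// Hypothetical stand-in for a listener bean
class FakeListener {
  String lastValue;

  public void onMessage(FakeRecord record) {
    lastValue = record.value;
  }
}

class HandlerLookupDemo {
  // Looks up a public method by name and exact parameter type
  static Method findHandler(Class<?> type, String name, Class<?> paramType) {
    try {
      return type.getMethod(name, paramType);
    } catch (NoSuchMethodException e) {
      throw new IllegalStateException(
        "No handler method " + name + " on " + type.getName(), e);
    }
  }

  // Invokes the handler on the bean, wrapping the checked reflection exceptions
  static void invokeHandler(Object bean, Method handler, Object argument) {
    try {
      handler.invoke(bean, argument);
    } catch (ReflectiveOperationException e) {
      throw new IllegalStateException("Handler invocation failed", e);
    }
  }

  public static void main(String[] args) {
    FakeListener bean = new FakeListener();
    Method handler = findHandler(FakeListener.class, "onMessage", FakeRecord.class);
    invokeHandler(bean, handler, new FakeRecord("hello"));
    System.out.println(bean.lastValue);
  }
}
```

Looking the method up on the same class as the bean instance matters: a Method obtained from an unrelated class cannot be invoked on the bean.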
