8 Mayıs 2023 Pazartesi

SpringKafka Request Reply

Giriş
Şeklen şöyle

Kafka normalde asenkron çalışır. Ancak Request Reply için SpringBoot destek veriyor. Açıklaması şöyle.
How Spring solves the problem is by automatically setting a correlation ID in the producer record which is returned as-is by the @SendTo annotation at the consumer end.
Request Reply için şunları yapmak gerekir.
1. ReplyingKafkaTemplate direkt kullanılır 
2. Consumer tarafta @KafkaListener ve @SendTo birlikte kullanılır. @SendTo metodun döndürdüğü nesneyi belirtilen topic'e gönderir.

Producer tarafında şöyle yaparız. repliesContainer() ile ReplyingKafkaTemplate nesnesine cevapların hangi topic'ten okunacağını belirtiriz.
@Bean
public ProducerFactory<String, String> producerFactory() {
   ...
}

 
@Bean
public ReplyingKafkaTemplate<String, String, String> replyKafkaTemplate(
  ProducerFactory<String, String> pf, 
  ConcurrentMessageListenerContainer<String, String> repliesContainer) {
  return new ReplyingKafkaTemplate<>(pf, repliesContainer);
}
 
@Bean
public ConcurrentMessageListenerContainer<String, String> repliesContainer(
  ConsumerFactory<String, String> cf) {
  ContainerProperties containerProperties = new ContainerProperties("replies");
  return new ConcurrentMessageListenerContainer<>(cf, containerProperties);
}
Producer tarafındaki Servisi şöyle yaparız
@Service
public class RequestReplyService {
 
  private final ReplyingKafkaTemplate<String, Request, Reply> kafkaTemplate;
 
  public Reply process(Request request) throws ExecutionException, InterruptedException {
    ProducerRecord<String, Request> record = 
      new ProducerRecord<>("request-topic", request);
    RequestReplyFuture<String, Request, Reply> future = kafkaTemplate
      .sendAndReceive(record);

    // confirm if producer produced successfully
    SendResult<String, Model> sendResult = future.getSendFuture().get();

    // get consumer record
    ConsumerRecord<String, Reply> consumerRecord = future.get();
    // return consumer value
    return consumerRecord.value();
  }
}
Consumer tarafındaki servisi şöyle yaparız
@Service
public class RequestReplyService {
 
  @KafkaListener(topics = "request-topic")
  @SendTo
  public Reply process(Request request) {
    String message = "Received request: " + request.getMessage();
    return new Reply(message);
  }
}






Hiç yorum yok:

Yorum Gönder