Şeklen şöyle
Kafka normalde asenkron çalışır. Ancak Request Reply için SpringBoot destek veriyor. Açıklaması şöyle.
How Spring solves the problem is by automatically setting a correlation ID in the producer record which is returned as-is by the @SendTo annotation at the consumer end.
Request Reply için şunları yapmak gerekir.
1. ReplyingKafkaTemplate direkt kullanılır
2. Consumer tarafta @KafkaListener ve @SendTo birlikte kullanılır. @SendTo metodun döndürdüğü nesneyi belirtilen topic'e gönderir.
Producer tarafında şöyle yaparız. repliesContainer() ile ReplyingKafkaTemplate nesnesine cevapların hangi topic'ten okunacağını belirtiriz.
@Bean public ProducerFactory<String, String> producerFactory() { ... } @Bean public ReplyingKafkaTemplate<String, String, String> replyKafkaTemplate( ProducerFactory<String, String> pf, ConcurrentMessageListenerContainer<String, String> repliesContainer) { return new ReplyingKafkaTemplate<>(pf, repliesContainer); } @Bean public ConcurrentMessageListenerContainer<String, String> repliesContainer( ConsumerFactory<String, String> cf) { ContainerProperties containerProperties = new ContainerProperties("replies"); return new ConcurrentMessageListenerContainer<>(cf, containerProperties); }
Producer tarafındaki Servisi şöyle yaparız
@Service public class RequestReplyService { private final ReplyingKafkaTemplate<String, Request, Reply> kafkaTemplate; public Reply process(Request request) throws ExecutionException, InterruptedException { ProducerRecord<String, Request> record = new ProducerRecord<>("request-topic", request); RequestReplyFuture<String, Request, Reply> future = kafkaTemplate .sendAndReceive(record); // confirm if producer produced successfully SendResult<String, Model> sendResult = future.getSendFuture().get(); // get consumer record ConsumerRecord<String, Reply> consumerRecord = future.get(); // return consumer value return consumerRecord.value(); } }
Consumer tarafındaki servisi şöyle yaparız
@Service public class RequestReplyService { @KafkaListener(topics = "request-topic") @SendTo public Reply process(Request request) { String message = "Received request: " + request.getMessage(); return new Reply(message); } }
Hiç yorum yok:
Yorum Gönder