Şeklen şöyle
Kafka normalde asenkron çalışır. Ancak Request Reply için SpringBoot destek veriyor. Açıklaması şöyle.
How Spring solves the problem is by automatically setting a correlation ID in the producer record which is returned as-is by the @SendTo annotation at the consumer end.
Request Reply için şunları yapmak gerekir.
1. ReplyingKafkaTemplate direkt kullanılır
2. Consumer tarafta @KafkaListener ve @SendTo birlikte kullanılır. @SendTo metodun döndürdüğü nesneyi belirtilen topic'e gönderir.
Producer tarafında şöyle yaparız. repliesContainer() ile ReplyingKafkaTemplate nesnesine cevapların hangi topic'ten okunacağını belirtiriz.
@Bean
public ProducerFactory<String, String> producerFactory() {
...
}
@Bean
public ReplyingKafkaTemplate<String, String, String> replyKafkaTemplate(
ProducerFactory<String, String> pf,
ConcurrentMessageListenerContainer<String, String> repliesContainer) {
return new ReplyingKafkaTemplate<>(pf, repliesContainer);
}
@Bean
public ConcurrentMessageListenerContainer<String, String> repliesContainer(
ConsumerFactory<String, String> cf) {
ContainerProperties containerProperties = new ContainerProperties("replies");
return new ConcurrentMessageListenerContainer<>(cf, containerProperties);
}Producer tarafındaki Servisi şöyle yaparız
@Service
public class RequestReplyService {
private final ReplyingKafkaTemplate<String, Request, Reply> kafkaTemplate;
public Reply process(Request request) throws ExecutionException, InterruptedException {
ProducerRecord<String, Request> record =
new ProducerRecord<>("request-topic", request);
RequestReplyFuture<String, Request, Reply> future = kafkaTemplate
.sendAndReceive(record);
// confirm if producer produced successfully
SendResult<String, Model> sendResult = future.getSendFuture().get();
// get consumer record
ConsumerRecord<String, Reply> consumerRecord = future.get();
// return consumer value
return consumerRecord.value();
}
}Consumer tarafındaki servisi şöyle yaparız
@Service
public class RequestReplyService {
@KafkaListener(topics = "request-topic")
@SendTo
public Reply process(Request request) {
String message = "Received request: " + request.getMessage();
return new Reply(message);
}
}
Hiç yorum yok:
Yorum Gönder