Introduction
For a producer bean, it is enough to configure a single output binding named like "foo-out-0", where foo is the bean name.
We do it as follows. Two bean names are specified here, separated by a semicolon.

    spring:
      cloud:
        stream:
          function:
            definition: consumeMessage;produceMessage
Producer beans implement the Supplier interface and write to a sink. To specify the topic the producer writes to, we do the following. The bean named produceMessage will write to the Kafka topic named product-topic.

    spring:
      cloud:
        stream:
          bindings:
            produceMessage-out-0:
              destination: product-topic
              binder: kafka
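The binding names used in the configuration follow the pattern beanName-in-index or beanName-out-index. The sketch below only illustrates that naming convention for beans with a single input and a single output; the class BindingNames and the enum Kind are hypothetical helpers written for this post and are not part of Spring Cloud Stream.

```java
import java.util.ArrayList;
import java.util.List;

public class BindingNames {

    /** The kind of functional bean decides which bindings it gets. */
    public enum Kind { SUPPLIER, CONSUMER, FUNCTION }

    /** Returns the binding names for a bean with at most one input and one output. */
    public static List<String> bindingsFor(String beanName, Kind kind) {
        List<String> names = new ArrayList<>();
        // Consumers and functions read, so they get an input binding
        if (kind == Kind.CONSUMER || kind == Kind.FUNCTION) {
            names.add(beanName + "-in-0");
        }
        // Suppliers and functions write, so they get an output binding
        if (kind == Kind.SUPPLIER || kind == Kind.FUNCTION) {
            names.add(beanName + "-out-0");
        }
        return names;
    }

    public static void main(String[] args) {
        System.out.println(bindingsFor("produceMessage", Kind.SUPPLIER));
        System.out.println(bindingsFor("consumeMessage", Kind.CONSUMER));
    }
}
```

So a Supplier bean such as produceMessage only needs the produceMessage-out-0 entry under bindings, while a Function bean needs both an -in-0 and an -out-0 entry.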
Example
Writing can also be done with StreamBridge.
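A rough sketch of on-demand publishing is shown here. StreamBridge offers a send(String bindingName, Object data) method that returns a boolean; to keep the sketch runnable without Spring or a broker, that call is hidden behind a plain BiFunction. The class name OnDemandPublisher and the binding name "order-out-0" are assumptions made for illustration only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;

public class OnDemandPublisher {

    // (bindingName, payload) -> accepted. In a Spring Cloud Stream application
    // this would typically be a reference to StreamBridge.send(String, Object).
    private final BiFunction<String, Object, Boolean> sender;

    public OnDemandPublisher(BiFunction<String, Object, Boolean> sender) {
        this.sender = sender;
    }

    /** Publishes one payload to the hypothetical "order-out-0" binding. */
    public boolean publishOrder(Object orderEvent) {
        return sender.apply("order-out-0", orderEvent);
    }

    public static void main(String[] args) {
        // Stand-in sender that records what would have been sent
        List<String> sent = new ArrayList<>();
        OnDemandPublisher publisher =
            new OnDemandPublisher((binding, payload) -> sent.add(binding + ":" + payload));
        publisher.publishOrder("order-42");
        System.out.println(sent);
    }
}
```

In a real application the constructor argument would be wired to the injected StreamBridge, and the destination for order-out-0 would be set under bindings in application.yaml just like a Supplier binding.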
Suppose we have the following application.yaml. The orderSupplier bean writes to the order-event topic.

    server:
      port: 8080
    spring.cloud.stream:
      function:
        definition: orderSupplier;paymentEventConsumer;inventoryEventConsumer
      bindings:
        orderSupplier-out-0:
          destination: order-event
        paymentEventConsumer-in-0:
          destination: payment-event
        inventoryEventConsumer-in-0:
          destination: inventory-event
The side that processes the topic looks like this. The processor reads from the order-event topic and writes to the payment-event topic.

    spring.cloud.stream:
      function:
        definition: paymentProcessor
      bindings:
        paymentProcessor-in-0:
          destination: order-event
        paymentProcessor-out-0:
          destination: payment-event
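The event-sending side is set up as follows. The Sinks.Many is exposed as a Flux through asFlux().

    import java.util.function.Supplier;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import reactor.core.publisher.Flux;
    import reactor.core.publisher.Sinks;

    @Configuration
    public class OrderConfig {

        // Sink that application code pushes events into
        @Bean
        public Sinks.Many<OrderEvent> orderSink() {
            return Sinks.many().unicast().onBackpressureBuffer();
        }

        // Supplier bound to orderSupplier-out-0; it streams whatever the sink emits
        @Bean
        public Supplier<Flux<OrderEvent>> orderSupplier(Sinks.Many<OrderEvent> sink) {
            return sink::asFlux;
        }
    }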
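To use the event-sending side, we do the following. Here Sinks.Many is used as the sink.

    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Service;
    import reactor.core.publisher.Sinks;

    @Service
    public class OrderStatusPublisher {

        @Autowired
        private Sinks.Many<OrderEvent> orderSink;

        public void raiseOrderEvent(...) {
            ...
            OrderEvent orderEvent = ...;
            // Push the event into the sink; tryEmitNext returns an EmitResult
            // that can be checked if emission failures matter
            this.orderSink.tryEmitNext(orderEvent);
        }
    }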
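To use the publisher, we do the following.

    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Service;
    // Spring's @Transactional is assumed here
    import org.springframework.transaction.annotation.Transactional;

    @Service
    public class OrderCommandService {

        @Autowired
        private OrderStatusPublisher publisher;

        @Transactional
        public PurchaseOrder createOrder(OrderRequestDto orderRequestDTO) {
            PurchaseOrder purchaseOrder = ...;
            // Raise the event so that it is written to the order-event topic
            this.publisher.raiseOrderEvent(purchaseOrder, OrderStatus.ORDER_CREATED);
            return purchaseOrder;
        }
    }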
Suppose we have the following application.yaml file.

    spring.cloud.stream:
      function:
        definition: orderSupplier;orderProcessor
      bindings:
        orderSupplier-out-0:
          destination: order-received
          producer:
            useNativeEncoding: true
        orderProcessor-in-0:
          destination: order-received
        orderProcessor-out-0:
          destination: order-validated
      kafka:
        bindings:
          orderSupplier-out-0:
            producer:
              configuration:
                key.serializer: org.apache.kafka.common.serialization.StringSerializer
                value.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
                schema.registry.url: http://localhost:8081
        streams:
          binder:
            applicationId: kafka-cqrs-command-processor
            configuration:
              schema.registry.url: http://localhost:8081
              commit.interval.ms: 100
              default:
                key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
                value.serde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
    server.port: 9001
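We do it as follows. Here EmitterProcessor.create() is used as the sink. Note that EmitterProcessor is deprecated as of Reactor 3.4 in favor of the Sinks API.

    import java.util.function.Supplier;
    import org.mapstruct.factory.Mappers;
    import org.springframework.context.annotation.Bean;
    import org.springframework.http.HttpStatus;
    import org.springframework.http.MediaType;
    import org.springframework.http.ResponseEntity;
    import org.springframework.kafka.support.KafkaHeaders;
    import org.springframework.messaging.Message;
    import org.springframework.messaging.support.MessageBuilder;
    import org.springframework.web.bind.annotation.PostMapping;
    import org.springframework.web.bind.annotation.RequestBody;
    import org.springframework.web.bind.annotation.RestController;
    import reactor.core.publisher.EmitterProcessor;
    import reactor.core.publisher.Flux;

    @RestController
    public class CommandController {

        private final EmitterProcessor<Message<ReceivedOrder>> messageEmitterProcessor =
            EmitterProcessor.create();

        @PostMapping(value = "/orders", consumes = MediaType.APPLICATION_JSON_VALUE)
        public ResponseEntity<Order> createOrder(@RequestBody Order order) {
            // initiate asynchronous processing of the order
            ReceivedOrder receivedOrderMessage = Mappers.getMapper(CommandMapper.class)
                .getReceivedOrderMessage(order);
            // the customer id is used as the Kafka message key
            messageEmitterProcessor.onNext(
                MessageBuilder.withPayload(receivedOrderMessage)
                    .setHeader(KafkaHeaders.MESSAGE_KEY, receivedOrderMessage.getCustomerId())
                    .build());
            // send back a response confirming receipt of the order
            return ResponseEntity.status(HttpStatus.ACCEPTED).body(order);
        }

        // Supplier bound to orderSupplier-out-0; it streams what the processor emits
        @Bean
        public Supplier<Flux<Message<ReceivedOrder>>> orderSupplier() {
            return () -> messageEmitterProcessor;
        }
    }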