Wednesday, March 3, 2021

Spring Cloud Stream Producer Bean

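Introduction
For a producer bean, it is enough to define a single output binding of the form "foo-out-0" and point it at a topic. Here foo is the bean name, out marks an output binding, and 0 is the output index.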
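
The naming convention can be made concrete with a small plain-Java sketch. It does not use Spring at all: the BindingNames class and its methods are hypothetical helpers introduced only for illustration, and they simply build names of the form <functionName>-<in|out>-<index>.

```java
// Hypothetical helper, not part of Spring Cloud Stream: it only illustrates
// the "<functionName>-<in|out>-<index>" binding naming convention.
final class BindingNames {

  private BindingNames() {
  }

  // Name of the index-th output binding of the given function bean.
  static String output(String functionName, int index) {
    return functionName + "-out-" + index;
  }

  // Name of the index-th input binding of the given function bean.
  static String input(String functionName, int index) {
    return functionName + "-in-" + index;
  }
}
```

With this helper, BindingNames.output("foo", 0) builds the name that is used as the key under spring.cloud.stream.bindings.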

Example
We do it as follows. Two bean names are specified here, separated by a semicolon.
spring:
  cloud:
    stream:
      function:
        definition: consumeMessage;produceMessage
Producer beans implement the Supplier interface and write to a sink. To specify the topic the producer writes to, we do the following. The bean named produceMessage will write to the topic named product-topic on Kafka.
spring:
  cloud:
    stream:
      bindings:
        produceMessage-out-0:
          destination: product-topic
          binder: kafka
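
As a rough illustration of what such a producer bean looks like, here is a plain-Java sketch of a Supplier. In a Spring Cloud Stream application the same Supplier would be registered as a bean named produceMessage, and the framework would poll it (once per second by default) and send each returned value to product-topic. The class name ProduceMessageSketch, the payload format, and the manual poll method are assumptions made only so that the example runs without Spring.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Plain-Java sketch; in a real application produceMessage() would be a @Bean method.
final class ProduceMessageSketch {

  private ProduceMessageSketch() {
  }

  // Each call to get() yields the next payload to publish (made-up format).
  static Supplier<String> produceMessage() {
    AtomicInteger counter = new AtomicInteger();
    return () -> "product-" + counter.incrementAndGet();
  }

  // Stand-in for the framework's poller: calls the supplier a fixed number of times
  // and collects what would have been sent to the destination.
  static List<String> poll(Supplier<String> supplier, int times) {
    List<String> sent = new ArrayList<>();
    for (int i = 0; i < times; i++) {
      sent.add(supplier.get());
    }
    return sent;
  }
}
```

The point of the sketch is that the bean itself knows nothing about Kafka. The binding configuration alone decides where the supplied values end up.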
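Example
Writing can also be done with StreamBridge. Its send(bindingName, data) method publishes to an output binding on demand from imperative code, so no Supplier bean is needed.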

Example - Reactor Sink
Suppose we have the following application.yaml. The orderSupplier bean writes to the order-event topic.
server:
  port: 8080
spring.cloud.stream:
  function:
    definition: orderSupplier;paymentEventConsumer;inventoryEventConsumer
  bindings:
    orderSupplier-out-0:
      destination: order-event
    paymentEventConsumer-in-0:
      destination: payment-event
    inventoryEventConsumer-in-0:
      destination: inventory-event
The side that processes the topic is as follows. The processor reads from the order-event topic and writes to the payment-event topic.
spring.cloud.stream:
  function:
    definition: paymentProcessor
  bindings:
    paymentProcessor-in-0:
      destination: order-event
    paymentProcessor-out-0:
      destination: payment-event
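
On the Java side, a processor such as paymentProcessor is typically a java.util.function.Function bean: its input is bound to paymentProcessor-in-0 and its output to paymentProcessor-out-0. The sketch below is plain Java so that it runs without Spring. The OrderEvent and PaymentEvent classes, their fields, and the approval rule are all hypothetical, since the real types are not shown in this post.

```java
import java.util.function.Function;

// Hypothetical event types; the real OrderEvent and PaymentEvent are not shown in this post.
final class OrderEvent {
  final String orderId;
  final int amount;

  OrderEvent(String orderId, int amount) {
    this.orderId = orderId;
    this.amount = amount;
  }
}

final class PaymentEvent {
  final String orderId;
  final boolean approved;

  PaymentEvent(String orderId, boolean approved) {
    this.orderId = orderId;
    this.approved = approved;
  }
}

final class PaymentProcessorSketch {

  private PaymentProcessorSketch() {
  }

  // In a real application this method would carry @Bean, and its name would
  // match the entry in spring.cloud.stream.function.definition.
  static Function<OrderEvent, PaymentEvent> paymentProcessor() {
    // Made-up rule: approve any order whose amount is at most 100.
    return order -> new PaymentEvent(order.orderId, order.amount <= 100);
  }
}
```

Each message read from order-event would be passed to the function, and the returned value would be written to payment-event.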
We implement the side that sends events as follows. The Sinks.Many is exposed as a Flux.
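import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;

import java.util.function.Supplier;

@Configuration
public class OrderConfig {

  // Sink that application code pushes OrderEvent instances into
  @Bean
  public Sinks.Many<OrderEvent> orderSink(){
    return Sinks.many().unicast().onBackpressureBuffer();
  }

  // Supplier bound to orderSupplier-out-0; the binder subscribes to the returned Flux
  @Bean
  public Supplier<Flux<OrderEvent>> orderSupplier(Sinks.Many<OrderEvent> sink){
    return sink::asFlux;
  }
}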
To use the event-sending side, we do the following. Here Sinks.Many is used as the sink.
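import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Sinks;

@Service
public class OrderStatusPublisher {

  @Autowired
  private Sinks.Many<OrderEvent> orderSink;

  public void raiseOrderEvent(...){
    ...
    OrderEvent orderEvent = ...;
    // tryEmitNext returns a Sinks.EmitResult; it is ignored here, so a failed emission goes unnoticed
    this.orderSink.tryEmitNext(orderEvent);
  }
}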
To use it, we do the following.
@Service
public class OrderCommandService {
    
  @Autowired
  private OrderStatusPublisher publisher;

  @Transactional
  public PurchaseOrder createOrder(OrderRequestDto orderRequestDTO){
    PurchaseOrder purchaseOrder = ...;
    this.publisher.raiseOrderEvent(purchaseOrder, OrderStatus.ORDER_CREATED);
    return purchaseOrder;
  }
}
Example - Reactor EmitterProcessor
Suppose we have the following application.yaml file.
spring.cloud.stream:
  function:
    definition: orderSupplier;orderProcessor
  bindings:
    orderSupplier-out-0:
      destination: order-received
      producer:
        useNativeEncoding: true
    orderProcessor-in-0:
      destination: order-received
    orderProcessor-out-0:
      destination: order-validated
  kafka:
    bindings:
      orderSupplier-out-0:
        producer:
          configuration:
            key.serializer: org.apache.kafka.common.serialization.StringSerializer
            value.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
            schema.registry.url: http://localhost:8081
    streams:
      binder:
        applicationId: kafka-cqrs-command-processor
        configuration:
          schema.registry.url: http://localhost:8081
          commit.interval.ms: 100
          default:
            key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
            value.serde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde

server.port: 9001
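We do it as follows. Here EmitterProcessor.create() is used as the sink. Note that EmitterProcessor has been deprecated since Reactor 3.4 in favor of the Sinks API used in the previous example.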
import org.mapstruct.factory.Mappers;
import org.springframework.context.annotation.Bean;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Flux;

import java.util.function.Supplier;

@RestController
public class CommandController {
  private final EmitterProcessor<Message<ReceivedOrder>> messageEmitterProcessor =
    EmitterProcessor.create();

  @PostMapping(value = "/orders", consumes = MediaType.APPLICATION_JSON_VALUE)
  public ResponseEntity<Order> createOrder(@RequestBody Order order) {
    // initiate asynchronous processing of the order
    ReceivedOrder receivedOrderMessage = Mappers.getMapper(CommandMapper.class)
      .getReceivedOrderMessage(order);
    // the customer id is used as the Kafka record key
    messageEmitterProcessor.onNext(MessageBuilder.withPayload(receivedOrderMessage)
      .setHeader(KafkaHeaders.MESSAGE_KEY, receivedOrderMessage.getCustomerId()).build());

    // send back a response confirming receipt of the order
    return ResponseEntity.status(HttpStatus.ACCEPTED).body(order);
  }

  // Supplier bound to orderSupplier-out-0; the binder subscribes to the processor as a Flux
  @Bean
  public Supplier<Flux<Message<ReceivedOrder>>> orderSupplier() {
    return () -> messageEmitterProcessor;
  }
}
