3 Mart 2021 Çarşamba

SpringCloud Stream Processor Bean

Giriş
Açıklaması şöyle
Processors can forward the incoming messages towards output channels after processing it.
Açıklaması şöyle
The processor’s return type is the Function interface which has two generic parameters. The first one is the input data (reactive stream in our case) and the second one is the output. 
Spring otomatik olarak processor method ismi-in + index ve method ismi-out + index şeklinde bir topic oluşturur ve processor bu "in" topic'i dinleyip çıktısını "out" topic'e yazmaya başlar. 

Ancak bazen kullanmak istenilen topic ismi farklı olabilir. Bu durumda application.properties dosyasında 
dinlenecek topic myprocessor-in-0 altındaki destination alanında dinlemek istediğimiz topic belirtilir.
yazılacak topic ise myprocessor-out-0 altındaki destination alanında dinlemek istediğimiz topic belirtilir.

Topic İçin Index Numarası
Açıklaması şöyle
But these topics are created with a default naming standard. They are created as javaMethodName-in-<index> and javaMethodName-out-<index> where index corresponds to the index of the application instance. So, when this app is run in local, the Exchanges will get created as convertToUppercase-in-0 and convertToUppercase-out-0. But the Producer microservice publishes the event to an Exchange named as values-topic. So, unless we override the default Exchange names created by Spring, the message sent by Producer will not be read by Processor as they’ll be sending and listening to different Exchanges.
Örnek
Şöyle yaparız
spring:
cloud: function: definition: consumer;producer stream: bindings: producer-out-0: destination : first-topic consumer-in-0: destination : first-topic
Örnek
application.yaml şöyledir. processbean-in-0 ve processbean-out-0 başlıkları altında okunacak ve yazılacak topic isimleri belirtilir.
server:
  port: 9001

spring:
  cloud:
    stream:
      function:
        definition: fizzBuzzProducer;fizzBuzzProcessor;fizzBuzzConsumer

      bindings:
        fizzBuzzProducer-out-0:
          destination: numbers
        fizzBuzzProcessor-in-0:
          destination: numbers
        fizzBuzzProcessor-out-0:
          destination: fizz-buzz
        fizzBuzzConsumer-in-0:
          destination: fizz-buzz
      kafka:
        binder:
          brokers: localhost:9092
          auto-create-topics: true
Processor girdi ve çıktı olarak Flux kullanan bir Function döndürür. Şöyle yaparız
@Bean
public Function<Flux<Integer>, Flux<String>> fizzBuzzProcessor(){
  return longFlux -> longFlux
.map(i -> evaluateFizzBuzz(i))
.log();
}

String evaluateFizzBuzz(Integer value) {
  if (value % 15 == 0) {
    return "FizzBuzz";
  } else if (value % 5 == 0) {
    return "Buzz";
  } else if (value % 3 == 0) {
    return "Fizz";
  } else {
    return String.valueOf(value);
  }
}
Örnek
application.properties şöyledir
spring:
cloud: stream: bindings: convertToUppercase-in-0: destination: values-topic group: processor convertToUppercase-out-0: destination: uppercase-values-topic
Şöyle yaparız
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean; import org.springframework.stereotype.Component; import java.util.function.Function; @Slf4j @Component public class ValueProcessor { @Bean public Function<String, String> convertToUppercase() { return (value) -> { log.info("Received {}", value); String upperCaseValue = value.toUpperCase(); log.info("Sending {}", upperCaseValue); return upperCaseValue; }; } }
Örnek
Şöyle yaparız. Burada enrichAndSendToRabbit isimli bean Kafka'daki product-topic isimli kuyruğu okur ve Rabbit'teki inventory.message.exchange isimili exchange'e yazar. Bu exchange için kullanılacak routing key değerleri de aşağıda belirtiliyor.
spring:
  cloud:
    stream:
      bindings:
        enrichAndSendToRabbit-in-0:
          destination: product-topic
          binder: kafka
          group: product-enrich-group
        enrichAndSendToRabbit-out-0:
          destination: inventory.message.exchange
          requiredGroups: inventory_group
          binder: rabbit
      rabbit:
        bindings:
          enrichAndSendToRabbit-out-0:
            producer:
              bindingRoutingKey: inventory_item_publication
              routing-key-expression: "'inventory_item_publication'"
              exchangeAutoDelete: false
              exchangeType: direct
Açıklaması şöyle
We have to bind both the input and the output streams to the corresponding channels. We already know how we can bind Kafka channels. RabbitMQ is similar, but brings in some special binding properties like the type of the exchange and the routing key. With the direct type the consumers will use the routing key to redirect the message from the given exchange towards the queue declared by the consumer. It is very useful in case of point-to-point communication.
Örnek
Şöyle yaparız. Rabbit üzerindeki inventory.message.exchange ve Kafka üzerindeki product-topic kuyruklarını okur
spring:
  cloud:
    stream:
      bindings:
        multiInMultiOut-in-0:
          group: multi_message_group
          destination: inventory.message.exchange
          binder: rabbit
        multiInMultiOut-in-1:
          destination: product-topic
          binder: kafka
          group: product-multimessage-group
        multiInMultiOut-out-0:
          destination: multi-name-topic
        multiInMultiOut-out-1:
          destination: multi-quantity-topic
      rabbit:
        bindings:
          multiInMultiOut-in-0:
            consumer:
              bindingRoutingKey: inventory_item_publication
              exchangeType: direct
Örnek
application.yml dosyası şöyle olsun
spring.cloud.stream:
  function:
    definition: orderSupplier;orderProcessor
  bindings:
    orderSupplier-out-0:
      destination: order-received
      producer:
        useNativeEncoding: true
    orderProcessor-in-0:
      destination: order-received
    orderProcessor-out-0:
      destination: order-validated
  kafka:
    bindings:
      orderSupplier-out-0:
        producer:
          configuration:
            key.serializer: org.apache.kafka.common.serialization.StringSerializer
            value.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
            schema.registry.url: http://localhost:8081
    streams:
      binder:
        applicationId: kafka-cqrs-command-processor
        configuration:
          schema.registry.url: http://localhost:8081
          commit.interval.ms: 100
          default:
            key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
            value.serde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde

server.port: 9001
Şöyle yaparız
import org.apache.kafka.streams.kstream.KStream;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

import java.util.function.Function;

@Component
public class CommandProcessor {
  @Bean
  public Function<KStream<String, ReceivedOrder>, KStream<String, ValidatedOrder>>
orderProcessor() {
return receivedOrdersStream -> receivedOrdersStream .mapValues(ProcessorUtil::validateOrder); } }
Örnek
application.yml dosyası şöyle olsun
spring.cloud.stream:
function: definition: itemProcessor # Processors bindings: # green itemProcessor-in-0: destination: order-validated itemProcessor-out-0: destination: cheap-item-ordered itemProcessor-out-1: destination: affordable-item-ordered itemProcessor-out-2: destination: expensive-item-ordered kafka: streams: binder: applicationId: kafka-cqrs-query-processor configuration: schema.registry.url: http://localhost:8081 commit.interval.ms: 100 default: key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde value.serde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde server.port: 9002
Processor için aggregation işlemi yapan kod şöyle olsun
import org.apache.kafka.streams.KeyValue;
import org.mapstruct.factory.Mappers;

public class ProcessorUtil {
  
  public static KeyValue<String, OrderedItem> getItem(String customerId,
ValidatedOrder validatedOrder) { return new KeyValue<>(customerId, Mappers.getMapper(QueryMapper.class).getOrderedItemMessage(validatedOrder)); } public static OrderedItemsList initializeItems() { return OrderedItemsList.newBuilder().setItems(new ArrayList<>()).build(); } public static OrderedItemsList aggregateItems(String aggKey, OrderedItem newValue,
OrderedItemsList aggValue) { int index = aggValue.getItems().indexOf(newValue); if (index >= 0) { int quantity = aggValue.getItems().get(index).getQuantity(); aggValue.getItems().get(index).setQuantity(quantity + 1); } else { aggValue.getItems().add(newValue); } return aggValue; } }
Şöyle yaparız. Burada aggregation sonucu KTable nesnelerine yazılır.
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Predicate;

@Component
public class QueryProcessor {
  public static final String ITEM_STORE_SUFFIX = "-items-store";

  Predicate<String, OrderedItem> isItemCheap = (k, v) -> v.getPrice() < 5;
  Predicate<String, OrderedItem> isItemAffordable = (k, v) -> v.getPrice() >= 5 && v.getPrice() < 50;
  Predicate<String, OrderedItem> isItemExpensive = (k, v) -> v.getPrice() > 50;

  @Bean
  public Function<KStream<String, ValidatedOrder>, KStream<String, OrderedItem>[]> itemProcessor() {
    return validatedOrdersStream -> {
      // group the ordered items by price
      KStream<String, OrderedItem>[] orderedItemsByPriceStream = validatedOrdersStream
        .map(ProcessorUtil::getItem)
        .branch(isItemCheap, isItemAffordable, isItemExpensive);

      // materialize the groups items into separate state stores.
      // Cheap items:
      orderedItemsByPriceStream[0].groupByKey().aggregate(
        ProcessorUtil::initializeItems,
        ProcessorUtil::aggregateItems,
        Materialized.as(Price.CHEAP.label + ITEM_STORE_SUFFIX));
      // Affordable items:
      orderedItemsByPriceStream[1].groupByKey().aggregate(
        ProcessorUtil::initializeItems,
        ProcessorUtil::aggregateItems,
        Materialized.as(Price.AFFORDABLE.label + ITEM_STORE_SUFFIX));
      // Expensive items:
      orderedItemsByPriceStream[2].groupByKey().aggregate(
        ProcessorUtil::initializeItems,
        ProcessorUtil::aggregateItems,
        Materialized.as(Price.EXPENSIVE.label + ITEM_STORE_SUFFIX));

      return orderedItemsByPriceStream;
  };
}


Hiç yorum yok:

Yorum Gönder