Introduction
The explanation is as follows
Processors can forward incoming messages to output channels after processing them.
The explanation is as follows
The processor's return type is the Function interface, which has two generic parameters: the first is the input data (a reactive stream in our case) and the second is the output.
Spring automatically creates topics named <method name>-in-<index> and <method name>-out-<index>, and the processor starts listening on the "in" topic and writing its output to the "out" topic.
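For example, a processor bean named myprocessor (a hypothetical name used only for illustration) would bind to myprocessor-in-0 and myprocessor-out-0 by default. A minimal sketch:

import java.util.function.Function;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Flux;

@Component
public class MyProcessorConfig {

  // The method name "myprocessor" yields the default binding names
  // myprocessor-in-0 and myprocessor-out-0.
  @Bean
  public Function<Flux<String>, Flux<String>> myprocessor() {
    return input -> input.map(String::toUpperCase);
  }
}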
However, sometimes a different topic name is desired. In that case, in application.properties:
the topic to listen on is specified in the destination field under myprocessor-in-0,
and the topic to write to is specified in the destination field under myprocessor-out-0.
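A minimal sketch in properties form, assuming hypothetical topic names input-topic and output-topic:

spring.cloud.stream.bindings.myprocessor-in-0.destination=input-topic
spring.cloud.stream.bindings.myprocessor-out-0.destination=output-topic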
The Index Number in Topic Names
The explanation is as follows
But these topics are created with a default naming standard. They are created as javaMethodName-in-<index> and javaMethodName-out-<index>, where index is the index of the binding (it is 0 unless the function has multiple inputs or outputs). So, when this app is run locally, the Exchanges get created as convertToUppercase-in-0 and convertToUppercase-out-0. But the Producer microservice publishes the event to an Exchange named values-topic. So, unless we override the default Exchange names created by Spring, the message sent by the Producer will not be read by the Processor, as they will be sending to and listening on different Exchanges.
Example
We do it like this
spring:
  cloud:
    function:
      definition: consumer;producer
    stream:
      bindings:
        producer-out-0:
          destination: first-topic
        consumer-in-0:
          destination: first-topic
Example
application.yaml is as follows. The topics to read from and write to are specified under the fizzBuzzProcessor-in-0 and fizzBuzzProcessor-out-0 headings.
server:
  port: 9001
spring:
  cloud:
    stream:
      function:
        definition: fizzBuzzProducer;fizzBuzzProcessor;fizzBuzzConsumer
      bindings:
        fizzBuzzProducer-out-0:
          destination: numbers
        fizzBuzzProcessor-in-0:
          destination: numbers
        fizzBuzzProcessor-out-0:
          destination: fizz-buzz
        fizzBuzzConsumer-in-0:
          destination: fizz-buzz
      kafka:
        binder:
          brokers: localhost:9092
          auto-create-topics: true
The processor returns a Function that uses Flux for both input and output. We do it like this
import java.util.function.Function;
import org.springframework.context.annotation.Bean;
import reactor.core.publisher.Flux;

@Bean
public Function<Flux<Integer>, Flux<String>> fizzBuzzProcessor() {
  return longFlux -> longFlux
      .map(i -> evaluateFizzBuzz(i))
      .log();
}

String evaluateFizzBuzz(Integer value) {
  if (value % 15 == 0) {
    return "FizzBuzz";
  } else if (value % 5 == 0) {
    return "Buzz";
  } else if (value % 3 == 0) {
    return "Fizz";
  } else {
    return String.valueOf(value);
  }
}
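The definition also lists fizzBuzzProducer and fizzBuzzConsumer, which are not shown here; hedged sketches of what such beans might look like:

import java.time.Duration;
import java.util.function.Consumer;
import java.util.function.Supplier;
import org.springframework.context.annotation.Bean;
import reactor.core.publisher.Flux;

// Hypothetical producer: emits 1, 2, 3, ... once per second to the "numbers" topic.
@Bean
public Supplier<Flux<Integer>> fizzBuzzProducer() {
  return () -> Flux.interval(Duration.ofSeconds(1)).map(i -> i.intValue() + 1);
}

// Hypothetical consumer: logs each result read from the "fizz-buzz" topic.
@Bean
public Consumer<String> fizzBuzzConsumer() {
  return s -> System.out.println("Received: " + s);
}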
Example
application.yml is as follows
spring:
  cloud:
    stream:
      bindings:
        convertToUppercase-in-0:
          destination: values-topic
          group: processor
        convertToUppercase-out-0:
          destination: uppercase-values-topic
We do it like this
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

import java.util.function.Function;

@Slf4j
@Component
public class ValueProcessor {

  @Bean
  public Function<String, String> convertToUppercase() {
    return (value) -> {
      log.info("Received {}", value);
      String upperCaseValue = value.toUpperCase();
      log.info("Sending {}", upperCaseValue);
      return upperCaseValue;
    };
  }
}
Example
We do it like this. Here the bean named enrichAndSendToRabbit reads the topic named product-topic on Kafka and writes to the exchange named inventory.message.exchange on RabbitMQ. The routing key values to be used for this exchange are also specified below.
spring:
  cloud:
    stream:
      bindings:
        enrichAndSendToRabbit-in-0:
          destination: product-topic
          binder: kafka
          group: product-enrich-group
        enrichAndSendToRabbit-out-0:
          destination: inventory.message.exchange
          requiredGroups: inventory_group
          binder: rabbit
      rabbit:
        bindings:
          enrichAndSendToRabbit-out-0:
            producer:
              bindingRoutingKey: inventory_item_publication
              routing-key-expression: "'inventory_item_publication'"
              exchangeAutoDelete: false
              exchangeType: direct
The explanation is as follows
We have to bind both the input and the output streams to the corresponding channels. We already know how we can bind Kafka channels. RabbitMQ is similar, but brings in some special binding properties like the type of the exchange and the routing key. With the direct type the consumers will use the routing key to redirect the message from the given exchange towards the queue declared by the consumer. It is very useful in case of point-to-point communication.
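The configuration above refers to a bean named enrichAndSendToRabbit, which is not shown; a minimal sketch, assuming plain String payloads and a hypothetical enrichment step:

import java.util.function.Function;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

@Component
public class EnrichProcessor {

  // Reads from the Kafka binding (product-topic) and returns a value that
  // the framework publishes to the RabbitMQ binding (inventory.message.exchange).
  @Bean
  public Function<String, String> enrichAndSendToRabbit() {
    return product -> "enriched: " + product; // hypothetical enrichment
  }
}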
Example
We do it like this. It reads from the inventory.message.exchange exchange on RabbitMQ and from the product-topic topic on Kafka.
spring:
  cloud:
    stream:
      bindings:
        multiInMultiOut-in-0:
          group: multi_message_group
          destination: inventory.message.exchange
          binder: rabbit
        multiInMultiOut-in-1:
          destination: product-topic
          binder: kafka
          group: product-multimessage-group
        multiInMultiOut-out-0:
          destination: multi-name-topic
        multiInMultiOut-out-1:
          destination: multi-quantity-topic
      rabbit:
        bindings:
          multiInMultiOut-in-0:
            consumer:
              bindingRoutingKey: inventory_item_publication
              exchangeType: direct
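The multiInMultiOut function itself is not shown; with multiple inputs and outputs, Spring Cloud Stream models the bindings as Reactor tuples of Flux. A minimal sketch, assuming String payloads:

import java.util.function.Function;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Flux;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuples;

@Component
public class MultiProcessor {

  // in-0 (RabbitMQ) maps to the first tuple element, in-1 (Kafka) to the second;
  // the returned tuple maps to out-0 and out-1 in the same order.
  @Bean
  public Function<Tuple2<Flux<String>, Flux<String>>, Tuple2<Flux<String>, Flux<String>>> multiInMultiOut() {
    return tuple -> {
      Flux<String> rabbitIn = tuple.getT1(); // from inventory.message.exchange
      Flux<String> kafkaIn  = tuple.getT2(); // from product-topic
      return Tuples.of(rabbitIn.map(String::toUpperCase),
                       kafkaIn.map(String::toLowerCase));
    };
  }
}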
Example
The application.yml file is as follows
spring.cloud.stream:
  function:
    definition: orderSupplier;orderProcessor
  bindings:
    orderSupplier-out-0:
      destination: order-received
      producer:
        useNativeEncoding: true
    orderProcessor-in-0:
      destination: order-received
    orderProcessor-out-0:
      destination: order-validated
  kafka:
    bindings:
      orderSupplier-out-0:
        producer:
          configuration:
            key.serializer: org.apache.kafka.common.serialization.StringSerializer
            value.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
            schema.registry.url: http://localhost:8081
    streams:
      binder:
        applicationId: kafka-cqrs-command-processor
        configuration:
          schema.registry.url: http://localhost:8081
          commit.interval.ms: 100
          default:
            key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
            value.serde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
server.port: 9001
We do it like this

import org.apache.kafka.streams.kstream.KStream;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

import java.util.function.Function;

@Component
public class CommandProcessor {

  @Bean
  public Function<KStream<String, ReceivedOrder>, KStream<String, ValidatedOrder>> orderProcessor() {
    return receivedOrdersStream -> receivedOrdersStream
        .mapValues(ProcessorUtil::validateOrder);
  }
}

Example
The application.yml file is as follows
spring.cloud.stream:
  function:
    definition: itemProcessor # Processors
  bindings:
    itemProcessor-in-0:
      destination: order-validated
    itemProcessor-out-0:
      destination: cheap-item-ordered
    itemProcessor-out-1:
      destination: affordable-item-ordered
    itemProcessor-out-2:
      destination: expensive-item-ordered
  kafka:
    streams:
      binder:
        applicationId: kafka-cqrs-query-processor
        configuration:
          schema.registry.url: http://localhost:8081
          commit.interval.ms: 100
          default:
            key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
            value.serde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
server.port: 9002
The code that performs the aggregation for the processor is as follows
import org.apache.kafka.streams.KeyValue;
import org.mapstruct.factory.Mappers;

import java.util.ArrayList;

public class ProcessorUtil {

  public static KeyValue<String, OrderedItem> getItem(String customerId, ValidatedOrder validatedOrder) {
    return new KeyValue<>(customerId,
        Mappers.getMapper(QueryMapper.class).getOrderedItemMessage(validatedOrder));
  }

  public static OrderedItemsList initializeItems() {
    return OrderedItemsList.newBuilder().setItems(new ArrayList<>()).build();
  }

  public static OrderedItemsList aggregateItems(String aggKey, OrderedItem newValue, OrderedItemsList aggValue) {
    int index = aggValue.getItems().indexOf(newValue);
    if (index >= 0) {
      int quantity = aggValue.getItems().get(index).getQuantity();
      aggValue.getItems().get(index).setQuantity(quantity + 1);
    } else {
      aggValue.getItems().add(newValue);
    }
    return aggValue;
  }
}
We do it like this. Here the result of the aggregation is written to KTable objects.
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Predicate;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

import java.util.function.Function;

@Component
public class QueryProcessor {

  public static final String ITEM_STORE_SUFFIX = "-items-store";

  Predicate<String, OrderedItem> isItemCheap = (k, v) -> v.getPrice() < 5;
  Predicate<String, OrderedItem> isItemAffordable = (k, v) -> v.getPrice() >= 5 && v.getPrice() < 50;
  Predicate<String, OrderedItem> isItemExpensive = (k, v) -> v.getPrice() >= 50;

  @Bean
  public Function<KStream<String, ValidatedOrder>, KStream<String, OrderedItem>[]> itemProcessor() {
    return validatedOrdersStream -> {
      // group the ordered items by price
      KStream<String, OrderedItem>[] orderedItemsByPriceStream = validatedOrdersStream
          .map(ProcessorUtil::getItem)
          .branch(isItemCheap, isItemAffordable, isItemExpensive);

      // materialize the grouped items into separate state stores.
      // Cheap items:
      orderedItemsByPriceStream[0].groupByKey().aggregate(
          ProcessorUtil::initializeItems,
          ProcessorUtil::aggregateItems,
          Materialized.as(Price.CHEAP.label + ITEM_STORE_SUFFIX));

      // Affordable items:
      orderedItemsByPriceStream[1].groupByKey().aggregate(
          ProcessorUtil::initializeItems,
          ProcessorUtil::aggregateItems,
          Materialized.as(Price.AFFORDABLE.label + ITEM_STORE_SUFFIX));

      // Expensive items:
      orderedItemsByPriceStream[2].groupByKey().aggregate(
          ProcessorUtil::initializeItems,
          ProcessorUtil::aggregateItems,
          Materialized.as(Price.EXPENSIVE.label + ITEM_STORE_SUFFIX));

      return orderedItemsByPriceStream;
    };
  }
}
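The materialized stores can then be queried on the read side of the CQRS setup; a minimal sketch, assuming the Kafka Streams binder's InteractiveQueryService:

import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.springframework.cloud.stream.binder.kafka.streams.InteractiveQueryService;
import org.springframework.stereotype.Component;

@Component
public class ItemStoreReader {

  private final InteractiveQueryService queryService;

  public ItemStoreReader(InteractiveQueryService queryService) {
    this.queryService = queryService;
  }

  // Looks up a customer's aggregated items in one of the materialized stores,
  // e.g. Price.CHEAP.label + ITEM_STORE_SUFFIX.
  public OrderedItemsList getItems(String storeName, String customerId) {
    ReadOnlyKeyValueStore<String, OrderedItemsList> store =
        queryService.getQueryableStore(storeName, QueryableStoreTypes.keyValueStore());
    return store.get(customerId);
  }
}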