Giriş
1. spring.cloud.stream.function.definition ile bean isimleri tanımlanır
2. Bu bean'ler spring.cloud.stream.bindings ile bir topic'e bağlanır. Binding name için açıklama şöyle. Yani tek girdi varsa 0 kullanmak yeterli.
... the binding name is determined by the framework based on this naming convention: <function name>-in-<index> where <index> is always 0 for most cases unless functions with multiple inputs and outputs.
Örnek - RabbitMq
Elimizde şöyle bir application.yaml olsun
spring: rabbitmq: host: localhost port: 5672 username: guest password: guest cloud: stream: bindings: updateInventory-in-0: destination: orderSubmitted.exchange group: inventory updateMovement-in-0: destination: movementEvent.exchange group: inventory function: definition: updateInventory;updateMovement
Şöyle yaparız
@Slf4j @Configuration public class MessagingFunctionConfig { @Bean public Consumer<Order> updateInventory() { return order -> log.info("Update inventory for newly submitted order - {}", order.toString()); } @Bean public Consumer<MovementEvent> updateMovement() { return movementEvent -> log.info("Movement event received - {}", movementEvent.toString()); } }
Örnek - Kafka Topic
Şöyle yaparız. Burada producer, processor ve consumer bean'ler tanımlanıyor
spring: cloud: stream: function: definition: fizzBuzzProducer;fizzBuzzProcessor;fizzBuzzConsumer bindings: fizzBuzzProducer-out-0: destination: numbers fizzBuzzProcessor-in-0: destination: numbers fizzBuzzProcessor-out-0: destination: fizz-buzz fizzBuzzConsumer-in-0: destination: fizz-buzz kafka: binder: brokers: localhost:9092 auto-create-topics: true
Tüm bean'leri tanımlamak için şöyle yaparız
@Configuration@Slf4j public class KafkaConfiguration { @Bean public Supplier<Flux<Integer>> fizzBuzzProducer(){ return () -> Flux.interval(Duration.ofSeconds(5)) .map(value -> random.nextInt(1000 - 1) + 1) .log(); } @Bean public Function<Flux<Integer>, Flux<String>> fizzBuzzProcessor(){ return longFlux -> longFlux .map(i -> evaluateFizzBuzz(i)) .log(); } @Bean public Consumer<String> fizzBuzzConsumer(){ return (value) -> log.info("Consumer Received : " + value); } private String evaluateFizzBuzz(Integer value) { ... } }
Örnek - Kafka Topic + Kafka Group
Önce bean'ler tanımlanır. Şöyle yaparız. consumeMessage isimli bean, Kafka üzerindeki product-topic isimli kuyruğu tüketecek.
spring:cloud:stream:function:definition: consumeMessage;produceMessage
Daha sonra binding name tanımlanır. Şöyle yaparız.
spring:cloud:stream:bindings:consumeMessage-in-0:destination: product-topicbinder: kafkagroup: product-consumer-group
Açıklaması şöyle
The properties are similar to the producer properties but here we have to use the in keyword to indicate that we want to create an incoming channel. We set the consumer group with the help of group property. This is used by Kafka to determine the offset from where it has to continue reading after restart.
Örnek - Kafka Topic + Kafka Group
application.yml şöyledir
spring:cloud:stream:bindings:onReceive-in-0:destination: uppercase-values-topicgroup: consumer
Şöyle yaparız
Örnek - batch@Slf4j@Componentpublic class ValuesConsumer {@Beanpublic Consumer<String> onReceive() {return (message) -> {log.info("Received the value {} in Consumer", message);};}}
Consumer için batch işlemler artık destekleniyor. Şöyle yaparız. Burada batch-mode=true yapılıyor
spring.cloud.stream.bindings.input-in-0.destination=TOPIC-NAME spring.cloud.stream.bindings.input-in-0.group=grp spring.cloud.stream.bindings.input-in-0.content-type=application/json spring.cloud.stream.bindings.input-in-0.consumer.batch-mode=true spring.cloud.stream.bindings.input-in-0.consumer-properties.max.poll.records=500
Burada artık consumer List alır. Şöyle yaparız.
@Bean public Consumer<List<String>> input() { return list -> { System.out.println(list); ... }; }
Kullanılabilecek bazı alanların açıklaması şöyle
max.poll.recordsThe maximum number of records returned in a single call to poll(). Note, that max.poll.records does not impact the underlying fetching behaviour.
Hiç yorum yok:
Yorum Gönder