1 Temmuz 2021 Perşembe

SpringCloud Stream Consumer Bean

Giriş
1. spring.cloud.stream.function.definition ile bean isimleri tanımlanır
2. Bu bean'ler spring.cloud.stream.bindings ile bir topic'e bağlanır. Binding name için açıklama şöyle. Yani tek girdi varsa 0 kullanmak yeterli.
... the binding name is determined by the framework based on this naming convention: <function name>-in-<index> where <index> is always 0 for most cases unless functions with multiple inputs and outputs.
Örnek - RabbitMq
Elimizde şöyle bir application.yaml olsun
spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
  cloud:
    stream:
      bindings:
        updateInventory-in-0:
          destination: orderSubmitted.exchange
          group: inventory
        updateMovement-in-0:
          destination: movementEvent.exchange
          group: inventory
      function:
        definition: updateInventory;updateMovement
Şöyle yaparız
@Slf4j
@Configuration
public class MessagingFunctionConfig {
    
  @Bean
  public Consumer<Order> updateInventory() {
    return order -> log.info("Update inventory for newly submitted order - {}", 
      order.toString());
   }

   @Bean
   public Consumer<MovementEvent> updateMovement() {
     return movementEvent -> log.info("Movement event received - {}", 
       movementEvent.toString());
  }
}
Örnek - Kafka Topic
Şöyle yaparız. Burada producer, processor ve consumer bean'ler tanımlanıyor
spring:
  cloud:
    stream:
      function:
        definition: fizzBuzzProducer;fizzBuzzProcessor;fizzBuzzConsumer

      bindings:
        fizzBuzzProducer-out-0:
          destination: numbers
        fizzBuzzProcessor-in-0:
          destination: numbers
        fizzBuzzProcessor-out-0:
          destination: fizz-buzz
        fizzBuzzConsumer-in-0:
          destination: fizz-buzz
      kafka:
        binder:
          brokers: localhost:9092
          auto-create-topics: true
Tüm bean'leri tanımlamak için şöyle yaparız
@Configuration
@Slf4j public class KafkaConfiguration { @Bean public Supplier<Flux<Integer>> fizzBuzzProducer(){ return () -> Flux.interval(Duration.ofSeconds(5)) .map(value -> random.nextInt(1000 - 1) + 1) .log(); } @Bean public Function<Flux<Integer>, Flux<String>> fizzBuzzProcessor(){ return longFlux -> longFlux .map(i -> evaluateFizzBuzz(i)) .log(); } @Bean public Consumer<String> fizzBuzzConsumer(){ return (value) -> log.info("Consumer Received : " + value); } private String evaluateFizzBuzz(Integer value) { ... } }
Örnek - Kafka Topic + Kafka Group
Önce bean'ler tanımlanır. Şöyle yaparız. consumeMessage isimli bean, Kafka üzerindeki product-topic isimli kuyruğu tüketecek.
spring:
  cloud:
    stream:
      function:
        definition: consumeMessage;produceMessage
Daha sonra binding name tanımlanır. Şöyle yaparız
spring:
  cloud:
    stream:
      bindings:
        consumeMessage-in-0:
          destination: product-topic
          binder: kafka
          group: product-consumer-group
Açıklaması şöyle
The properties are similar to the producer properties but here we have to use the in keyword to indicate that we want to create an incoming channel. We set the consumer group with the help of group property. This is used by Kafka to determine the offset from where it has to continue reading after restart.
Örnek - Kafka Topic + Kafka Group
application.yml şöyledir
spring:
  cloud:
    stream:
      bindings:
        onReceive-in-0:
          destination: uppercase-values-topic
          group: consumer
Şöyle yaparız
@Slf4j
@Component
public class ValuesConsumer {

  @Bean
  public Consumer<String> onReceive() {
    return (message) -> {
      log.info("Received the value {} in Consumer", message);
    };
  }
}
Örnek - batch
Consumer için batch işlemler artık destekleniyor. Şöyle yaparız. Burada batch-mode=true yapılıyor
spring.cloud.stream.bindings.input-in-0.destination=TOPIC-NAME
spring.cloud.stream.bindings.input-in-0.group=grp
spring.cloud.stream.bindings.input-in-0.content-type=application/json

spring.cloud.stream.bindings.input-in-0.consumer.batch-mode=true
spring.cloud.stream.bindings.input-in-0.consumer-properties.max.poll.records=500
Burada artık consumer List alır. Şöyle yaparız.
@Bean
public Consumer<List<String>> input() {
  return list -> {
    System.out.println(list);
    ...
  };
}
Kullanılabilecek bazı alanların açıklaması şöyle
max.poll.records
The maximum number of records returned in a single call to poll(). Note, that max.poll.records does not impact the underlying fetching behaviour.


Hiç yorum yok:

Yorum Gönder