Tuesday, February 2, 2021

SpringCloud Stream application.properties Settings

Example
We do it as follows. Here three concurrent consumers are requested for the binding.
spring.cloud.stream:
  bindings:
    reservations-input:
      content-type: application/json
      consumer.concurrency: 3
      destination: reservations-topic
      group: consumer-service-group
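The same binding can also be written in flat application.properties form (matching the title); a sketch using the values above:

```properties
spring.cloud.stream.bindings.reservations-input.destination=reservations-topic
spring.cloud.stream.bindings.reservations-input.group=consumer-service-group
spring.cloud.stream.bindings.reservations-input.content-type=application/json
# concurrency: number of concurrent consumers created for this binding
spring.cloud.stream.bindings.reservations-input.consumer.concurrency=3
```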
Example
Suppose we have the following code
@SpringBootApplication
public class SimpleConsumerApplication {

  @Bean
  public java.util.function.Consumer<KStream<String, String>> process() {
    return input ->
      input.foreach((key, value) -> {
        System.out.println("Key: " + key + " Value: " + value);
      });
  }
}
The explanation is as follows
For the above processor, you can provide the topic to consume from, as follows

spring.cloud.stream.bindings.process-in-0.destination: my-input-topic

In this case, we are saying that, for the function bean (process) and its first input (in-0), it shall be bound to a Kafka topic named my-input-topic. If you don’t provide an explicit destination like this, the binder assumes that you are using a topic that is the same as the binding name (process-in-0, in this case).
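Putting this convention together in application.properties form; a sketch (the group name is an assumption, the rest comes from the quote above):

```properties
# Register the functional bean with the binder
spring.cloud.stream.function.definition=process
# Bind the function's first input (process-in-0) to an explicit topic
spring.cloud.stream.bindings.process-in-0.destination=my-input-topic
# Optional consumer group, so multiple instances share the partitions
spring.cloud.stream.bindings.process-in-0.group=my-consumer-group
```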
Example
Suppose we have the following configuration
server:
  port: 8080
spring.cloud.stream:
  function:
    definition: orderSupplier;paymentEventConsumer;inventoryEventConsumer
  bindings:
    orderSupplier-out-0:
      destination: order-event
    paymentEventConsumer-in-0:
      destination: payment-event
    inventoryEventConsumer-in-0:
      destination: inventory-event
We implement the event-consuming side as follows
import java.util.function.Consumer;

@Configuration
public class EventHandlersConfig {

  @Bean
  public Consumer<PaymentEvent> paymentEventConsumer(){
    return pe -> {...};
  }

  @Bean
  public Consumer<InventoryEvent> inventoryEventConsumer(){
    return ie -> {...};
  }
}
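Conceptually, the binder matches each Consumer bean to a binding by bean name and hands it every deserialized payload. A minimal plain-Java sketch of that dispatch (illustrative only, not actual Spring internals; all names are assumptions):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

// Sketch of what the functional binder does conceptually: it keeps a map
// from binding name to the registered Consumer bean and invokes the
// matching function once per incoming message.
public class BinderSketch {
  private final Map<String, Consumer<String>> bindings = new HashMap<>();

  // In Spring this registration happens automatically for each Consumer @Bean,
  // deriving the binding name from the bean name (e.g. paymentEventConsumer-in-0).
  public void register(String bindingName, Consumer<String> consumer) {
    bindings.put(bindingName, consumer);
  }

  // In Spring this is driven by records arriving on the bound destination.
  public void dispatch(String bindingName, String payload) {
    bindings.get(bindingName).accept(payload);
  }

  public static void main(String[] args) {
    BinderSketch binder = new BinderSketch();
    binder.register("paymentEventConsumer", pe -> System.out.println("payment: " + pe));
    binder.register("inventoryEventConsumer", ie -> System.out.println("inventory: " + ie));
    binder.dispatch("paymentEventConsumer", "order-1");
    binder.dispatch("inventoryEventConsumer", "order-1");
  }
}
```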
We implement the event-producing side as follows
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;

import java.util.function.Supplier;

@Configuration
public class OrderConfig {

  @Bean
  public Sinks.Many<OrderEvent> orderSink(){
    return Sinks.many().unicast().onBackpressureBuffer();
  }

  @Bean
  public Supplier<Flux<OrderEvent>> orderSupplier(Sinks.Many<OrderEvent> sink){
    return sink::asFlux;
  }
}
We use the event-producing side as follows
@Service
public class OrderStatusPublisher {

  @Autowired
  private Sinks.Many<OrderEvent> orderSink;

  public void raiseOrderEvent(...){
    ...
    OrderEvent orderEvent = ...;
    this.orderSink.tryEmitNext(orderEvent);
  }
}
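The sink/supplier pairing above decouples the service that raises events from the binder that publishes them: the service pushes into a buffer, and the binder drains it on its own schedule. A plain-Java analogy of that pattern (no Reactor, illustrative only; Sinks.Many plays the buffer role above):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Analogy for the Sinks.Many + Supplier<Flux<...>> pattern: producers
// enqueue events, and a separate consumer drains whatever has accumulated.
public class SinkAnalogy {
  private final BlockingQueue<String> buffer = new LinkedBlockingQueue<>();

  // Corresponds to orderSink.tryEmitNext(orderEvent) in OrderStatusPublisher.
  public void raiseOrderEvent(String event) {
    buffer.offer(event);
  }

  // Corresponds to the binder subscribing to the Supplier's Flux.
  public List<String> drain() {
    List<String> out = new ArrayList<>();
    buffer.drainTo(out);
    return out;
  }
}
```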
Deserialization for Kafka
Example
We do it as follows
spring.cloud.stream.kafka.bindings.process-in-0.consumer.configuration.value.deserializer=
org.apache.kafka.common.serialization.StringDeserializer
Example
The explanation is as follows
When a project uses a basic deserializer, like StringDeserializer, if the first step fails, Kafka will throw an exception and get stuck in an endless loop trying to deserialize the same message. New messages will begin to build up behind this poison message and your logs will start filling with Kafka warnings.

When the ErrorHandlingDeserializer is implemented, a message that fails to deserialize, will not cause an exception to be thrown. Kafka will instead, create a consumer record where the header is set to “springDeserializerException” and the value of the message is set to null. Kafka will pass this custom message onto the Kafka consumer and move on to the next message in the queue.

When the Kafka consumer reads a new message, it first checks the headers associated with the message. If the “springDeserializerException” header is found the message is immediately sent to the dead letter queue and the consumer moves on to the next message. This process gives the developer the ability to investigate the faulty message without negatively impacting the entire system.

There are cases, where a system would want to retry specific deserialization errors and not automatically move past them. For example, when trying to connect to a schema registry during deserialization the system might receive a java.net.UnknownHostException. In this case, the system might have temporarily lost connection to the schema registry and needs a couple of seconds to recover the connection. It would then be appropriate to retry deserializing the message while the system is trying to reconnect.
We do it as follows. No retry example is shown here, but the quoted page had examples.
spring.cloud.stream.kafka.bindings.process-in-0.consumer.configuration.value.deserializer=
org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
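ErrorHandlingDeserializer also needs to know which real deserializer to delegate to, and a dead letter topic should be enabled for the flow described above; a sketch (the DLQ topic name is an assumption):

```properties
# Which deserializer ErrorHandlingDeserializer wraps
spring.cloud.stream.kafka.bindings.process-in-0.consumer.configuration.spring.deserializer.value.delegate.class=org.apache.kafka.common.serialization.StringDeserializer
# Send records that failed to deserialize to a dead letter topic
spring.cloud.stream.kafka.bindings.process-in-0.consumer.enableDlq=true
spring.cloud.stream.kafka.bindings.process-in-0.consumer.dlqName=process-in-0-dlq
```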
