Örnek
Şöyle yaparız. Burada 3 tane consumer yaratılması isteniyor.
spring.cloud.stream: bindings: reservations-input: content-type: application/json consumer.concurrency: 3 destination: reservations-topic group: consumer-service-group
Örnek
Elimizde şöyle bir kod olsun
@SpringBootApplicationpublic class SimpleConsumerApplication {@Beanpublic java.util.function.Consumer<KStream<String, String>> process() {return input ->input.foreach((key, value) -> {System.out.println("Key: " + key + " Value: " + value);});}}
Açıklaması şöyle
For the above processor, you can provide the topic to consumes, as followsspring.cloud.stream.bindings.process-in-0.destination: my-input-topicIn this case, we are saying that, for the function bean (process) and its first input (in-0), it shall be bound to a Kafka topic named my-input-topic. If you don’t provide an explicit destination like this, the binder assumes that you are using a topic that is the same as the binding name (process-in-0, in this case).
Elimizde şöyle bir kod olsun
Event okuyan tarafı şöyle yaparızserver:port: 8080spring.cloud.stream:function:definition: orderSupplier;paymentEventConsumer;inventoryEventConsumerbindings:orderSupplier-out-0:destination: order-eventpaymentEventConsumer-in-0:destination: payment-eventinventoryEventConsumer-in-0:destination: inventory-event
Event gönderen tarafı şöyle yaparız@Configurationpublic class EventHandlersConfig {@Beanpublic Consumer<PaymentEvent> paymentEventConsumer(){return pe -> {...}}@Beanpublic Consumer<InventoryEvent> inventoryEventConsumer(){return ie -> {...};}}
import reactor.core.publisher.Flux;import reactor.core.publisher.Sinks;import java.util.function.Supplier;@Configurationpublic class OrderConfig {@Beanpublic Sinks.Many<OrderEvent> orderSink(){return Sinks.many().unicast().onBackpressureBuffer();}@Beanpublic Supplier<Flux<OrderEvent>> orderSupplier(Sinks.Many<OrderEvent> sink){return sink::asFlux;}}
Event gönderen tarafı kullanmak için şöyle yaparız
Kafka İçin Deserialization@Servicepublic class OrderStatusPublisher {@Autowiredprivate Sinks.Many<OrderEvent> orderSink;public void raiseOrderEvent(...){...OrderEvent orderEvent = ...;this.orderSink.tryEmitNext(orderEvent);}}
Örnek
Şöyle yaparız
spring.cloud.stream.kafka.bindings.process-in-0.consumer.configuration.value.deserializer= org.apache.kafka.common.serialization.StringSerializer
Örnek
Açıklaması şöyle
When a project uses a basic deserializer, like StringSerializer, if the first step fails, Kafka will throw an exception and get stuck in an endless loop trying to deserialize the same message. New messages will begin to build up behind this poison message and your logs will start filling with Kafka warnings.When the ErrorHandlingDeserializer is implemented, a message that fails to deserialize, will not cause an exception to be thrown. Kafka will instead, create a consumer record where the header is set to “springDeserializerException” and the value of the message is set to null. Kafka will pass this custom message onto the Kafka consumer and move on to the next message in the queue.When the Kafka consumer reads a new message, it first checks the headers associated with the message. If the “springDeserializerException” header is found the message is immediately sent to the dead letter queue and the consumer moves on to the next message. This process gives the developer the ability to investigate the faulty message without negatively impacting the entire systemThere are cases, where a system would want to retry specific deserialization errors and not automatically move past them. For example, when trying to connect to a schema registry during deserialization the system might receive a java.net.UnknownHostException. In this case, the system might have temporarily lost connection to the schema registry and needs a couple of seconds to recover the connection. It would then be appropriate to retry deserializing the message while the system is trying to reconnect.
Şöyle yaparız. Burada retry için örnek verilmiyor ama alıntı yapılan sayfada örnekler vardı
spring.cloud.stream.kafka.bindings.process-in-0.consumer.configuration.value.deserializer= org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
Hiç yorum yok:
Yorum Gönder