Tuesday, August 9, 2022

SpringKafka Consumer Reactor Kafka

Introduction
First, we need to create a KafkaReceiver bean. We do it like this
protected Map<String, Object> kafkaConsumerProperties() {
  Map<String, Object> kafkaPropertiesMap = new HashMap<>();
  kafkaPropertiesMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
  kafkaPropertiesMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
  kafkaPropertiesMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
  kafkaPropertiesMap.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, StringDeserializer.class);
  kafkaPropertiesMap.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, KafkaAvroDeserializer.class);
  ...
  return kafkaPropertiesMap;
}

protected ReceiverOptions<K, V> kafkaReceiverOptions() {
  ReceiverOptions<K, V> options = ReceiverOptions.create(kafkaConsumerProperties());
  return options.pollTimeout(Duration.ofMillis(pollTimeout))
    .subscription(List.of(consumerTopicName));
}

@Bean
KafkaReceiver<K, V> kafkaReceiver() {
  return KafkaReceiver.create(kafkaReceiverOptions());
}
Here, ErrorHandlingDeserializer is used. Next, we need to call one of the KafkaReceiver object's receive(), receiveAtmostOnce(), receiveAutoAck(), or receiveExactlyOnce() methods. We do it like this
@EventListener(ApplicationStartedEvent.class)
public Disposable startKafkaConsumer() {
  return kafkaReceiver.receive()
    .doOnError(error -> log.error("Error receiving event, will retry", error))
    .retryWhen(Retry.fixedDelay(Long.MAX_VALUE, Duration.ofMinutes(1)))
    .doOnNext(record -> log.debug("Received event: key {}", record.key()))
    .concatMap(this::handleEvent)
    .subscribe(record -> record.receiverOffset().acknowledge());
}
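For comparison, here is a minimal sketch (an illustration, not from the original post) of the receiveAutoAck() variant. It returns a Flux<Flux<ConsumerRecord<K, V>>>, one inner Flux per poll, and each batch is acknowledged automatically after its records are consumed, so no manual acknowledge() call is needed.
@EventListener(ApplicationStartedEvent.class)
public Disposable startAutoAckKafkaConsumer() {
  // Sketch assuming the same kafkaReceiver bean as above
  return kafkaReceiver.receiveAutoAck()
    .concatMap(batch -> batch) // flatten each polled batch, preserving order
    .doOnNext(record -> log.debug("Received event: key {}", record.key()))
    .subscribe();
}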
Three operators can be used for event handling. The explanation is as follows
Choice of Event Handling Operator and Acknowledgement
Once an event is received, it must be processed by the application and subsequently acknowledged. This sequencing provides at-least-once delivery semantics - other types of semantics (at-most-once, exactly-once) may cause the pipeline to look different. The pattern we propose delegates the responsibility of event handling to a separate method called handleEvent, which always returns the ReceiverRecord used by the subscriber to acknowledge the offset (this method is described in detail in the next section). However, the operator that we choose to call this method has a critical impact on the behavior of the consumer. Let's analyze three different options:

flatMap - this operator applies the provided mapper function to create inner publishers to which it then subscribes eagerly. Provided that these inner publishers are non-blocking, they will be subscribed to in parallel, and the elements produced downstream are not guaranteed to preserve the order in which the original elements (the Kafka events) were received from upstream. In the case of our Kafka consumer, this means that the Kafka events will be processed in parallel, and the offsets will be committed as each event is handled and passed downstream. But whenever one offset is committed, it implicitly commits all the lower offsets. Imagine that the processing of one event finishes and its offset is committed, but later on, the processing of another event with a lower offset fails: the second event will not be re-processed since we already implicitly committed its offset. This can be problematic, especially in cases where at-least-once semantics is required, and it's an important consideration to keep in mind when deciding to use flatMap (Reactor Kafka has recently implemented an out-of-order commits feature to mitigate precisely this issue)

flatMapSequential - much like flatMap, this operator subscribes to the inner publishers eagerly; however, the difference here is that flatMapSequential will publish elements downstream in the same order in which they were originally received from upstream (this is done by delaying publishing if needed, to preserve the order). The fact that events will still be processed in parallel can come with performance benefits in scenarios where this does not impact correctness, e.g., where events refer to distinct entities and can be processed in any order. In addition, the preservation of the sequence will ensure that the offsets are committed in order and thus avoid the problem described above. Of course, deferring the offset commit also increases the risk of duplicate event processing in case the consumer crashes, which is something the application must be prepared to handle (e.g., by ensuring the event processing is idempotent)

concatMap - unlike the two previous operators, concatMap creates and subscribes to the inner publishers sequentially. This is extremely important in scenarios where the events must be processed in the exact order in which they were read from the partition.
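To make the difference concrete, here is a sketch (my illustration, not from the quoted article) of the same pipeline using flatMapSequential: events are handled in parallel, but results are emitted downstream in the original receive order, so offsets are still committed in order. If flatMap is preferred instead, newer reactor-kafka versions expose ReceiverOptions#maxDeferredCommits to enable the out-of-order commits feature mentioned above.
@EventListener(ApplicationStartedEvent.class)
public Disposable startParallelKafkaConsumer() {
  return kafkaReceiver.receive()
    // handle up to 8 events concurrently; emission (and therefore offset
    // commit) order still follows the order the records were received in
    .flatMapSequential(this::handleEvent, 8)
    .subscribe(record -> record.receiverOffset().acknowledge());
}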
The event handling code is then as follows. If there is no error, the processEvent() method is called
/*
  This method will handle the received event and then re-publish it regardless of the 
  result.
  The method must never return an error signal as that will terminate the main consumer 
  pipeline.
 */
private Mono<ReceiverRecord<String, Event>> handleEvent(
  ReceiverRecord<String, Event> record) {

  return Mono.just(record)
    .map(KafkaDeserializerUtils::extractDeserializerError)
    .<Event> handle((result, sink) -> {
      if (result.getT2().isPresent()) {
        // Deserialization error processing: log the error and skip the value
        // so that the record's offset is still acknowledged downstream
        log.error("Deserialization error encountered", result.getT2().get());
      } else if (Objects.nonNull(result.getT1().value())) {
        // Publish the event value downstream
        sink.next(result.getT1().value());
      }
    })
    .flatMap(businessLayerService::processEvent)
    .doOnError(ex -> log.warn("Error processing event: key {}", record.key(), ex))
    .onErrorResume(ex -> Mono.empty())
    .doOnNext(result -> log.debug("Successfully processed event: key {}", record.key()))
    .then(Mono.just(record));
}
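The post does not show KafkaDeserializerUtils.extractDeserializerError. Below is a plausible sketch; the whole class is hypothetical. It assumes Spring Kafka's ErrorHandlingDeserializer, which serializes the DeserializationException of a failed value into the record header named by SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER.
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.util.Optional;
import org.apache.kafka.common.header.Header;
import org.springframework.kafka.support.serializer.DeserializationException;
import org.springframework.kafka.support.serializer.SerializationUtils;
import reactor.kafka.receiver.ReceiverRecord;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuples;

// Hypothetical helper; the original post does not show its source
public final class KafkaDeserializerUtils {

  // Pairs the record with the deserialization error, if any, found in its headers
  public static <K, V> Tuple2<ReceiverRecord<K, V>, Optional<DeserializationException>>
      extractDeserializerError(ReceiverRecord<K, V> record) {
    Header header =
      record.headers().lastHeader(SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER);
    if (header == null) {
      return Tuples.of(record, Optional.empty());
    }
    try (ObjectInputStream ois =
           new ObjectInputStream(new ByteArrayInputStream(header.value()))) {
      // the header value is a Java-serialized DeserializationException
      return Tuples.of(record, Optional.of((DeserializationException) ois.readObject()));
    } catch (IOException | ClassNotFoundException ex) {
      return Tuples.of(record, Optional.of(new DeserializationException(
        "Failed to read deserialization error header", header.value(), false, ex)));
    }
  }

  private KafkaDeserializerUtils() { }
}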

