10 Nisan 2023 Pazartesi

SpringKafka Consumer @RetryableTopic Anotasyonu - Non-Blocking Retry

Giriş
Şu satırı dahil ederiz
import org.springframework.kafka.annotation.RetryableTopic;
Açıklaması şöyle
Non-Blocking retries in Kafka are done via configuring retry topics for the main topic. An Additional Dead Letter Topic can also be configured if required. Events will be forwarded to DLT if all retries are exhausted.

Konfigürasyon için kullanılabilecek parametreler şeklen şöyle. springframework.kafka 2.7'den itibaren geliyor

Açıklaması şöyle
Spring Retryable Topics
Spring uses retryable topics to achieve non-blocking retry. Rather than retry an event from the original topic in a blocking manner, Spring Kafka instead writes the event to a separate retry topic. The event is marked as consumed from the original topic, so the next events continue to be polled and processed. Meanwhile a separate instance of the same consumer is instantiated by Spring as the consumer for the retry topic. This ensures that a single consumer instance is not polling and receiving events from both the original and a retry topic.

If the event needs to be retried multiple times, it can either be retried from the single retry topic, or it can be written to a further retry topic. The advantage of a single retry topic is that there are less topics to be dealing with. The downside is that events being retried from it will be blocking this retry topic. Alternatively any number of further retry topics can be used, ensuring each is not blocked when the event is retried. Each retry topic might have a longer back-off reflecting the need to give the system more time to be in a state that it can process the event successfully.

Once all retries are exhausted a dead letter topic can be configured to write the event to. Optionally a method in the consumer class can be annotated to consume from this topic.
Örnek
Şöyle yaparız. Burada non-blocking retry var. 3 tane retry için topic yaratılıyor. Ayrıca @Dlt ile bir dead letter topic yaratılıyor.
import org.springframework.kafka.annotation.DltHandler;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.RetryableTopic;
import org.springframework.kafka.retrytopic.FixedDelayStrategy;
import org.springframework.kafka.retrytopic.TopicSuffixingStrategy;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.retry.annotation.Backoff;
import org.springframework.stereotype.Component;

@Slf4j
@RequiredArgsConstructor
@Component
public class UpdateItemConsumer {

  private final ItemService itemService;

  @RetryableTopic(
    attempts = "#{'${demo.retry.maxRetryAttempts}'}",
    autoCreateTopics = "#{'${demo.retry.autoCreateRetryTopics}'}",
    backoff = @Backoff(delayExpression = "#{'${demo.retry.retryIntervalMilliseconds}'}", multiplierExpression = "#{'${demo.retry.retryBackoffMultiplier}'}"),
    fixedDelayTopicStrategy = FixedDelayStrategy.MULTIPLE_TOPICS,
    include = {RetryableMessagingException.class},
    timeout = "#{'${demo.retry.maxRetryDurationMilliseconds}'}",
    topicSuffixingStrategy = TopicSuffixingStrategy.SUFFIX_WITH_INDEX_VALUE)
  @KafkaListener(topics = "#{'${demo.topics.itemUpdateTopic}'}", containerFactory = "kafkaListenerContainerFactory")
  public void listen(@Payload final String payload) {
    log.info("Update Item Consumer: Received message with payload: " + payload);
    try {
      UpdateItem event = JsonMapper.readFromJson(payload, UpdateItem.class);
      itemService.updateItem(event);
    } catch (RetryableMessagingException e) {
      // Ensure the message is retried.
      throw e;
    } catch (Exception e) {
      log.error("Update item - error processing message: " + e.getMessage());
    }
  }

  @DltHandler
  public void dlt(String data, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
    log.error("Event from topic "+topic+" is dead lettered - event:" + data);
  }
}
backoff  Alanı
Örnek
Şöyle yaparız
@RetryableTopic(kafkaTemplate = "kafkaTemplate",
attempts = "4", backoff = @Backoff(delay = 3000, multiplier = 1.5, maxDelay = 15000) ) @KafkaListener(topics = ORDER_TOPIC, groupId = ORDER_STATUS_GROUP_ID_PREFIX + "#{ T(java.util.UUID).randomUUID().toString() }") @Transactional public void orderEventListener(@Header(KafkaHeaders.RECEIVED_TOPIC) String receivedTopic, OrderEvent orderEvent, Acknowledgment ack) throws SocketException { log.info("Topic({}) handler receive data = {}", receivedTopic, orderEvent); try { orderEventRecordHandler.onEvent(orderEvent); if (receivedTopic.contains("retry")) { orderRecordHandler.onRequeueEvent(orderEvent); } else { orderRecordHandler.onEvent(orderEvent); } ack.acknowledge(); } catch (Exception e) { log.warn("Fail to handle event {}.", orderEvent); throw e; } }
exclude Alanı
Hangi exception olursa retry olmayacağını belirtir. Açıklaması şöyle.
In some cases, the message is definitely unprocessable (like parsing error, or invalid properties…). Then we should not waste our resources trying to consume it.

we can use the include and exclude properties to control which exception should/should not be retried 

Örnek
Şöyle yaparız
@RetryableTopic(kafkaTemplate = "kafkaTemplate",
  exclude = {DeserializationException.class,
             MessageConversionException.class,
             ConversionException.class,
             MethodArgumentResolutionException.class,
             NoSuchMethodException.class,
             ClassCastException.class},
  attempts = "4",
  backoff = @Backoff(delay = 3000, multiplier = 1.5, maxDelay = 15000)
)
include Alanı
Hangi exception olursa retry olacağını belirtir.
Örnek
Şöyle yaparız
@Slf4j
@Component
@RequiredArgsConstructor
public class CustomEventConsumer {

  private final CustomEventHandler handler;

  @RetryableTopic(attempts = "${retry.attempts}",
    backoff = @Backoff(
      delayExpression = "${retry.delay}",
      multiplierExpression = "${retry.delay.multiplier}"
    ),
    topicSuffixingStrategy = TopicSuffixingStrategy.SUFFIX_WITH_INDEX_VALUE,
    dltStrategy = FAIL_ON_ERROR,
    autoStartDltHandler = "true",
    autoCreateTopics = "false",
    include = {CustomRetryableException.class})
  @KafkaListener(topics = "${topic}", id = "${default-consumer-group:default}")
  public void consume(CustomEvent event,
                      @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
    try {
      log.info("Received event on topic {}", topic);
      handler.handleEvent(event);
    } catch (Exception e) {
      log.error("Error occurred while processing event", e);
      throw e;
    }
  }

  @DltHandler
  public void listenOnDlt(@Payload CustomEvent event) {
    log.error("Received event on dlt.");
    handler.handleEventFromDlt(event);
  }
}




Hiç yorum yok:

Yorum Gönder