21 Mayıs 2020 Perşembe

Spring Amqp RabbitMQ @RabbitListener Anotasyonu

Giriş
Şu satırı dahil ederiz
import org.springframework.amqp.rabbit.annotation.RabbitListener;
Bu anotasyon metod üzerine konulur. Metodun bulunduğu sınıf bean olmalıdır. Yani @Service gibi bir anotasyona sahip olmalıdır. SpringBoot kullanıyorsak @EnableRabbit anotasyonuna gerek yok

concurrency Alanı
Örnek
Şöyle yaparız
@Service public class OrderProcessor { @RabbitListener(queues = "order-processing-queue", concurrency = "5") public void processOrder(Order order) { ... }
Retry

Örnek
Şöyle yaparız
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.retry.annotation.Backoff;
import org.springframework.retry.annotation.Retryable;
import org.springframework.stereotype.Component;

@Component
public class MessageListener {
  @RabbitListener(queues = "queue-name")
  @Retryable(value={ Exception.class }, maxAttempts = 3, backoff = @Backoff(delay = 1000))
  public void handleMessage(String message) throws Exception {
    try {
      // Process the incoming message
      if (someCondition) {
        throw new Exception("Simulated exception");
      }
      // Message processing succeeded
    } catch (Exception e) {
      // Handle the exception or log it
      throw e; // This rethrows the exception for retry
    }
  }
}

AcknowledgeMode AUTOMATIC İse
Açıklaması şöyle 
- In this mode, RabbitMQ automatically acknowledges (ack) a message as soon as it is delivered to a consumer.
- The message broker assumes that the message is successfully processed and removed from the queue as soon as it is sent to the consumer.
Örnek
Şöyle yaparız
@RabbitListener(queues = "my-queue", autoAck = "true") public void handleMessage(String message) { // Message processing logic }
AcknowledgeMode MANUAL İse
Açıklaması şöyle 
- Consumers explicitly acknowledge messages after processing them, confirming that the message was successfully handled.
- Manual acknowledgment provides more control and reliability, as the message is not removed from the queue until explicitly acknowledged.
Örnek
Şöyle yaparız
@RabbitListener(queues = "my-queue") public void handleMessage(String message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) { try { ... // Acknowledge the message // false indicates not to acknowledge multiple messages channel.basicAck(deliveryTag, false); } catch (Exception e) { // Handle exceptions and optionally reject the message // Requeue the message if true channel.basicNack(deliveryTag, false, true); } }
Açıklaması şöyle 
channel.basicAck is used to acknowledge the message after successful processing.
channel.basicNack can be used to reject a message (optionally with requeuing) in case of processing failure.
1. Asenkron Çalışma
Açıklaması şöyle. ListenableFuture<?> veya Mono dönmek gerekir.
The listener container factory must be configured with AcknowledgeMode.MANUAL so that the consumer thread will not ack the message; instead, the asynchronous completion will ack or nack the message when the async operation completes.
Örnek
Şöyle yaparız.
@RabbitListener
public Mono<Void> myListener(MyMessage myMessage) {
  Mono<Void> mono = myService.doSomething(myMessage);
  return mono;
}
2. Senkron Çalışma
Örnek - Mesaj
Şöyle yaparız. Burada gelen mesaj otomatik olarak Foo tipine çevriliyor
public class MyListener {

  @Autowired
  private SomeService service;

  @RabbitListener(id = "myListener", queues = "foo")
  public void listen(Foo foo) {
    this.service.process(foo);
  }

}
Örnek
Şöyle yaparız. Burada @Payload kullanılıyor
@Component
public class QueueConsumer {

  @RabbitListener(queues = {"${queue.name}"})
  public void receive(@Payload String fileBody) throws BusinessException {
    System.out.println("Message " + fileBody + LocalDateTime.now());

    if(Integer.parseInt(fileBody) == 1){
      throw new BusinessException("testing for exception");
    }
  }
}

Örnek - JSON
Bir Topic Exchange'e JSON yazarsak iki tane kuyruğa dağıtılır. Her kuyruğu dinleyen bir listener tanımlarız. Bir listener mesajı nesneye çevrilmiş olarak alır. Diğer ise saf JSON olarak alır. Şöyle yaparız
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;

@Service
public class CustomMessageListener {

  private static final Logger log = LoggerFactory.getLogger(CustomMessageListener.class);

  //appGenericQueue
  @RabbitListener(queues = MessagingApplication.QUEUE_GENERIC_NAME)
  public void receiveMessage(final Message message) {
    log.info("Received message as generic: {}", message.toString());
  }

  //appSpecificQueue
  @RabbitListener(queues = MessagingApplication.QUEUE_SPECIFIC_NAME)
  public void receiveMessage(final CustomMessage customMessage) {
    log.info("Received message as specific class: {}", customMessage.toString());
  }
}
Örnek - Mesaj + Header
Elimizde şöyle bir kod olsun. Mesaj olarak gönderilen POJO yanında, header alanlarına da erişmek mümkün.
Message message = MessageBuilder.withBody(messageText.getBytes())
            .setMessageId("123")
            .setContentType("application/json")
            .setHeader("foo", "bar")
            .build();
Açıklaması şöyle.
Alternatively, consider using the newer, annotation-based, POJO listener, where you can get access to headers as well as the converted payload...
Şöyle yaparız
@RabbitListener(queues = "foo")
public void listen(MyPojo pojo, @Header("foo") String fooHeader) { ... }

Hiç yorum yok:

Yorum Gönder