Giriş
Şu satırı dahil ederiz
import org.springframework.amqp.rabbit.annotation.RabbitListener;
Bu anotasyon metod üzerine konulur. Metodun bulunduğu sınıf bean olmalıdır. Yani @Service gibi bir anotasyona sahip olmalıdır. SpringBoot kullanıyorsak @EnableRabbit anotasyonuna gerek yok
concurrency Alanı
Örnek
Şöyle yaparız
@Service public class OrderProcessor { @RabbitListener(queues = "order-processing-queue", concurrency = "5") public void processOrder(Order order) { ... }
Retry
Örnek
Şöyle yaparız
import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.retry.annotation.Backoff;import org.springframework.retry.annotation.Retryable;import org.springframework.stereotype.Component;@Componentpublic class MessageListener {@RabbitListener(queues = "queue-name")@Retryable(value={ Exception.class }, maxAttempts = 3, backoff = @Backoff(delay = 1000))public void handleMessage(String message) throws Exception {try {// Process the incoming messageif (someCondition) {throw new Exception("Simulated exception");}// Message processing succeeded} catch (Exception e) {// Handle the exception or log itthrow e; // This rethrows the exception for retry}}}
AcknowledgeMode AUTOMATIC İse
Açıklaması şöyle
- In this mode, RabbitMQ automatically acknowledges (ack) a message as soon as it is delivered to a consumer.- The message broker assumes that the message is successfully processed and removed from the queue as soon as it is sent to the consumer.
Örnek
Şöyle yaparız
@RabbitListener(queues = "my-queue", autoAck = "true") public void handleMessage(String message) { // Message processing logic }
AcknowledgeMode MANUAL İse
Açıklaması şöyle
- Consumers explicitly acknowledge messages after processing them, confirming that the message was successfully handled.- Manual acknowledgment provides more control and reliability, as the message is not removed from the queue until explicitly acknowledged.
Örnek
Şöyle yaparız
@RabbitListener(queues = "my-queue") public void handleMessage(String message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) { try { ... // Acknowledge the message // false indicates not to acknowledge multiple messages channel.basicAck(deliveryTag, false); } catch (Exception e) { // Handle exceptions and optionally reject the message // Requeue the message if true channel.basicNack(deliveryTag, false, true); } }
Açıklaması şöyle
channel.basicAck is used to acknowledge the message after successful processing.channel.basicNack can be used to reject a message (optionally with requeuing) in case of processing failure.
1. Asenkron Çalışma
Açıklaması şöyle. ListenableFuture<?> veya Mono dönmek gerekir.
Şöyle yaparız.
Örnek - Mesaj
Şöyle yaparız. Burada gelen mesaj otomatik olarak Foo tipine çevriliyor
Elimizde şöyle bir kod olsun. Mesaj olarak gönderilen POJO yanında, header alanlarına da erişmek mümkün.
Açıklaması şöyle. ListenableFuture<?> veya Mono dönmek gerekir.
ÖrnekThe listener container factory must be configured with AcknowledgeMode.MANUAL so that the consumer thread will not ack the message; instead, the asynchronous completion will ack or nack the message when the async operation completes.
Şöyle yaparız.
@RabbitListener
public Mono<Void> myListener(MyMessage myMessage) {
Mono<Void> mono = myService.doSomething(myMessage);
return mono;
}
2. Senkron ÇalışmaÖrnek - Mesaj
Şöyle yaparız. Burada gelen mesaj otomatik olarak Foo tipine çevriliyor
public class MyListener {
@Autowired
private SomeService service;
@RabbitListener(id = "myListener", queues = "foo")
public void listen(Foo foo) {
this.service.process(foo);
}
}
Örnek
Şöyle yaparız. Burada @Payload kullanılıyor
@Component
public class QueueConsumer {
@RabbitListener(queues = {"${queue.name}"})
public void receive(@Payload String fileBody) throws BusinessException {
System.out.println("Message " + fileBody + LocalDateTime.now());
if(Integer.parseInt(fileBody) == 1){
throw new BusinessException("testing for exception");
}
}
}
Örnek - JSON
Bir Topic Exchange'e JSON yazarsak iki tane kuyruğa dağıtılır. Her kuyruğu dinleyen bir listener tanımlarız. Bir listener mesajı nesneye çevrilmiş olarak alır. Diğer ise saf JSON olarak alır. Şöyle yaparız
import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.amqp.core.Message;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Service;@Servicepublic class CustomMessageListener {private static final Logger log = LoggerFactory.getLogger(CustomMessageListener.class);//appGenericQueue@RabbitListener(queues = MessagingApplication.QUEUE_GENERIC_NAME)public void receiveMessage(final Message message) {log.info("Received message as generic: {}", message.toString());}//appSpecificQueue@RabbitListener(queues = MessagingApplication.QUEUE_SPECIFIC_NAME)public void receiveMessage(final CustomMessage customMessage) {log.info("Received message as specific class: {}", customMessage.toString());}}
Örnek - Mesaj + Header
Message message = MessageBuilder.withBody(messageText.getBytes())
.setMessageId("123")
.setContentType("application/json")
.setHeader("foo", "bar")
.build();
Açıklaması şöyle.Alternatively, consider using the newer, annotation-based, POJO listener, where you can get access to headers as well as the converted payload...Şöyle yaparız
@RabbitListener(queues = "foo")
public void listen(MyPojo pojo, @Header("foo") String fooHeader) { ... }
Hiç yorum yok:
Yorum Gönder