20 Aralık 2018 Perşembe

Spring Amqp RabbitMQ SimpleMessageListenerContainer Sınıfı - Client Side Code İçindir

Giriş
Şu satırı dahil ederiz. org.springframework.jms.listener.SimpleMessageListenerContainer ile aynı isme sahiptir.
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.support.converter.ClassMapper;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.amqp.support.converter.SimpleMessageConverter;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;
AbstractMessageListenerContainer sınıfından kalıtan iki sınıftan bir tanesidir. Bu sınıfın amacı kuyruktan mesaj çekerek onu listener'a geçmek. Açıklaması şöyle.
Finally, we need to define a container that has the responsibility to consume messages from the queue and forward them to some specified listener.
Eğer @RabbitListener  anotasyonu kullanıyorsak sanırım bu bean otomatik olarak yaratılıyor. Eğer elle yaratırsak setMessageListener() metodunu çağırmak gerekiyor.
Kullanım
Şöyle yaparız.
@Bean 
public SimpleMessageListenerContainer container(ConnectionFactory connection) {
  SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
  ...
  return container;
}
constructor
Şöyle yaparız.
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
setAdviceChain metodu
Örnek
Şöyle yaparız
container.setAdviceChain(new Advice[] {
  org.springframework.amqp.rabbit.config.RetryInterceptorBuilder
    .stateless()
    .maxAttempts(5)
    .backOffOptions(1000, 2, 15000)
    .build()
});
Açıklaması şöyle
With this setup, retries will happen within the message processing thread (using Thread.sleep()) without rejecting the message on each retry (so without going back to RabbitMQ for each retry). When retries are exhausted, by default a warning will be logged and the message will be consumed. If you instead want to send the failed message to a DLQ you will need either a RepublishMessageRecoverer (which publishes the failed message to a different Exchange/Queue) or a custom MessageRecoverer (which rejects the message without requeuing). In that latter case you should also set up a RabbitMQ DLQ on the queue as explained above.

Note that the above example used a stateless retry interceptor because all retry attempts happen without exiting the interceptor call, so the interceptor is called only once regardless of the number of retries. Any required state (e.g. number of attempts) is kept on the stack. If we didn’t mind for Threads to block, this would actually be a very convenient and elegant solution for the problem. However, we felt that occupying Threads waiting for retry wasn’t optimum resource consumption so we wanted the message to actually return to a Queue while waiting to be reprocessed.
setConcurrentConsumers metodu
Şöyle yaparız
container.setConcurrentConsumers(1);
setConnectionFactory metodu
ConnectionFactory nesnesi spring'in sağladığı sınıf. RabitMq kullanıyorsak şu satırı dahil ederiz.
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
Şöyle yaparız.
ConnectionFactory connectionFactory = ...;
container.setConnectionFactory(connectionFactory);
setDefaultRequeueRejected metodu
Açıklaması şöyle. Eğer true ise onMessage() içinden uncheked exception fırlatılırsa, Spring mesajı requeue eder, eğer false ise mesaj varsa Dead Letter Queue'ya gönderilir, yoksa mesaj çöp olur
When using manual acknowledgements, a message can be redelivered by RabbitMQ if it is rejected by the consumer (meaning a negative acknowledgement is sent) and the flag requeue is set to true.
Spring AMQP provides a higher level of error handling, as it allows the message listener to throw an unchecked Exception to indicate failure, and Spring will take care of rejecting the message. Spring provides the following interface for listening for messages:
public interface MessageListener {

  /**
   * Delivers a single message.
   * @param message the message.
   */
  void onMessage(Message message);
}
Note that when a message is rejected, by default Spring AMQP sets the requeue flag to true, which means that if the error is not temporary, this will result in an infinite loop of delivery — rejection. The default behaviour can be changed by calling setDefaultRequeueRejected(false) on the org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer

This will cause the rejected message to go to the configured Dead Letter Queue (DLQ) or to be discarded if no DLQ is configured. What we want however is to be able to retry the message in a controlled manner, i.e. to be executed for a maximum number of times with a specified frequency pattern.

setMaxConcurrentConsumers metodu
Şöyle yaparız.
container.setMaxConcurrentConsumers(8);
setMessageConverter metodu
Şöyle yaparız.
container.setMessageConverter(messageConverter);

@Bean
public MessageConverter messageConverter() {
  Jackson2JsonMessageConverter jackson2JsonMessageConverter = ...;
  ...
  return jackson2JsonMessageConverter;
}
setMessageListener metodu
RabitMq kullanıyorsak şu satırı dahil ederiz.
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
Örnek
Elimizde şöyle bir kod olsun.
@Bean
MessageListenerAdapter listenerAdapter(Worker worker) {
  MessageListenerAdapter messageListenerAdapter = new MessageListenerAdapter(worker,
    "processMessage");
  messageListenerAdapter.setMessageConverter(messageConverter());
  return messageListenerAdapter;
}

@Bean
public Worker worker() {
  return new Worker();
}

public static class Worker {

  public void processMessage(Foo foo) {
    ...
  }
}
Şöyle yaparız.
container.setMessageListener(listenerAdapter);
Örnek
Şöyle yaparız
@Log4j2
@Component
public class Receiver {

    public void receive(String message) {
        log.info("Received: {}", message);
    }
}

@Bean
public SimpleMessageListenerContainer container(ConnectionFactory connectionFactory,
                                                MessageListenerAdapter listenerAdapter) {
  SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
  container.setConnectionFactory(connectionFactory);
  container.setQueueNames(QUEUE_NAME);
  container.setMessageListener(listenerAdapter);
  return container;
}

@Bean
public MessageListenerAdapter listenerAdapter(Receiver receiver) {
  return new MessageListenerAdapter(receiver, "receive");
}
Açıklaması şöyle
MessageListernerAdapter takes two arguments in the constructor. The first is a consumer that handles a message and the second is the name of the method that handles a message.
setQueueNames metodu
Şöyle yaparız.
container.setQueueNames("my.queue");

Hiç yorum yok:

Yorum Gönder