22 Kasım 2020 Pazar

SpringBoot Amqp RabbitMQ Kullanımı

Giriş
RabbitMQ kullanırken hem Producer hem de Consumer tarafından durable olmayan kuyruk'ları yaratan kodlar bulunur. Bunun sebebi önce Consumer çalışırsa, kuyruk yok hatası almamaktır. Eğer kuyruk zaten mevcutsa, Consumer tarafından tekrar yaratılmaya çalışmasının bir zararı/etkisi yok. Sadece boş işlem gibi düşünülebilir.

Maven
Şu satırı dahil ederiz. Eğer reactive çalışmak istersek "reactor-rabbitmq" projesi de var.
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
Önemli Sınıflar
ConnectionFactory 
RabbitAdmin 
Jackson2JsonMessageConverter
RabbitListenerConfigurer 
Queue 
Binding 

Producer İçin Önemli Sınıflar
Jackson2JsonMessageConverter
RabbitTemplate 

Consumer İçin Önemli Sınıflar
MappingJackson2MessageConverter
RabbitListenerEndpointRegistry 
DefaultMessageHandlerMethodFactory 


Ayarlar
Bağlantı ayarları için SpringBoot spring.rabbitmq Ayarları yazısına bakınız

Mesaj Göndermek
RabbitTemplate sınıfının convertAndSend() metodu kullanılır. Kullanılacak serialization yöntemini RabbitTemplate.setMessageConverter() çağrısı ile belirlemek gerekir. Eğer bir MessageConverter atanmadıysa, RabbitTemplate Java Binary Serialization yöntemini kullanır.

Mesaj Almak
@RabbitListener Anotasyonu ile mesajı alacağımız kuyruk belirtilir.

AmqpAdmin Arayüzü
Bu arayüzün tam yolu org.springframework.amqp.core.AmqpAdmin şeklinde. Kuyruk yaratmak için kullanılan arayüz. Açıklaması şöyle
The AMQP specification describes how the protocol can be used to configure queues, exchanges, and bindings on the broker. These operations (which are portable from the 0.8 specification and higher) are present in the AmqpAdmin interface in the org.springframework.amqp.core package. The RabbitMQ implementation of that class is RabbitAdmin located in the org.springframework.amqp.rabbit.core package.
Kuyruk - Queue
org.springframework.amqp.core.Queue sınıfından bir kuyruk yaratılmalıdır. Queue sınıfı direkt yaratılabileceği gibi, QueueBuilder sınıfı da kullanılabilir.

DirectExchange
DirectExchange yazısına taşıdım

Örnek - FanoutExchange
FanoutExchange yazısına taşıdım

Örnek - TopicExchange
Şöyle yaparız
@Configuration
public class TopicConfigure {

  @Bean
  public Queue topicMessageA() {
    return new Queue("topic.messageA");
  }

  @Bean
  public Queue topicMessageB() {
    return new Queue("topic.messageB");
  }

  @Bean
  public TopicExchange exchangeTopic() {
    return new TopicExchange("topicExchange");
  }

  //The matching rule set here is topic.messageA
  @Bean
  public Binding bindingExchangeMessageA(TopicExchange exchange) {
    return BindingBuilder.bind(topicMessageA()).to(exchange).with("topic.messageA");
  }
  //The matching rule here is topic.[any word]
  @Bean
  public Binding bindingExchangeMessageB(TopicExchange exchange) {
    return BindingBuilder.bind(topicMessageB()).to(exchange).with("topic.#");
  }
}
Mesaj gönder göndermek için şöyle yaparız
@Component
public class TopicSender {

  @Autowired
  public RabbitTemplate rabbitTemplate;

  public void sendTopicA() {
    String message = "...";
    rabbitTemplate.convertAndSend("topicExchange", "topic.messageA", message);
  }

  public void sendTopicB() {
    String message = "...";
    rabbitTemplate.convertAndSend("topicExchange", "topic.messageB", message);
  }
}
Örnek - TopicExchange
Eğer istenirse farklı bir exchange de kullanılabilir. Şöyle yaparız
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;

static final String topicExchangeName = "spring-boot-exchange";

static final String queueName = "spring-boot";

@Bean
Queue queue() {
  return new Queue(queueName, false);
}

@Bean
TopicExchange exchange() {
  return new TopicExchange(topicExchangeName);
}

@Bean
Binding binding(Queue queue, TopicExchange exchange) {
  return BindingBuilder.bind(queue).to(exchange).with("foo.bar.#");
}
Örnek - headersExchange
Şöyle yaparız
@Configuration
public class HeaderConfigure {

  @Bean
  public Queue headerQueue() {
    return new Queue("header");
  }

  @Bean
  public HeadersExchange headersExchange() {
    return new HeadersExchange("headersExchange");
  }

  // Here is bound to the key value pair key = color value = red
  // Of course you can bind multiple sets of key-value pairs
  @Bean
  HeadersExchangeMapBindingCreator headersBinding(HeadersExchange headersExchange) {
    Map<String, Object> map = new HashMap<>();
    map.put("color", "red");
    return BindingBuilder.bind(headerQueue()).to(headersExchange).whereAll(map);
    //return BindingBuilder.bind(headerQueue()).to(headersExchange).where("color")
// .matches("red");
  }
}
Şöyle yaparız
@Component
public class HeadersSender {

  @Autowired
  private RabbitTemplate rabbitTemplate;

  // Set the key value pair of the message header through MessageProperties
  // After conversion through SimpleMessageConverter ()
  // If you pass the java object, you need to use Jackson2JsonMessageConverter ()
  public void sendHeaders() {
    String str = "...";
    MessageProperties messageProperties = new MessageProperties();
    messageProperties.setHeader("color", "red");
    MessageConverter messageConverter = new SimpleMessageConverter();
    Message message = messageConverter.toMessage(str, messageProperties);
    rabbitTemplate.convertAndSend("headersExchange", "", message);
  }

  public void sendWrongHeaders() {
    String str = "...";
    MessageProperties messageProperties = new MessageProperties();
    messageProperties.setHeader("color", "gold");
    MessageConverter messageConverter = new SimpleMessageConverter();
    Message message = messageConverter.toMessage(str, messageProperties);
    rabbitTemplate.convertAndSend("headersExchange", "", message);
  }
}
Eğer Kuyruk Broker'da Zaten Varsa
Kuyruğu tekrar yaratmaya çalışmanın bir etkisi yok. Açıklaması şöyle
So, you should declare Queue, Exchange and Binding beans in your application context and AmqpAdmin will take care about their definition on the target Broker.

There must be a note that according AMQP protocol, if entity already exists on the Broker, the declaration is just silent and idempotent.

So, in your case you don't need to worry about queues existence and just provide their declarations as beans in the application context.

Hiç yorum yok:

Yorum Gönder