21 Ocak 2021 Perşembe

SpringBoot Amqp RabbitMQ Kullanımı - DirectExchange

Giriş
Direct Exchange'e bir kuyruk ya da birden fazla kuyruk takılabilir. 
Mesaj gönderirken kuyruk ismi routing key olarak belirtilir.

Örnek - DirectExchange  ve Tek Kuyruk
Şöyle yaparız. Burada iki tane farklı DirectExchange yaratılıyor.
@Configuration
public class RabbitConfigure {

  // build queue
  @Bean
  public Queue helloQuery() {
    return new Queue("hello");// Pass ordinary string message
  }

  @Bean
  public Queue userQuery() {
    return new Queue("user");// Pass java object message
  }
}
Mesaj göndermek için şöyle yaparız. convertAndSend() metoduna exchange ismi veriliyor.
@Component
public class HelloSender {

  @Autowired
  public RabbitTemplate rabbitTemplate;

  //sent to hello queue
  public void sendHello() {
    String message = "...";
    rabbitTemplate.convertAndSend("hello", message);
  }

  //Send to user queue
  public void sendUser() {
    User user = new User();
    ...
    rabbitTemplate.convertAndSend("user", user);
  }
}
Örnek - DirectExchange  ve Tek Kuyruk
Küçük gemilerin ana gemiye mesaj göndermesi durumunda kullanılabilir. Küçük gemi için elimizde şöyle bir kod olsun
## application.yml for the ship's application
## have to change property values for each ship
ship:
  name: rocinante
  update-freq: 1000

broker:
  exchange:
    direct:
      ship:
        name: rocinante-direct-exchange
        routing-key: __rocinante 
      station:
        name: tyco-direct-exchange 
        routing-key: __scheduled-update
    fanout:
      name: tyco-fanout-exchange
    queue:
      name: rocinante
Küçük gemi kodu şöyledir
@Component
@EnableScheduling
public class UpdateScheduler {

  @Value("${ship.name}")
  private String shipName;
  @Value("${broker.exchange.direct.station.name}")
  private String directExchange; //tyco-direct-exchange

  @Value("${broker.exchange.direct.station.routing-key}")
  private String directExchangeRoutingKey; //__scheduled-update

  private Long shipUpdateFrequency;

  @Value("${ship.update-freq}")
  private void setShipUpdateFrequency(String frequency) {
    this.shipUpdateFrequency = Long.parseLong(frequency);
  }

  @Autowired
  private final RabbitTemplate rabbitTemplate;


  @SneakyThrows
  @Scheduled(fixedDelay = 1)
  public void sendUpdates() {
    String updateMessage = shipName + ": Update at " + new Date() + " " +
ParameterFactory.getParameter();
    rabbitTemplate.convertAndSend(directExchange, directExchangeRoutingKey, updateMessage);
    Thread.sleep(shipUpdateFrequency);
  }
}
Ana gemi yaml dosyası şöyledir
## application.yml for the station's application
station:
  name: Tyco

broker:
  exchange:
    direct:
      name: tyco-direct-exchange
      routing-key: __scheduled-update
      queue:
        auto-queue: auto-queue
    fanout:
      name: tyco-fanout-exchange
Ana gemi broker configuration bean'leri şöyledir
@Configuration
public class BrokerConfiguration {
  static String directExchangeQueue;
  static String directExchange;
  static String directRoutingKey;
  @Value("${broker.exchange.direct.routing-key}")
  private void setDirectRoutingKey(String routingKey) {
    BrokerConfiguration.directRoutingKey = routingKey;
  }
  @Value("${broker.exchange.direct.name}")
  private void setDirectExchange(String exchangeName) {
    BrokerConfiguration.directExchange = exchangeName;
  }

  @Value("${broker.exchange.direct.queue.auto-queue}")
  private void setQueueName(String queueName) {
    BrokerConfiguration.directExchangeQueue = queueName;
  }
  @Bean
  DirectExchange directExchange() {
    return new DirectExchange(BrokerConfiguration.directExchange);
  }

  @Bean
  Queue directExchangeQueue() {
    return new Queue(BrokerConfiguration.directExchangeQueue);
  }

  @Bean
  Binding updateQueueBinding(Queue directExchangeQueue, DirectExchange directExchange) {
    return BindingBuilder
      .bind(directExchangeQueue)
      .to(directExchange)
      .with(BrokerConfiguration.directRoutingKey);
  }
}
Mesajları dinleyen kod şöyledir
// message listener configuration
@Configuration
public class MessageListenerConfiguration {
  @Autowired
  private final BrokerConfiguration brokerConfiguration;

  @Bean
  MessageListenerAdapter listenerAdapter(MessageHandler messageHandler) {
    return new MessageListenerAdapter(messageHandler, "receiveMessage");
  }

  @Bean
  SimpleMessageListenerContainer container(ConnectionFactory connectionFactory,
                                          MessageListenerAdapter listenerAdapter) {
    SimpleMessageListenerContainer container = new  SimpleMessageListenerContainer();
    container.setConnectionFactory(connectionFactory);
    container.setQueueNames(brokerConfiguration.directExchangeQueue);
    container.setMessageListener(listenerAdapter);
    return container;
  }
}

@Component
public class MessageHandler {
  // Callback method to handle the recived messages
  public void receiveMessage(String message) {
    System.out.println("> " + message);
  }
}
Örnek - DirectExchange  ve İki Kuyruk
Şöyle yaparız. Burada yine tek bir DirectExchange var ancak bu sefer iki tane queue bağlanıyor.
@Configuration
public class RabbitMqConfiguration {
  @Value("${myapp.rabbitmq.exchange-name}")
  private String exchangeName;
  @Bean
  Queue someQueue() {
    return new Queue("someQueue", false);
  }
  @Bean
  Queue anotherQueue() {
    return new Queue("anotherQueue", false);
  }
  @Bean
  public DirectExchange directExchange() {
    return new DirectExchange(exchangeName);
  }
  @Bean
  Binding someBinding(final Queue someQueue, DirectExchange exchange) {
    return BindingBuilder.bind(someQueue).to(exchange).with("some-router");
  }
  @Bean
  Binding anotherBinding(final Queue anotherQueue, DirectExchange exchange) {
    return BindingBuilder.bind(anotherQueue).to(exchange).with("another-router");
  }
}
Routing key değerine göre istenilen kuyruğa göndermek için şöyle yaparız
@Service
public class SomeJobProducer {

  private final RabbitTemplate rabbitTemplate;

  @Value("${myapp.rabbitmq.exchange-name}")
  private String exchangeName;

  public void sendToQueue(String workId) {
    rabbitTemplate.convertAndSend(exchangeName, "some-router", workId);
  }
}

@Service
public class AnotherJobProducer {

  private final RabbitTemplate rabbitTemplate;

  @Value("${myapp.rabbitmq.exchange-name}")
  private String exchangeName;

  public void sendToQueue(String workId) {
    rabbitTemplate.convertAndSend(exchangeName, "another-router", workId);
  }
}
Listener'ların doğru kuyruğu dinlemesi gerekir. Şöyle yaparız
@Service
public class SomeQueueListener {

  @RabbitListener(queues = "someQueue")
  public void handleSomeJob(String workId) {
    System.out.printf("Working on SomeQueue... %s%n", workId);
  }
}
@Service
public class AnotherQueueListener {

  @RabbitListener(queues = "anotherQueue")
  public void handleAnotherJob(String workId) {
    System.out.printf("Working on anotherQueue... %s%n", workId);
  }
}

Hiç yorum yok:

Yorum Gönder