Giriş
Direct Exchange'e bir kuyruk ya da birden fazla kuyruk takılabilir.
Mesaj gönderirken kuyruk ismi routing key olarak belirtilir.
Örnek - DirectExchange ve Tek Kuyruk
Şöyle yaparız. Burada iki tane farklı DirectExchange yaratılıyor.
@Configurationpublic class RabbitConfigure {// build queue@Beanpublic Queue helloQuery() {return new Queue("hello");// Pass ordinary string message}@Beanpublic Queue userQuery() {return new Queue("user");// Pass java object message}}
Mesaj göndermek için şöyle yaparız. convertAndSend() metoduna exchange ismi veriliyor.
@Componentpublic class HelloSender {@Autowiredpublic RabbitTemplate rabbitTemplate;//sent to hello queuepublic void sendHello() {String message = "...";rabbitTemplate.convertAndSend("hello", message);}//Send to user queuepublic void sendUser() {User user = new User();...rabbitTemplate.convertAndSend("user", user);}}
Örnek - DirectExchange ve Tek Kuyruk
Küçük gemilerin ana gemiye mesaj göndermesi durumunda kullanılabilir. Küçük gemi için elimizde şöyle bir kod olsun
Küçük gemi kodu şöyledir## application.yml for the ship's application## have to change property values for each shipship:name: rocinanteupdate-freq: 1000broker:exchange:direct:ship:name: rocinante-direct-exchangerouting-key: __rocinantestation:name: tyco-direct-exchangerouting-key: __scheduled-updatefanout:name: tyco-fanout-exchangequeue:name: rocinante
@Component@EnableSchedulingpublic class UpdateScheduler {@Value("${ship.name}")private String shipName;@Value("${broker.exchange.direct.station.name}")private String directExchange; //tyco-direct-exchange@Value("${broker.exchange.direct.station.routing-key}")private String directExchangeRoutingKey; //__scheduled-updateprivate Long shipUpdateFrequency;@Value("${ship.update-freq}")private void setShipUpdateFrequency(String frequency) {this.shipUpdateFrequency = Long.parseLong(frequency);}@Autowiredprivate final RabbitTemplate rabbitTemplate;@SneakyThrows@Scheduled(fixedDelay = 1)public void sendUpdates() {String updateMessage = shipName + ": Update at " + new Date() + " " +
ParameterFactory.getParameter();rabbitTemplate.convertAndSend(directExchange, directExchangeRoutingKey, updateMessage);Thread.sleep(shipUpdateFrequency);}}
Ana gemi yaml dosyası şöyledir
## application.yml for the station's applicationstation:name: Tycobroker:exchange:direct:name: tyco-direct-exchangerouting-key: __scheduled-updatequeue:auto-queue: auto-queuefanout:name: tyco-fanout-exchange
Ana gemi broker configuration bean'leri şöyledir
@Configurationpublic class BrokerConfiguration {static String directExchangeQueue;static String directExchange;static String directRoutingKey;@Value("${broker.exchange.direct.routing-key}")private void setDirectRoutingKey(String routingKey) {BrokerConfiguration.directRoutingKey = routingKey;}@Value("${broker.exchange.direct.name}")private void setDirectExchange(String exchangeName) {BrokerConfiguration.directExchange = exchangeName;}@Value("${broker.exchange.direct.queue.auto-queue}")private void setQueueName(String queueName) {BrokerConfiguration.directExchangeQueue = queueName;}@BeanDirectExchange directExchange() {return new DirectExchange(BrokerConfiguration.directExchange);}@BeanQueue directExchangeQueue() {return new Queue(BrokerConfiguration.directExchangeQueue);}@BeanBinding updateQueueBinding(Queue directExchangeQueue, DirectExchange directExchange) {return BindingBuilder.bind(directExchangeQueue).to(directExchange).with(BrokerConfiguration.directRoutingKey);}}
Mesajları dinleyen kod şöyledir
// message listener configuration@Configurationpublic class MessageListenerConfiguration {@Autowiredprivate final BrokerConfiguration brokerConfiguration;@BeanMessageListenerAdapter listenerAdapter(MessageHandler messageHandler) {return new MessageListenerAdapter(messageHandler, "receiveMessage");}@BeanSimpleMessageListenerContainer container(ConnectionFactory connectionFactory,MessageListenerAdapter listenerAdapter) {SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();container.setConnectionFactory(connectionFactory);container.setQueueNames(brokerConfiguration.directExchangeQueue);container.setMessageListener(listenerAdapter);return container;}}@Componentpublic class MessageHandler {// Callback method to handle the recived messagespublic void receiveMessage(String message) {System.out.println("> " + message);}}
Şöyle yaparız. Burada yine tek bir DirectExchange var ancak bu sefer iki tane queue bağlanıyor.
Routing key değerine göre istenilen kuyruğa göndermek için şöyle yaparız@Configurationpublic class RabbitMqConfiguration {@Value("${myapp.rabbitmq.exchange-name}")private String exchangeName;@BeanQueue someQueue() {return new Queue("someQueue", false);}@BeanQueue anotherQueue() {return new Queue("anotherQueue", false);}@Beanpublic DirectExchange directExchange() {return new DirectExchange(exchangeName);}@BeanBinding someBinding(final Queue someQueue, DirectExchange exchange) {return BindingBuilder.bind(someQueue).to(exchange).with("some-router");}@BeanBinding anotherBinding(final Queue anotherQueue, DirectExchange exchange) {return BindingBuilder.bind(anotherQueue).to(exchange).with("another-router");}}
Listener'ların doğru kuyruğu dinlemesi gerekir. Şöyle yaparız@Servicepublic class SomeJobProducer {private final RabbitTemplate rabbitTemplate;@Value("${myapp.rabbitmq.exchange-name}")private String exchangeName;public void sendToQueue(String workId) {rabbitTemplate.convertAndSend(exchangeName, "some-router", workId);}}@Servicepublic class AnotherJobProducer {private final RabbitTemplate rabbitTemplate;@Value("${myapp.rabbitmq.exchange-name}")private String exchangeName;public void sendToQueue(String workId) {rabbitTemplate.convertAndSend(exchangeName, "another-router", workId);}}
@Servicepublic class SomeQueueListener {@RabbitListener(queues = "someQueue")public void handleSomeJob(String workId) {System.out.printf("Working on SomeQueue... %s%n", workId);}}@Servicepublic class AnotherQueueListener {@RabbitListener(queues = "anotherQueue")public void handleAnotherJob(String workId) {System.out.printf("Working on anotherQueue... %s%n", workId);}}
Hiç yorum yok:
Yorum Gönder