Tuesday, 30 May 2023

Spring Cloud Stream MessageRoutingCallback Class

Introduction
The explanation is as follows:
CloudEvents is an open standard that provides a common format for describing event data and metadata, making it easier to share events between different systems. 
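To make "event data and metadata" more concrete, here is a minimal sketch of the four context attributes that the CloudEvents 1.0 specification marks as required (id, source, specversion, type). The record name and the factory method are hypothetical and only for illustration. The type attribute is the one the routing example later in this post relies on.

```java
import java.util.Objects;

// Illustrative only: the four context attributes that CloudEvents 1.0 requires.
// The record name and the factory method are hypothetical, not part of any library.
record CloudEventAttributes(String id, String source, String specversion, String type) {

    // Compact constructor: reject missing required attributes.
    CloudEventAttributes {
        Objects.requireNonNull(id, "id");
        Objects.requireNonNull(source, "source");
        Objects.requireNonNull(specversion, "specversion");
        Objects.requireNonNull(type, "type");
    }

    // Convenience factory that fixes specversion to "1.0".
    static CloudEventAttributes of(String id, String source, String type) {
        return new CloudEventAttributes(id, source, "1.0", type);
    }
}
```

Optional attributes such as datacontenttype, subject and time are left out to keep the sketch short.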
Example
Suppose we have NewsEvent and AlertEvent class hierarchies. The producers build messages with the CloudEventMessageBuilder class and send them as follows:
@Component
public class NewsEventProducer {

  private final StreamBridge streamBridge;

  public NewsEventProducer(StreamBridge streamBridge) {
    this.streamBridge = streamBridge;
  }
  ...
  public Message<NewsEvent> send(String key, NewsEvent newsEvent) {
    Message<NewsEvent> message = CloudEventMessageBuilder
      .withData(newsEvent)
      .setHeader("partitionKey", key)
      .build();
    streamBridge.send("news-out-0", message);
    ...
    return message;
  }
}

@Component
public class AlertEventProducer {

  private final StreamBridge streamBridge;

  public AlertEventProducer(StreamBridge streamBridge) {
    this.streamBridge = streamBridge;
  }
  ...
  public Message<AlertEvent> send(String key, AlertEvent alertEvent) {
    Message<AlertEvent> message = CloudEventMessageBuilder
      .withData(alertEvent)
      .setHeader("partitionKey", key)
      .build();
    streamBridge.send("alert-out-0", message);
    return message;
  }
}
The producer's application.properties is as follows. Messages sent to the news-out-0 and alert-out-0 bindings are written to the Kafka topics news.events and alert.events:
spring.cloud.stream.output-bindings=news;alert
...
spring.cloud.stream.bindings.news-out-0.destination=news.events
spring.cloud.stream.bindings.alert-out-0.destination=alert.events
...
On the consumer side, application.properties is as follows. It specifies which topics to listen to, here the Kafka topics news.events and alert.events:
spring.cloud.function.definition=functionRouter
spring.cloud.stream.bindings.functionRouter-in-0.destination=news.events,alert.events
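The consumer dispatches events with a MessageRoutingCallback, as shown below. All we need to do is add an entry to the routing map in appConfigurationProperties that maps each event class's fully qualified name to the name of the consumer function that should handle it. Events with no matching entry go to unknownEvent.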
@Configuration
public class MessageRoutingConfig {

  private AppConfigurationProperties appConfigurationProperties;

  public MessageRoutingConfig(AppConfigurationProperties appConfigurationProperties) {
    this.appConfigurationProperties = appConfigurationProperties;
  }

  @Bean
  public MessageRoutingCallback messageRoutingCallback() {
    return new MessageRoutingCallback() {
      @Override
      public String routingResult(Message<?> message) {
        return appConfigurationProperties.getRoutingMap()
          .getOrDefault(CloudEventMessageUtils.getType(message), "unknownEvent");
      }
    };
  }

  @Bean
  public Consumer<Message<?>> unknownEvent() {
    return message -> log.warn("...", message.getHeaders(), message.getPayload());
  }
}
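The lookup inside routingResult can be tried out in isolation with plain Java. The following is a minimal sketch, not the author's AppConfigurationProperties class. The class name EventTypeRouter and the com.example.* type names are assumptions, since the post does not show the real packages. It copies the entries into a HashMap so that a missing (null) event type also falls back to unknownEvent instead of throwing, which an immutable Map.of(...) map would do.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the routing-map lookup; not part of Spring.
final class EventTypeRouter {

    static final String FALLBACK = "unknownEvent";

    private final Map<String, String> routingMap;

    EventTypeRouter(Map<String, String> routingMap) {
        // HashMap tolerates a null key in getOrDefault; Map.of(...) maps do not.
        this.routingMap = new HashMap<>(routingMap);
    }

    // Returns the function name registered for the event type, or the fallback.
    String route(String eventType) {
        return routingMap.getOrDefault(eventType, FALLBACK);
    }

    public static void main(String[] args) {
        // The type names below are assumed for illustration only.
        EventTypeRouter router = new EventTypeRouter(Map.of(
            "com.example.news.CNNNewsCreated", "cnnNewsCreated",
            "com.example.alert.EarthquakeAlert", "earthquakeAlert"));

        System.out.println(router.route("com.example.news.CNNNewsCreated"));
        System.out.println(router.route("com.example.SomethingElse"));
        System.out.println(router.route(null));
    }
}
```

The values in the map are function names, so each one should match the name of a Consumer bean defined in the application.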
The actual consumer code is as follows:
@Component
public class NewsEventConsumer {

  @Bean
  public Consumer<Message<CNNNewsCreated>> cnnNewsCreated() {
    return message -> ...;
  }

  @Bean
  public Consumer<Message<DWNewsCreated>> dwNewsCreated() {
    return message -> ...;
  }

  @Bean
  public Consumer<Message<RAINewsCreated>> raiNewsCreated() {
    return message -> ...;
  }
}

@Component
public class AlertEventConsumer {

  @Bean
  public Consumer<Message<EarthquakeAlert>> earthquakeAlert() {
    return message -> ...;
  }

  @Bean
  public Consumer<Message<WeatherAlert>> weatherAlert() {
    return message -> ...;
  }
}
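The link between the routing result and these beans is the bean name: the string returned by the callback is meant to name one of the Consumer beans above. The following plain-Java sketch imitates that name-based dispatch with a simple registry. It is an illustration under stated assumptions, not how Spring Cloud Function is implemented. NamedConsumerRegistry is a hypothetical class, and payloads are reduced to strings.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical registry that dispatches a payload to a consumer by name.
final class NamedConsumerRegistry {

    private final Map<String, Consumer<String>> consumers = new HashMap<>();

    // Registers a consumer under the name the router is expected to return.
    void register(String name, Consumer<String> consumer) {
        consumers.put(name, consumer);
    }

    // Looks up the consumer by name and hands it the payload.
    void dispatch(String functionName, String payload) {
        Consumer<String> target = consumers.get(functionName);
        if (target == null) {
            throw new IllegalArgumentException("No consumer registered as " + functionName);
        }
        target.accept(payload);
    }

    public static void main(String[] args) {
        NamedConsumerRegistry registry = new NamedConsumerRegistry();
        registry.register("earthquakeAlert", p -> System.out.println("earthquake: " + p));
        registry.register("unknownEvent", p -> System.out.println("unknown: " + p));

        registry.dispatch("earthquakeAlert", "magnitude 5.1");
        registry.dispatch("unknownEvent", "unrecognized payload");
    }
}
```

A routing-map value that does not match any registered name cannot be resolved, so it helps to keep the map values and the bean method names in sync.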

