Giriş
Açıklaması şöyle
CloudEvents is an open standard that provides a common format for describing event data and metadata, making it easier to share events between different systems.
Örnek
Elimizde bir NewsEvent ve AlertEvent sınıf hiyerarşisi olsun. Producer CloudEventMessageBuilder sınıfını kullanarak şöyle gönderir.
@Componentpublic class NewsEventProducer { private final StreamBridge streamBridge; public NewsEventProducer(StreamBridge streamBridge) { this.streamBridge = streamBridge; } ... public Message<NewsEvent> send(String key, NewsEvent newsEvent) { Message<NewsEvent> message = CloudEventMessageBuilder .withData(newsEvent) .setHeader("partitionKey", key) .build(); streamBridge.send("news-out-0", message); ... return message; } } @Component public class AlertEventProducer { private final StreamBridge streamBridge; public AlertEventProducer(StreamBridge streamBridge) { this.streamBridge = streamBridge; } ... public Message<AlertEvent> send(String key, AlertEvent alertEvent) { Message<AlertEvent> message = CloudEventMessageBuilder .withData(alertEvent) .setHeader("partitionKey", key) .build(); streamBridge.send("alert-out-0", message); return message; } }
Producer için application.properties şöyledir. alert-out-0 ve news-out-0 kanallarına yazılan mesajlar Kafka'daki news.event ve alerts.events topiclerine yazılır
spring.cloud.stream.output-bindings=news;alert ... spring.cloud.stream.bindings.news-out-0.destination=news.events spring.cloud.stream.bindings.alert-out-0.destination=alert.events ...
Consumer tarafında application.properties şöyledir. Hangi topic'leri dinlemek istediğimizi belirtiriz. Kafka'daki news.event ve alerts.events topiclerini dinleriz
spring.cloud.function.definition=functionRouter spring.cloud.stream.bindings.functionRouter-in-0.destination=news.events,alert.events
Consumer MessageRoutingCallback ile event'leri dağıtır. Şöyle yaparız. Tek yapmamız gereken appConfigurationProperties map nesnesine sınıfın fully qualified ismine karşılık gelen kanalın ismini yazmak
@Configurationpublic class MessageRoutingConfig {private AppConfigurationProperties appConfigurationProperties;public MessageRoutingConfig(AppConfigurationProperties appConfigurationProperties) {this.appConfigurationProperties = appConfigurationProperties;}@Beanpublic MessageRoutingCallback messageRoutingCallback() {return new MessageRoutingCallback() {@Overridepublic String routingResult(Message<?> message) {return appConfigurationProperties.getRoutingMap().getOrDefault(CloudEventMessageUtils.getType(message), "unknownEvent");}};}@Beanpublic Consumer<Message<?>> unknownEvent() {return message -> log.warn("...", message.getHeaders(), message.getPayload());}}
Gerçek consumer kodları ise şöyle
@Component public class NewsEventConsumer { @Bean public Consumer<Message<CNNNewsCreated>> cnnNewsCreated() { return message -> ...; } @Bean public Consumer<Message<DWNewsCreated>> dwNewsCreated() { return message -> ...; } @Bean public Consumer<Message<RAINewsCreated>> raiNewsCreated() { return message -> ...; } } @Component public class AlertEventConsumer { @Bean public Consumer<Message<EarthquakeAlert>> earthquakeAlert() { return message -> ...; } @Bean public Consumer<Message<WeatherAlert>> weatherAlert() { return message -> ...; } }
Hiç yorum yok:
Yorum Gönder