SpringCloud Stream (SCS) alt katmanda kullanılan kuyruk teknolojisini soyutlar. Açıklaması şöyle
SCS framework provides an abstraction layer for event-driven systems to communicate over asynchronous messages. It hides the underlying middlewares from the application so it can use unified programming model to implement services.
SCS Reactive programlama kullanır. Açıklaması şöyle
SCS supports the reactive programming model. It uses Reactor library which allows us to write asynchronous, non blocking, declarative code.
Hangi Messaging Middleware Desteklenir
Açıklaması şöyle
Spring maintains the binder implementations for RabbitMQ, Kafka, Kafka Streams and Amazon Kinesis while Google PubSub, Azure EventHub, Apache RocketMQ and Solace PubSub binders are maintained by the corresponding organisations.
Maven
Kafka için şu satırı dahil ederiz
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-stream-binder-kafka</artifactId> </dependency>
Bileşenler
Şeklen şöyle
Default Binder Tanımlanır
Eğer projede birden fazla binder varsa ve her bir binding için binder tanımlı değilse, kullanılacak binder tanımlanır. Ayrıca custom serializer/deserializer kullanmak istiyorsak yine binder tanımlamak gerekir.
Örnek - custom serializer/deserializer
Şöyle yaparız
spring:cloud:stream:default-binder: kafkakafka:binder:auto-create-topics: trueconsumer-properties:key.deserializer: org.apache.kafka.common.serialization.StringDeserializervalue.deserializer: org.springframework.kafka.support.serializer
.JsonDeserializerproducer-properties:key.serializer: org.apache.kafka.common.serialization.StringSerializervalue.serializer: org.springframework.kafka.support.serializer.JsonSerializer
Örnek - custom serializer/deserializer
Şöyle yaparız
spring: cloud: function: definition: consumer;producer stream: kafka: bindings: producer-out-0: producer: configuration: value.serializer: com.foo.MessageSerializer consumer-in-0: consumer: configuration: value.deserializer: com.foo.MessageDeSerializer binder: brokers: localhost:9092 bindings: producer-out-0: destination : first-topic producer: useNativeEncoding: true # Enables using the custom serializer consumer-in-0: destination : first-topic consumer: use-native-decoding: true # Enables using the custom deserializer
Şöyle yaparız
import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.kafka.common.errors.SerializationException; import org.apache.kafka.common.serialization.Serializer; public class MessageSerializer implements Serializer<Message> { private final ObjectMapper objectMapper = new ObjectMapper(); @Override public byte[] serialize(String topic, Message data) { try { return objectMapper.writeValueAsBytes(data); } catch (JsonProcessingException e) { throw new SerializationException(e); } } }
Şöyle yaparız
import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.kafka.common.errors.SerializationException; import org.apache.kafka.common.serialization.Deserializer; public class MessageDeSerializer implements Deserializer<Message> { private final ObjectMapper objectMapper = new ObjectMapper(); @Override public Message deserialize(String topic, byte[] data) { try { return objectMapper.readValue(new String(data), Message.class); } catch (IOException e) { throw new SerializationException(e); } } }
Eski Anotasyonlar
SpringCloud Stream ilk çıktığında anotasyon kullanıyordu. Daha sonra bu anotasyonlar bırakıldı ve "Functional Style" kullanılmaya başlandı. Deprecate edilen anotasyolar ve sınıflar şöyle
- @EnableBinding
- @Input
- @Output
- @StreamListener + @SendTo : Processor'da kullanılır
Açıklaması şöyle
If you are used to building applications based on Spring framework, then you should be familiar with the use of annotation. You can simply tag features to classes and methods by adding annotations. For example, annotation @Streamlistener is to specify the method to be triggered by incoming messages. While annotation can still be used for development for a while, such approach is already deprecated and it is no longer recommended since Spring Cloud Stream version 3.x.
Function Definition Tanımlanır
application.properties dosyasında spring.cloud.stream.function.definition değeri altında Producer, Consumer, Processor işlevi gören bean'ler tanımlanır.
StreamBridge yazısına da bakabilirsiniz
Producer Bean
Producer Bean yazısına taşıdım
Consumer Bean
Consumer Bean yazısına taşıdım
Processor Bean
Processor Bean yazısına taşıdım
Hiç yorum yok:
Yorum Gönder