16 Şubat 2021 Salı

SpringCloud Stream Kullanımı - Abstraction Layer On Top Of the Messaging Middleware

Giriş
SpringCloud Stream (SCS) alt katmanda kullanılan kuyruk teknolojisini soyutlar. Açıklaması şöyle
SCS framework provides an abstraction layer for event-driven systems to communicate over asynchronous messages. It hides the underlying middlewares from the application so it can use unified programming model to implement services.
SCS Reactive programlama kullanır. Açıklaması şöyle
SCS supports the reactive programming model. It uses Reactor library which allows us to write asynchronous, non blocking, declarative code.
Hangi Messaging Middleware Desteklenir
Açıklaması şöyle
Spring maintains the binder implementations for RabbitMQ, Kafka, Kafka Streams and Amazon Kinesis while Google PubSub, Azure EventHub, Apache RocketMQ and Solace PubSub binders are maintained by the corresponding organisations.
Maven
Kafka için şu satırı dahil ederiz
<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>
Bileşenler
Şeklen şöyle


Default Binder Tanımlanır
Eğer projede birden fazla binder varsa ve her bir binding için binder tanımlı değilse, kullanılacak binder tanımlanır. Ayrıca custom serializer/deserializer kullanmak istiyorsak yine binder tanımlamak gerekir.

Örnek - custom serializer/deserializer
Şöyle yaparız
spring:
  cloud:
    stream:
      default-binder: kafka
      kafka:
        binder:
          auto-create-topics: true
          consumer-properties:
            key.deserializer: org.apache.kafka.common.serialization.StringDeserializer
            value.deserializer: org.springframework.kafka.support.serializer
.JsonDeserializer
          producer-properties:
            key.serializer: org.apache.kafka.common.serialization.StringSerializer
            value.serializer: org.springframework.kafka.support.serializer.JsonSerializer
Örnek - custom serializer/deserializer
Şöyle yaparız
spring:
  cloud:
    function:
      definition: consumer;producer
    stream:
      kafka:
        bindings:
          producer-out-0:
            producer:
              configuration:
                value.serializer: com.foo.MessageSerializer
          consumer-in-0:
            consumer:
              configuration:
                value.deserializer: com.foo.MessageDeSerializer
        binder:
          brokers: localhost:9092
            
      bindings:
        producer-out-0:
            destination : first-topic
            producer:
                useNativeEncoding: true # Enables using the custom serializer
        consumer-in-0:
            destination : first-topic
            consumer:
              use-native-decoding: true # Enables using the custom deserializer
Şöyle yaparız
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serializer;

public class MessageSerializer implements Serializer<Message> {

  private final ObjectMapper objectMapper = new ObjectMapper();

  @Override
  public byte[] serialize(String topic, Message data) {
    try {
      return objectMapper.writeValueAsBytes(data);
    } catch (JsonProcessingException e) {
      throw new SerializationException(e);
    }
  }
}
Şöyle yaparız
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;

public class MessageDeSerializer implements Deserializer<Message> {

  private final ObjectMapper objectMapper = new ObjectMapper();

  @Override
  public Message deserialize(String topic, byte[] data) {
    try {
      return objectMapper.readValue(new String(data), Message.class);
    } catch (IOException e) {
      throw new SerializationException(e);
    }
  }
}
Eski Anotasyonlar
SpringCloud Stream ilk çıktığında anotasyon kullanıyordu. Daha sonra bu anotasyonlar bırakıldı ve "Functional Style" kullanılmaya başlandı. Deprecate edilen anotasyolar ve sınıflar şöyle
- @EnableBinding 
- @Input
@Output 
- @StreamListener + @SendTo : Processor'da kullanılır

Açıklaması şöyle
If you are used to building applications based on Spring framework, then you should be familiar with the use of annotation. You can simply tag features to classes and methods by adding annotations. For example, annotation @Streamlistener is to specify the method to be triggered by incoming messages. While annotation can still be used for development for a while, such approach is already deprecated and it is no longer recommended since Spring Cloud Stream version 3.x.

Function Definition Tanımlanır
application.properties dosyasında spring.cloud.stream.function.definition değeri altında Producer, Consumer, Processor işlevi gören bean'ler tanımlanır.

StreamBridge yazısına da bakabilirsiniz

Producer Bean
Producer Bean yazısına taşıdım

Consumer Bean
Consumer Bean yazısına taşıdım

Processor Bean
Processor Bean yazısına taşıdım






Hiç yorum yok:

Yorum Gönder