Tuesday, 3 March 2020

SpringKafka Producer KafkaTemplate Class - Only Sends Messages

Introduction
We include the following line
import org.springframework.kafka.core.KafkaTemplate;
The explanation is as follows
Spring for Apache Kafka brings the familiar Spring programming model to Kafka. It provides the KafkaTemplate for publishing records and a listener container for asynchronous execution of POJO listeners. Spring Boot auto-configuration wires up much of the infrastructure so that you can concentrate on your business logic.
To send different message types to different topics, a KafkaTemplate specific to each message type has to be created. The explanation is as follows
The KafkaTemplate is generic for coding convenience. But if your different topics deal with different data types, you really should consider having a separate type-safe KafkaTemplate for that purpose.
...
So, yes, for different data types you need separate KafkaProducers and, therefore, KafkaTemplates. Even if they look into the same Kafka Broker.
Maven
We include the following dependency
<dependency>
  <groupId>org.springframework.kafka</groupId>
  <artifactId>spring-kafka</artifactId>
</dependency>
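Usage
In the simplest usage, @EnableKafka is declared, then the send() method is called with the topic name and the message. Strictly speaking, @EnableKafka is only needed for @KafkaListener endpoints; KafkaTemplate works without it.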
Example
application.yml is as follows
spring:
  kafka:
    bootstrap-servers: localhost:9092
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
To send, we do the following. Here the partition is determined by the key value.
@Autowired
KafkaTemplate<String, ClaimRequest> kafkaTemplate;

void sendNewClaimMessage() throws ExecutionException, InterruptedException {
  ClaimRequest request = ...
  // The customer id is the record key; get() blocks until the send completes.
  kafkaTemplate.send("claim-submitted", request.getCustomerId(), request).get();
}
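The key-to-partition mapping can be pictured with a small, self-contained sketch. Kafka's default partitioner hashes the serialized key with murmur2 and takes the result modulo the partition count. The sketch below swaps in Arrays.hashCode as a stand-in hash, so the partition numbers it produces will not match a real cluster. The names KeyPartitionSketch and partitionFor are hypothetical. What it does show is the property that matters here: records with the same key always map to the same partition.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical illustration only: this is NOT Kafka's real partitioner.
final class KeyPartitionSketch {

  private KeyPartitionSketch() {
  }

  // Maps a record key to a partition index in [0, numPartitions).
  static int partitionFor(String key, int numPartitions) {
    if (numPartitions <= 0) {
      throw new IllegalArgumentException("numPartitions must be positive");
    }
    byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
    // Stand-in hash (assumption); Kafka itself uses murmur2 on the serialized key.
    int hash = Arrays.hashCode(keyBytes);
    // floorMod keeps the result non-negative even when the hash is negative.
    return Math.floorMod(hash, numPartitions);
  }
}
```

Seen this way, using request.getCustomerId() as the key keeps all messages of one customer in a single partition, as long as the partition count of the topic does not change.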
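Example
We do the following. No key is given here, so the producer's partitioner chooses the partition: round robin in older clients, sticky partitioning since Kafka 2.4.
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;

// Topic names may contain only ASCII alphanumerics, '.', '_' and '-', so no spaces.
String kafkaTopic = "java-topic";

public void send(String message) {
  kafkaTemplate.send(kafkaTopic, message);
}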
Definition
Example - XML
We do the following.
<bean id="kafkaTemplate" class="org.springframework.kafka.core.KafkaTemplate">
  <constructor-arg ref="producerFactory"/>
</bean>

<bean id="producerFactory"
 class="org.springframework.kafka.core.DefaultKafkaProducerFactory">
  <constructor-arg>
     <map>
      <entry key="bootstrap.servers" value-type="java.lang.String"
             value="${spring.kafka.bootstrap-servers}" />
      <entry key="key.serializer" value-type="java.lang.Class"
             value="org.apache.kafka.common.serialization.StringSerializer" />
      <entry key="value.serializer" value-type="java.lang.Class"
             value="org.apache.kafka.common.serialization.StringSerializer" />
    </map>
  </constructor-arg>
</bean>
Example - In Code
A DefaultKafkaProducerFactory has to be defined. We do the following
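import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

@Configuration
public class KafkaProducerConfig {

  @Bean
  public ProducerFactory<String, String> producerFactory(KafkaProperties kafkaProperties) {
    // Start from the spring.kafka.* settings, then override the broker address.
    Map<String, Object> properties = new HashMap<>(kafkaProperties.buildProducerProperties());
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    // Explicit serializer instances for the key and the value.
    return new DefaultKafkaProducerFactory<>(properties, new StringSerializer(), new StringSerializer());
  }

  @Bean
  public KafkaTemplate<String, String> kafkaTemplate(ProducerFactory<String, String> producerFactory) {
    return new KafkaTemplate<>(producerFactory);
  }
}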
constructor - DefaultKafkaProducerFactory
To use this class, a DefaultKafkaProducerFactory bean has to be created. Messages are read with the @KafkaListener annotation, so this class cannot be used for reading.
Example
We do the following
@Configuration
public class KafkaProducerConfig {

  @Bean
  public ProducerFactory<String, PersonDto> producerFactory() {
    return new DefaultKafkaProducerFactory<>(...);
  }

  @Bean
  public KafkaTemplate<String, PersonDto> kafkaTemplate() {
    return new KafkaTemplate<>(producerFactory());
  }
}
send method - topicName + message
The explanation is as follows
The send() method is asynchronous. When called it adds the record to a buffer of pending record sends and immediately returns. This allows the producer to batch together individual records for efficiency.
Example
We do the following.
String message = "Dummy Message: ";
String topic = "Dummy_Topic";
kafkaTemplate.send(topic,message);
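The buffering behaviour described in the quote can be mimicked without a broker. The following is a minimal sketch, not Kafka code: BufferingSenderSketch is a hypothetical stand-in whose send() only queues the record and hands back a pending future, and whose flush() "transmits" everything queued so far as one batch and completes the futures with made-up offsets.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for a producer's buffer of pending sends; not Kafka code.
final class BufferingSenderSketch {

  private final List<String> pendingRecords = new ArrayList<>();
  private final List<CompletableFuture<Long>> pendingFutures = new ArrayList<>();
  private long nextOffset = 0;

  // Adds the record to the buffer and returns immediately with a pending future.
  synchronized CompletableFuture<Long> send(String record) {
    CompletableFuture<Long> future = new CompletableFuture<>();
    pendingRecords.add(record);
    pendingFutures.add(future);
    return future;
  }

  // Treats all buffered records as one batch, completes each future with the
  // (invented) offset assigned to its record, and returns the batch size.
  synchronized int flush() {
    int batchSize = pendingRecords.size();
    for (CompletableFuture<Long> future : pendingFutures) {
      future.complete(nextOffset++);
    }
    pendingRecords.clear();
    pendingFutures.clear();
    return batchSize;
  }
}
```

Calling send() twice and then flush() once yields a single batch of two records. The real KafkaTemplate also has a flush() method for cases where buffered records must go out before the code continues.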
send method - topicName + message and ListenableFuture
Here org.springframework.util.concurrent.ListenableFuture and org.springframework.util.concurrent.ListenableFutureCallback are used
The explanation is as follows
The send API returns a ListenableFuture object. If we want to block the sending thread and get the result about the sent message, we can call the get API of the ListenableFuture object. The thread will wait for the result, but it will slow down the producer.
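Because ListenableFuture extends java.util.concurrent.Future, the blocking wait can also be bounded with get(timeout, unit) instead of waiting indefinitely. The sketch below is written against the plain Future interface so that it runs without a broker; BlockingSendSketch and awaitResult are hypothetical names, and wrapping the failures in IllegalStateException is just one possible choice.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper; works with any Future, including the one returned by send().
final class BlockingSendSketch {

  private BlockingSendSketch() {
  }

  // Blocks the calling thread until the result arrives or the timeout expires.
  static <T> T awaitResult(Future<T> sendFuture, long timeoutMillis) {
    try {
      return sendFuture.get(timeoutMillis, TimeUnit.MILLISECONDS);
    } catch (TimeoutException e) {
      throw new IllegalStateException("send not acknowledged within " + timeoutMillis + " ms", e);
    } catch (ExecutionException e) {
      // The send itself failed; the future wraps the real cause.
      throw new IllegalStateException("send failed", e.getCause());
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // preserve the interrupt flag
      throw new IllegalStateException("interrupted while waiting for send", e);
    }
  }
}
```

A call would look like awaitResult(kafkaTemplate.send(topic, message), 1000). Every such call still stalls the sending thread, which is the slowdown the quote warns about.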
Example
The callback has onSuccess() and onFailure() methods. We do the following
public void send(String topic, String message) {
  ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, message);
  future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {

    @Override
    public void onSuccess(SendResult<String, String> result) {
      // Log the payload that was sent, not the SendResult itself.
      LOGGER.info("sent message= " + message + " with offset= " + result.getRecordMetadata().offset());
    }

    @Override
    public void onFailure(Throwable throwable) {
      LOGGER.error("unable to send message= " + message, throwable);
    }
  });
}
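In Spring for Apache Kafka 3.0 and later, send() returns a CompletableFuture rather than a ListenableFuture, so the two callback methods collapse into a single whenComplete handler. The sketch below shows that shape with a plain CompletableFuture<Long> standing in for the future returned by send(); WhenCompleteSketch, logOutcome and the in-memory log list are hypothetical and exist only so the example runs on its own.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical sketch of the whenComplete style; the future stands in for send().
final class WhenCompleteSketch {

  // Collected log lines; a real application would use a logger instead.
  final List<String> log = new CopyOnWriteArrayList<>();

  // Registers one handler that covers both the success and the failure path.
  void logOutcome(String message, CompletableFuture<Long> sendFuture) {
    sendFuture.whenComplete((offset, throwable) -> {
      if (throwable == null) {
        log.add("sent message= " + message + " with offset= " + offset);
      } else {
        log.add("unable to send message= " + message + ": " + throwable.getMessage());
      }
    });
  }
}
```

The handler does not block the sending thread; it runs once the future completes, either normally or exceptionally.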
Example
Suppose we have a producer like this
@Component
@RequiredArgsConstructor
public class SampleProducerWithCallback {

  private final KafkaTemplate<String, String> kafkaTemplate;
  private final ListenableFutureCallback<SendResult<String, String>> callback;

  public void send(Message<?> message) {
    ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(message);
    future.addCallback(callback);
  }
}
For the callback, we do the following
public class CustomProducerCallback implements 
  ListenableFutureCallback<SendResult<String, String>> {

  @Override
  public void onFailure(Throwable ex) {
    System.out.println("Callback - send error: " + ex.getMessage());
  }

  @Override
  public void onSuccess(SendResult<String, String> result) {
    RecordMetadata recordMetadata = result.getRecordMetadata();
    System.out.println("Callback - sent to: " + recordMetadata.topic() + "-" 
      + recordMetadata.partition() + "-" + recordMetadata.offset());
  }
}
For the unit test, we set up the test class as follows
class SampleProducerWithCallbackTests {

  @Mock
  private ListenableFutureCallback<SendResult<String, String>> mockProducerCallback;

  @Mock
  private KafkaTemplate<String, String> mockKafkaTemplate;
  private SampleProducerWithCallback producer;

  private ListenableFuture mockFuture;
  private RecordMetadata recordMetadata;
  private SendResult<String, String> mockSendResult;

  private Message<String> message;

  @BeforeEach
  void setUp() {
   producer = new SampleProducerWithCallback(mockKafkaTemplate, mockProducerCallback);

    mockFuture = mock(ListenableFuture.class);
    recordMetadata = new RecordMetadata(new TopicPartition("topic", 0), 1L, 0, 0L, 0, 0);

    mockSendResult = mock(SendResult.class);
    when(mockSendResult.getRecordMetadata())
                .thenReturn(recordMetadata);

    message = MessageBuilder.withPayload("test")
                .setHeader(KafkaHeaders.TOPIC, "topic")
                .build();
  }
  ... 
}
The test methods are as follows. When mockKafkaTemplate.send() is called, mockFuture is returned.
@Test
void test_onSuccess_isCalled() {
  when(mockKafkaTemplate.send(message))
    .thenReturn(mockFuture);
  doAnswer(invocationOnMock -> {
    ListenableFutureCallback callback = invocationOnMock.getArgument(0);
    callback.onSuccess(mockSendResult);
    return null;
  }).when(mockFuture).addCallback(any(ListenableFutureCallback.class));

  producer.send(message);

  verify(mockProducerCallback).onSuccess(mockSendResult);
}

@Test
void test_onFailure_isCalled() {
  RuntimeException ex = new RuntimeException("error");
  when(mockKafkaTemplate.send(message))
    .thenReturn(mockFuture);
  doAnswer(invocationOnMock -> {
    ListenableFutureCallback callback = invocationOnMock.getArgument(0);
    callback.onFailure(ex);
    return null;
  }).when(mockFuture).addCallback(any(ListenableFutureCallback.class));

  producer.send(message);

  verify(mockProducerCallback).onFailure(ex);
}



