Introduction
We include the following line
import org.springframework.kafka.core.KafkaTemplate;
The explanation is as follows.
Spring for Apache Kafka brings the familiar Spring programming model to Kafka. It provides the KafkaTemplate for publishing records and a listener container for asynchronous execution of POJO listeners. Spring Boot auto-configuration wires up much of the infrastructure so that you can concentrate on your business logic.
To send different message types to different topics, a KafkaTemplate specific to each message type has to be created. The explanation is as follows
The KafkaTemplate is generic for coding convenience. But if your different topics deal with different data types, you really should consider to have a separate type-safe KafkaTemplate for that purpose....So, yes, for different data types you need separate KafkaProducers and, therefore, KafkaTemplate. Even if they look into the same Kafka Broker.
Maven
We include the following dependency
<dependency>
  <groupId>org.springframework.kafka</groupId>
  <artifactId>spring-kafka</artifactId>
</dependency>
Usage
In the simplest usage, @EnableKafka is declared. Then the send() method is called with the topic name and the message.
Definition
Example - application.yml
The application.yml file is as follows
spring:
  kafka:
    bootstrap-servers: localhost:9092
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
To send, we do the following. Here the partition is determined by the key value.
@Autowired
KafkaTemplate<String, ClaimRequest> kafkaTemplate;

void sendNewClaimMessage() throws ExecutionException, InterruptedException {
  ClaimRequest request = ...
  // The customer id is the record key; get() blocks until the send completes
  kafkaTemplate.send("claim-submitted", request.getCustomerId(), request).get();
}
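The idea that the key decides the partition can be illustrated with a small JDK-only sketch. This is not Kafka's actual partitioner: Kafka's default partitioner hashes the serialized key with murmur2, while this sketch uses Arrays.hashCode purely for brevity. The class name, method name, keys, and partition count are all hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

class KeyPartitionSketch {

    // Simplified stand-in for a key-based partitioner: hash the key bytes and
    // map the hash onto [0, numPartitions). Kafka itself uses a murmur2 hash;
    // Arrays.hashCode is an assumption made only to keep the sketch short.
    public static int partitionFor(String key, int numPartitions) {
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        int hash = Arrays.hashCode(keyBytes);
        // Math.floorMod keeps the result non-negative even for negative hashes.
        return Math.floorMod(hash, numPartitions);
    }

    public static void main(String[] args) {
        int partitions = 6; // hypothetical partition count
        // The same key always maps to the same partition.
        System.out.println(partitionFor("customer-42", partitions));
        System.out.println(partitionFor("customer-42", partitions));
        // A different key may or may not land on a different partition.
        System.out.println(partitionFor("customer-7", partitions));
    }
}
```

Because records with the same key go to the same partition, using the customer id as the key keeps all messages of one customer in order relative to each other.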
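Example
We do the following. Since no key is passed, the partition is not derived from a key; the producer's partitioner picks it (round robin in older clients, sticky partitioning by default since Kafka 2.4).
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;

// Topic names may not contain spaces
String kafkaTopic = "java_topic";

public void send(String message) {
  kafkaTemplate.send(kafkaTopic, message);
}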
Example - XML
We do the following.
<bean id="kafkaTemplate" class="org.springframework.kafka.core.KafkaTemplate">
  <constructor-arg ref="producerFactory"/>
</bean>
<bean id="producerFactory"
  class="org.springframework.kafka.core.DefaultKafkaProducerFactory">
  <constructor-arg>
    <map>
      <entry key="bootstrap.servers" value-type="java.lang.String"
        value="${spring.kafka.bootstrap-servers}" />
      <entry key="key.serializer" value-type="java.lang.Class"
        value="org.apache.kafka.common.serialization.StringSerializer" />
      <entry key="value.serializer" value-type="java.lang.Class"
        value="org.apache.kafka.common.serialization.StringSerializer" />
    </map>
  </constructor-arg>
</bean>
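Example - In code
A DefaultKafkaProducerFactory has to be defined. We do the following
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

@Configuration
public class KafkaProducerConfig {
  @Bean
  public ProducerFactory<String, String> producerFactory(KafkaProperties kafkaProperties) {
    // Start from the spring.kafka.* settings, then override the broker address
    Map<String, Object> properties = new HashMap<>(kafkaProperties.buildProducerProperties());
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    return new DefaultKafkaProducerFactory<>(properties, new StringSerializer(), new StringSerializer());
  }
  @Bean
  public KafkaTemplate<String, String> kafkaTemplate(ProducerFactory<String, String> producerFactory) {
    return new KafkaTemplate<>(producerFactory);
  }
}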
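The configuration handed to the producer factory is an ordinary map from property names to values. As a minimal JDK-only sketch, the map below uses the same three keys as the XML example ("bootstrap.servers" is the value of ProducerConfig.BOOTSTRAP_SERVERS_CONFIG) and gives the serializers as class names. The class and method names are hypothetical, and no Kafka or Spring classes are needed to build the map itself.

```java
import java.util.HashMap;
import java.util.Map;

class ProducerPropsSketch {

    // Builds a producer configuration map with the same keys as the XML example.
    // The serializer classes are given by name, so this compiles without Kafka
    // on the classpath.
    public static Map<String, Object> producerProps(String bootstrapServers) {
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }

    public static void main(String[] args) {
        // Print every entry of the configuration map
        producerProps("localhost:9092").forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

In a real application a map like this would be passed to the DefaultKafkaProducerFactory constructor, as in the examples in this post.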
constructor - DefaultKafkaProducerFactory
To use this class, a DefaultKafkaProducerFactory bean has to be created. Because messages are read with the @KafkaListener annotation, this class is not used for reading.
Example
We do the following
@Configuration
public class KafkaProducerConfig {
  @Bean
  public ProducerFactory<String, PersonDto> producerFactory() {
    return new DefaultKafkaProducerFactory<>(...);
  }
  @Bean
  public KafkaTemplate<String, PersonDto> kafkaTemplate() {
    return new KafkaTemplate<>(producerFactory());
  }
}
send method - topicName + message
The explanation is as follows
The send() method is asynchronous. When called it adds the record to a buffer of pending record sends and immediately returns. This allows the producer to batch together individual records for efficiency.
Example
We do the following.
String message = "Dummy Message: ";
String topic = "Dummy_Topic";
kafkaTemplate.send(topic, message);
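send method - topicName + message and ListenableFuture
This variant uses org.springframework.util.concurrent.ListenableFuture and org.springframework.util.concurrent.ListenableFutureCallback. In Spring for Apache Kafka 3.0 and later, send() returns a java.util.concurrent.CompletableFuture instead.
The explanation is as follows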
The send API returns a ListenableFuture object. If we want to block the sending thread and get the result about the sent message, we can call the get API of the ListenableFuture object. The thread will wait for the result, but it will slow down the producer.
Example
The callback has onSuccess() and onFailure() methods. We do the following
public void send(String topic, String message) {
  ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, message);
  future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
    @Override
    public void onSuccess(SendResult<String, String> result) {
      // Log the payload together with the offset assigned by the broker
      LOGGER.info("sent message= " + message + " with offset= " + result.getRecordMetadata().offset());
    }
    @Override
    public void onFailure(Throwable throwable) {
      LOGGER.error("unable to send message= " + message, throwable);
    }
  });
}
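The trade-off between blocking on the future and registering a callback can be sketched with the JDK alone. In the sketch below a CompletableFuture stands in for the future returned by send(), and fakeSend is a hypothetical placeholder for kafkaTemplate.send() that produces a made-up result string rather than a SendResult. It needs Java 9 or later because of orTimeout.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

class BlockingVsCallbackSketch {

    // Hypothetical stand-in for kafkaTemplate.send(): completes asynchronously
    // with a fake "topic-partition@length" string instead of a SendResult.
    public static CompletableFuture<String> fakeSend(String topic, String message) {
        return CompletableFuture.supplyAsync(() -> topic + "-0@" + message.length());
    }

    // Blocking style: the calling thread waits for the outcome. The timeout
    // keeps the caller from hanging forever if the send never completes.
    public static String sendAndWait(String topic, String message) {
        return fakeSend(topic, message).orTimeout(10, TimeUnit.SECONDS).join();
    }

    // Callback style: the calling thread returns immediately and the outcome
    // is handled once the future completes.
    public static CompletableFuture<String> sendWithCallback(String topic, String message) {
        return fakeSend(topic, message).whenComplete((result, ex) -> {
            if (ex != null) {
                System.err.println("send failed: " + ex.getMessage());
            } else {
                System.out.println("sent to " + result);
            }
        });
    }

    public static void main(String[] args) {
        System.out.println(sendAndWait("Dummy_Topic", "hello"));
        // join() is called here only so the demo does not exit before the callback runs
        sendWithCallback("Dummy_Topic", "hello").join();
    }
}
```

The blocking variant is simpler to reason about but ties up the sending thread for every record, which is why the callback style is usually preferred on hot paths.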
Example
Suppose we have the following producer
@Component
@RequiredArgsConstructor
public class SampleProducerWithCallback {
  private final KafkaTemplate<String, String> kafkaTemplate;
  private final ListenableFutureCallback<SendResult<String, String>> callback;
  public void send(Message<?> message) {
    ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(message);
    // Register the injected callback
    future.addCallback(callback);
  }
}
For the callback we do the following
public class CustomProducerCallback implements
    ListenableFutureCallback<SendResult<String, String>> {
  @Override
  public void onFailure(Throwable ex) {
    System.out.println("Callback - send error: " + ex.getMessage());
  }
  @Override
  public void onSuccess(SendResult<String, String> result) {
    RecordMetadata recordMetadata = result.getRecordMetadata();
    System.out.println("Callback - sent to: " + recordMetadata.topic() + "-"
        + recordMetadata.partition() + "-" + recordMetadata.offset());
  }
}
For the unit test we do the following
class SampleProducerWithCallbackTests {
  @Mock
  private ListenableFutureCallback<SendResult<String, String>> mockProducerCallback;
  @Mock
  private KafkaTemplate<String, String> mockKafkaTemplate;
  private SampleProducerWithCallback producer;
  private ListenableFuture mockFuture;
  private RecordMetadata recordMetadata;
  private SendResult<String, String> mockSendResult;
  private Message<String> message;
  @BeforeEach
  void setUp() {
    producer = new SampleProducerWithCallback(mockKafkaTemplate, mockProducerCallback);
    mockFuture = mock(ListenableFuture.class);
    recordMetadata = new RecordMetadata(new TopicPartition("topic", 0), 1L, 0, 0L, 0, 0);
    mockSendResult = mock(SendResult.class);
    when(mockSendResult.getRecordMetadata())
        .thenReturn(recordMetadata);
    message = MessageBuilder.withPayload("test")
        .setHeader(KafkaHeaders.TOPIC, "topic")
        .build();
  }
  ...
}
For the unit tests we do the following. When mockKafkaTemplate.send() is called, mockFuture is returned.
@Test
void test_onSuccess_isCalled() {
  when(mockKafkaTemplate.send(message))
      .thenReturn(mockFuture);
  // Invoke the registered callback as if the send had succeeded
  doAnswer(invocationOnMock -> {
    ListenableFutureCallback callback = invocationOnMock.getArgument(0);
    callback.onSuccess(mockSendResult);
    return null;
  }).when(mockFuture).addCallback(any(ListenableFutureCallback.class));
  producer.send(message);
  verify(mockProducerCallback).onSuccess(mockSendResult);
}
@Test
void test_onFailure_isCalled() {
  RuntimeException ex = new RuntimeException("error");
  when(mockKafkaTemplate.send(message))
      .thenReturn(mockFuture);
  // Invoke the registered callback as if the send had failed
  doAnswer(invocationOnMock -> {
    ListenableFutureCallback callback = invocationOnMock.getArgument(0);
    callback.onFailure(ex);
    return null;
  }).when(mockFuture).addCallback(any(ListenableFutureCallback.class));
  producer.send(message);
  verify(mockProducerCallback).onFailure(ex);
}