Giriş
Şu satırı dahil ederiz
Şu satırı dahil ederiz
import org.springframework.kafka.annotation.KafkaListener;
Bu anotasyon dışında mesajları dinlemek için başka yöntemler de var. Açıklaması şöyle. Ancak bu anotasyonu kullanmak daha kolay.
You can receive messages by configuring a MessageListenerContainer and providing a message listener or by using the @KafkaListener annotation.
Bu Anotasyon Kullanabilmek İçin Hangi Bean'ler Gerekir
Bu anotasyonun çalışması için diğer bazı ayarları yapmak gerekir. Açıklaması şöyleThis mechanism requires an @EnableKafka annotation on one of your @Configuration classes and a listener container factory, which is used to configure the underlying ConcurrentMessageListenerContainer. By default, a bean with name kafkaListenerContainerFactory is expected.
Yani DefaultKafkaConsumerFactory + ConcurrentKafkaListenerContainerFactory sınıfları gerelir.
Örnek
Bean'lerin otomatik yaratılmasını tercih edebiliriz. Şeklen şöyle
Örnek
Şöyle yaparız
@EnableKafka@Configurationpublic class KafkaConsumerConfig {@Beanpublic ConsumerFactory<String, PersonDto> consumerFactory() {return new DefaultKafkaConsumerFactory<>(...);}@Beanpublic ConcurrentKafkaListenerContainerFactory<String, PersonDto>
kafkaListenerContainerFactory() {ConcurrentKafkaListenerContainerFactory<String, PersonDto> factory = new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory());return factory;}}
Bu Anotasyon Nerede Kullanılabilir?
Bu anotasyon metod veya sınıf üzerinde kullanılabilir. Metod üzerinde kullanılırsa açıklaması şöyleThe @KafkaListener annotation is used to designate a bean method as a listener for a listener container. The bean is wrapped in a MessagingMessageListenerAdapter configured with various features, such as converters to convert the data, if necessary, to match the method parameters.Sınıf üzerinde kullanılırsa açıklaması şöyle
When you use @KafkaListener at the class-level, you must specify @KafkaHandler at the method level. When messages are delivered, the converted message payload type is used to determine which method to call.- topics okunacak mesaj topic ismini belirtir.
- containerFactory ise bağlantı ile kullanılacak bean'i belirtir. Bağlantı bean'i KafkaListenerContainerFactory veya ConcurrentKafkaListenerContainerFactory tipindendir.
- Metod bazen @Payload anotasyonu ile işaretlidir
Manual Commit
Manual Commit yazısına taşıdım
Anotasyon Alanları Nelerdir?
autoStartup Alanı
Eğer listener'ı sonradan başlatmak istersek KafkaListenerEndpointRegistry kullanılır. Açıklaması şöyle
setting autoStartup to “false” will cause the kafka listener to not start with the application, set it to “true” if you need it to start the listener with the app. This property defaults to “true”
Bu alan version 2.2 ile geliyor. Açıklaması şöyle
Starting with version 2.2, you can now override the container factory’s concurrency and autoStartup properties by using properties on the annotation itself. The properties can be simple values, property placeholders, or SpEL expressions.
Örnek
Şöyle yaparız
@KafkaListener(topicPattern = "${kafka.topics.project-status-changed}",
autoStartup = "${kafka.enabled}")public void listenToProjectStatusChange(ConsumerRecord<String,
ProjectStatusChangeDto> record) {...}
containerFactory Alanı
KafkaListenerContainerFactory arayüzünü gerçekleştiren bean'i belirtir.Örnek
Şöyle yaparız.
@KafkaListener(topics = "TEST-TOPIC", groupId = "CONSUMER-1-GROUP",
containerFactory = "onlineKafkaListenerContainerFactory")
public void messageListener(ConsumerRecord<String, LogDay> consumerRecord) {
//....//
}
groupId Alanı - Consumer Group İsmi
@KafkaListener Anotasyonu groupId Alanı yazısına taşıdım
Örnek
application.properties şöyledir
spring: kafka: bootstrap-servers: localhost:9092 consumer: key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.ByteArrayDeserializer auto-offset-reset: earliest
Şöyle yaparız
@Slf4j @Component public class ClaimReqEventListener { @Autowired private ClaimReviewService claimReviewService; @Autowired private KafkaTemplate<String, ClaimReviewResult> kafkaTemplate; @KafkaListener(id = "new-claim-handler", topics = "claim-submitted") public void handleClaimRequestEvent(ClaimRequest claimRequest) { log.info("Claim request received: {}", claimRequest); ClaimReviewResult result = claimReviewService.processClaimRequest(claimRequest); kafkaTemplate.send("claim-updated", result.getCustomerId(), result); } }
Örnek
Şöyle yaparız
@KafkaListener(topics = "my-topic")
public void consume(@Payload String message) {
logger.info(String.format("#### -> Consumed message -> %s", message));
}
ÖrnekŞöyle yaparız.
@Component
public class ReportsConsumer
{
@KafkaListener(topics = { "transactions" })
public void listen(ConsumerRecord<ReportsBetKeyDto, ReportsBetDto> record)
{
System.out.println(record);
}
}
ÖrnekŞöyle yaparız.
@KafkaListener(topics = "test")
public void receive(@Payload String message){
log.info("Message payload received: {}", message);
}
Çıktı olarak şunu alırız.Message payload received: �contentType"text/plain"{"foo":"bar"}
ÖrnekŞöyle yaparız.
@KafkaListener(topics = "test")
public void receive(Message message){
log.info("Message payload received: {}", message.getPayload());
}
Çıktı olarak şunu alırız.Message payload received: �contentType"text/plain"{"foo":"bar"}
topicPartitions Alanı
Açıklaması şöyle.Let’s say you want to always read all records from all partitions (such as when using a compacted topic to load a distributed cache), it can be useful to manually assign the partitions and not use Kafka’s group management.Örnek
Şöyle yaparız
@KafkaListener(id = "thing2", topicPartitions =
{ @TopicPartition(topic = "topic1", partitions = { "0", "1" }),
@TopicPartition(topic = "topic2", partitions = "0",
partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100"))
})
public void listen(ConsumerRecord<?, ?> record) {
...
}
Hiç yorum yok:
Yorum Gönder