6 Nisan 2020 Pazartesi

SpringKafka Consumer @KafkaListener Anotasyonu - Mesajları Dinler ve İşler

Giriş
Şu satırı dahil ederiz
import org.springframework.kafka.annotation.KafkaListener;
Bu anotasyon dışında mesajları dinlemek için başka yöntemler de var. Açıklaması şöyle. Ancak bu anotasyonu kullanmak daha kolay.
You can receive messages by configuring a MessageListenerContainer and providing a message listener or by using the @KafkaListener annotation.
Bu Anotasyon Kullanabilmek İçin Hangi Bean'ler Gerekir
Bu anotasyonun çalışması için diğer bazı ayarları yapmak gerekir. Açıklaması şöyle
This mechanism requires an @EnableKafka annotation on one of your @Configuration classes and a listener container factory, which is used to configure the underlying ConcurrentMessageListenerContainer. By default, a bean with name kafkaListenerContainerFactory is expected.
Yani DefaultKafkaConsumerFactory + ConcurrentKafkaListenerContainerFactory sınıfları gerelir. 
Örnek
Bean'lerin otomatik yaratılmasını tercih edebiliriz. Şeklen şöyle

Örnek
Şöyle yaparız
@EnableKafka
@Configuration
public class KafkaConsumerConfig {
  @Bean
  public ConsumerFactory<String, PersonDto> consumerFactory() {
    return new DefaultKafkaConsumerFactory<>(...);
  }

  @Bean
  public ConcurrentKafkaListenerContainerFactory<String, PersonDto>
kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, PersonDto> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    return factory;
  }
}
Bu Anotasyon Nerede Kullanılabilir?
Bu anotasyon metod veya sınıf üzerinde kullanılabilir. Metod üzerinde kullanılırsa açıklaması şöyle
The @KafkaListener annotation is used to designate a bean method as a listener for a listener container. The bean is wrapped in a MessagingMessageListenerAdapter configured with various features, such as converters to convert the data, if necessary, to match the method parameters.
Sınıf üzerinde kullanılırsa açıklaması şöyle
When you use @KafkaListener at the class-level, you must specify @KafkaHandler at the method level. When messages are delivered, the converted message payload type is used to determine which method to call.
- topics okunacak mesaj topic ismini belirtir.
- containerFactory ise bağlantı ile kullanılacak bean'i belirtir. Bağlantı bean'i KafkaListenerContainerFactory veya ConcurrentKafkaListenerContainerFactory tipindendir.
- Metod bazen @Payload anotasyonu ile işaretlidir

Manual Commit
Manual Commit yazısına taşıdım

Anotasyon Alanları Nelerdir?

autoStartup  Alanı
Eğer listener'ı sonradan başlatmak istersek KafkaListenerEndpointRegistry kullanılır. Açıklaması şöyle
setting autoStartup to “false” will cause the kafka listener to not start with the application, set it to “true” if you need it to start the listener with the app. This property defaults to “true”
Bu alan version 2.2 ile geliyor. Açıklaması şöyle
Starting with version 2.2, you can now override the container factory’s concurrency and autoStartup properties by using properties on the annotation itself. The properties can be simple values, property placeholders, or SpEL expressions.
Örnek
Şöyle yaparız
@KafkaListener(topicPattern = "${kafka.topics.project-status-changed}",
autoStartup = "${kafka.enabled}")
public void listenToProjectStatusChange(ConsumerRecord<String,
ProjectStatusChangeDto> record) {
  ...    
}
containerFactory Alanı
KafkaListenerContainerFactory arayüzünü gerçekleştiren bean'i belirtir.
Örnek
Şöyle yaparız.
@KafkaListener(topics = "TEST-TOPIC", groupId = "CONSUMER-1-GROUP",
  containerFactory = "onlineKafkaListenerContainerFactory")
public void messageListener(ConsumerRecord<String, LogDay> consumerRecord) {
//....//
}
groupId Alanı - Consumer Group İsmi

id Alanı
Örnek
application.properties şöyledir
spring: kafka: bootstrap-servers: localhost:9092 consumer: key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.ByteArrayDeserializer auto-offset-reset: earliest
Şöyle yaparız
@Slf4j @Component public class ClaimReqEventListener { @Autowired private ClaimReviewService claimReviewService; @Autowired private KafkaTemplate<String, ClaimReviewResult> kafkaTemplate; @KafkaListener(id = "new-claim-handler", topics = "claim-submitted") public void handleClaimRequestEvent(ClaimRequest claimRequest) { log.info("Claim request received: {}", claimRequest); ClaimReviewResult result = claimReviewService.processClaimRequest(claimRequest); kafkaTemplate.send("claim-updated", result.getCustomerId(), result); } }
topics Alanı
Örnek
Şöyle yaparız
@KafkaListener(topics = "my-topic")
public void consume(@Payload String message) {
  logger.info(String.format("#### -> Consumed message -> %s", message));
}
Örnek
Şöyle yaparız.
@Component
public class ReportsConsumer
{
  @KafkaListener(topics = { "transactions" })
  public void listen(ConsumerRecord<ReportsBetKeyDto, ReportsBetDto> record)
  {
    System.out.println(record);
  }
}
Örnek
Şöyle yaparız.
@KafkaListener(topics = "test")
public void receive(@Payload String message){
  log.info("Message payload received: {}", message);
}
Çıktı olarak şunu alırız.
Message payload received: �contentType"text/plain"{"foo":"bar"}
Örnek
Şöyle yaparız.
@KafkaListener(topics = "test")
public void receive(Message message){
  log.info("Message payload received: {}", message.getPayload());
}
Çıktı olarak şunu alırız.
Message payload received: �contentType"text/plain"{"foo":"bar"}
topicPartitions Alanı
Açıklaması şöyle.
Let’s say you want to always read all records from all partitions (such as when using a compacted topic to load a distributed cache), it can be useful to manually assign the partitions and not use Kafka’s group management.
Örnek
Şöyle yaparız
@KafkaListener(id = "thing2", topicPartitions =
    { @TopicPartition(topic = "topic1", partitions = { "0", "1" }),
      @TopicPartition(topic = "topic2", partitions = "0",
         partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100"))
    })
public void listen(ConsumerRecord<?, ?> record) {
  ...
}

Hiç yorum yok:

Yorum Gönder