10 Aralık 2020 Perşembe

SpringKafka Consumer application.properties Ayarları

Basit Kullanım
Örnek
Şöyle yaparız
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=runtime-kafka-registry
Örnek
Yerel broker kullanıyorsak şöyle yaparız
spring.kafka.consumer.group-id=foo
spring.kafka.consumer.auto-offset-reset=earliest
Mesajı işleyen kod şöyledir
@KafkaListener(topics = "myTopic")
public void listen(ConsumerRecord<?, ?> cr) throws Exception {
  logger.info(cr.toString());
  latch.countDown();
}
Offset Alanı
Örnek
Şöyle yaparız
spring: kafka: bootstrap-servers: localhost:9092 consumer: key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.ByteArrayDeserializer auto-offset-reset: earliest
Security Alanları
Örnek
Şöyle yaparız. security.protocol olarak sertifika kullanmadan, düz metin gönderiliyor.
spring:
  kafka:
    properties:
      security.protocol: 'PLAINTEXT'
    bootstrap-servers: kafka:9092
    consumer:
      group-id: messaging_api
      auto-offset-reset: earliest
      key-deserializer: org.springframework.kafka.support.serializer
.ErrorHandlingDeserializer
      value-deserializer: org.springframework.kafka.support.serializer
.ErrorHandlingDeserializer
      properties:
        spring.json.trusted.packages: '*'
        spring.deserializer.key.delegate.class: org.apache.kafka.common.serialization
.StringDeserializer
        spring.deserializer.value.delegate.class: org.springframework.kafka.support
.serializer.JsonDeserializer

key-deserializer + value-deserializer Alanları

Örnek - JsonDeserializer
Spring normalde StringDeserializer kullanır. Değiştirmek için şöyle yaparız
spring.kafka.consumer.value-deserializer= org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.producer.value-serializer= org.springframework.kafka.support.serializer.JsonSerializer spring.json.add.type.headers=false spring.kafka.consumer.properties.spring.json.trusted.packages=*
Örnek - StringDeserializer
Şöyle yaparız. Burada mesajı işleyen kodda groupId alanını belirtmek gereksiz
spring.kafka.consumer.bootstrap-servers = localhost:9092
spring.kafka.consumer.group-id= group_id
spring.kafka.consumer.auto-offset-reset = earliest
spring.kafka.consumer.key-deserializer=
org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer =
org.apache.kafka.common.serialization.StringDeserializer

@KafkaListener(topics = "kafkaTopic", groupId = "group_id")
public void consume(String message) {
  logger.info(String.format("$$$$ => Consumed message: %s", message));
}
max-poll-records Alanı
Örnek
Elimizde şöyle bir ayar olsun
spring:
  kafka:
    bootstrap-servers: ${kafka_bootstrap_servers:localhost:9092}
    consumer:
      auto-offset-reset: earliest
      group-id: example
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      max-poll-records: 1
      fetch-max-wait: 36000
      enable-auto-commit: false
      client-id: example
    listener:
      poll-timeout: 1800000
      concurrency: 1
      ack-mode: manual_immediate
Açıklaması şöyle
This setup works for sequentially reading one message at a time (max-poll-records: 1) by one listener (concurrency: 1) and requires manual confirmation of message processing in listener (ack-mode: manual_immediate).
Şöyle yaparız
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
@Service
public class Consumer {

  private final Logger logger = LoggerFactory.getLogger(Producer.class);

  @KafkaListener(topics = {"INPUT_DATA"})
  public void consume(final @Payload String message,
                      final @Header(KafkaHeaders.OFFSET) Integer offset,
                      final @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key,
                      final @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
                      final @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
                      final @Header(KafkaHeaders.RECEIVED_TIMESTAMP) long ts,
                      final Acknowledgment acknowledgment
  ) {
    logger.info(String.format("#### -> Consumed message -> TIMESTAMP: %d\n%s\noffset:
%d\nkey: %s\npartition: %d\ntopic: %s"
, ts, message, offset, key, partition, topic));
    acknowledgment.acknowledge();
  }
}
properties Ayarları
Örnek
Şöyle yaparız
spring.kafka.bootstrap-servers=${kafka-service:localhost}:9092

spring.kafka.consumer.group-id=payment-group
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.json.add.type.headers=false
spring.kafka.consumer.properties.spring.json.trusted.packages=*

spring.kafka.listener.ack-mode=manual_immediate


Hiç yorum yok:

Yorum Gönder