Basic Usage
Example
We do it like this.
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=runtime-kafka-registry
Example
If we are using a local broker, we do it like this
spring.kafka.consumer.group-id=foo
spring.kafka.consumer.auto-offset-reset=earliest
The code that handles the message is as follows
@KafkaListener(topics = "myTopic")
public void listen(ConsumerRecord<?, ?> cr) throws Exception {
  // Log the received record, then signal that a message has arrived
  logger.info(cr.toString());
  latch.countDown();
}
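The listener counts down a latch, which suggests that some other thread (for example a test) waits for the first message. The plain-Java sketch below illustrates that waiting pattern without Kafka or Spring. The class LatchListenerSketch and the method deliverAndWait are hypothetical names, and the extra thread only stands in for the listener container.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

class LatchListenerSketch {

    // Released once the first message has been handled
    final CountDownLatch latch = new CountDownLatch(1);

    // Stands in for the @KafkaListener method: handle the record, then signal
    void listen(String record) {
        latch.countDown();
    }

    // Simulates delivery on another thread and waits until the listener has run.
    // Returns false if the timeout expires or the waiting thread is interrupted.
    static boolean deliverAndWait(LatchListenerSketch listener, long timeoutMillis) {
        Thread container = new Thread(() -> listener.listen("hello"));
        container.start();
        try {
            return listener.latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        boolean received = deliverAndWait(new LatchListenerSketch(), 5000);
        System.out.println("message received: " + received);
    }
}
```

Waiting with a timeout keeps the caller from blocking forever if no message ever arrives.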
Offset Field
Example
We do it like this
spring:
  kafka:
    bootstrap-servers: localhost:9092
    consumer:
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.ByteArrayDeserializer
      auto-offset-reset: earliest
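To make the effect of auto-offset-reset concrete, here is a minimal plain-Java sketch that does not talk to Kafka. OffsetResetSketch and startingOffset are hypothetical names. The sketch models only the basic rule that the setting is consulted when the consumer group has no committed offset for a partition, and it ignores other cases such as an out-of-range offset.

```java
import java.util.OptionalLong;

class OffsetResetSketch {

    // Hypothetical helper: models where a consumer starts reading a partition.
    // committed   - the group's committed offset, if there is one
    // logStart    - first offset still available in the partition
    // logEnd      - offset the next produced record would receive
    // resetPolicy - "earliest" or "latest" (the value of auto-offset-reset)
    static long startingOffset(OptionalLong committed, long logStart, long logEnd, String resetPolicy) {
        if (committed.isPresent()) {
            // A committed offset wins; the reset policy is not consulted
            return committed.getAsLong();
        }
        switch (resetPolicy) {
            case "earliest":
                return logStart;
            case "latest":
                return logEnd;
            default:
                throw new IllegalStateException("No committed offset and policy is " + resetPolicy);
        }
    }

    public static void main(String[] args) {
        System.out.println(startingOffset(OptionalLong.empty(), 0L, 42L, "earliest"));
        System.out.println(startingOffset(OptionalLong.empty(), 0L, 42L, "latest"));
        System.out.println(startingOffset(OptionalLong.of(17L), 0L, 42L, "earliest"));
    }
}
```

In this model, a brand-new group with earliest reads everything that is still in the partition, while an existing group continues from its committed offset regardless of the setting.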
Security Fields
Example
We do it like this. With security.protocol set to PLAINTEXT, messages are sent as plain text, without certificates.
spring:
  kafka:
    properties:
      security.protocol: 'PLAINTEXT'
    bootstrap-servers: kafka:9092
    consumer:
      group-id: messaging_api
      auto-offset-reset: earliest
      key-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
      properties:
        spring.json.trusted.packages: '*'
        spring.deserializer.key.delegate.class: org.apache.kafka.common.serialization.StringDeserializer
        spring.deserializer.value.delegate.class: org.springframework.kafka.support.serializer.JsonDeserializer
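The configuration above wraps the real deserializers (the delegate classes) in ErrorHandlingDeserializer, so a payload that cannot be deserialized does not throw out of the consumer. The plain-Java sketch below illustrates only that wrapping idea. DelegatingDeserializerSketch, Result and errorHandling are hypothetical names and are not part of Spring Kafka.

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Function;

class DelegatingDeserializerSketch {

    // Hypothetical result holder: either a value or the captured failure
    static final class Result<T> {
        final T value;
        final Exception error;

        Result(T value, Exception error) {
            this.value = value;
            this.error = error;
        }

        boolean failed() {
            return error != null;
        }
    }

    // Wraps a delegate so that a bad payload yields a failed Result instead of an exception
    static <T> Function<byte[], Result<T>> errorHandling(Function<byte[], T> delegate) {
        return bytes -> {
            try {
                return new Result<>(delegate.apply(bytes), null);
            } catch (RuntimeException e) {
                return new Result<>(null, e);
            }
        };
    }

    public static void main(String[] args) {
        // The delegate parses the payload as an integer and throws on anything else
        Function<byte[], Integer> delegate =
                bytes -> Integer.parseInt(new String(bytes, StandardCharsets.UTF_8));
        Function<byte[], Result<Integer>> safe = errorHandling(delegate);

        Result<Integer> ok = safe.apply("42".getBytes(StandardCharsets.UTF_8));
        Result<Integer> bad = safe.apply("oops".getBytes(StandardCharsets.UTF_8));
        System.out.println(ok.value + " failed=" + ok.failed());
        System.out.println(bad.value + " failed=" + bad.failed());
    }
}
```

Capturing the failure instead of throwing lets the caller decide what to do with the bad record, rather than failing on the same payload again and again.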
key-deserializer + value-deserializer Fields
Example - JsonDeserializer
Spring uses StringDeserializer by default. To change it, we do this
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
# spring.json.add.type.headers is a JsonSerializer option, so it is passed through the producer's properties
spring.kafka.producer.properties.spring.json.add.type.headers=false
spring.kafka.consumer.properties.spring.json.trusted.packages=*
We do it like this. Here, specifying groupId in the code that handles the message is unnecessary, because spring.kafka.consumer.group-id already sets it
spring.kafka.consumer.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=group_id
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
@KafkaListener(topics = "kafkaTopic", groupId = "group_id")
public void consume(String message) {
  logger.info(String.format("$$$$ => Consumed message: %s", message));
}
max-poll-records Field
Example
Suppose we have a configuration like this
spring:
  kafka:
    bootstrap-servers: ${kafka_bootstrap_servers:localhost:9092}
    consumer:
      auto-offset-reset: earliest
      group-id: example
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      max-poll-records: 1
      fetch-max-wait: 36000
      enable-auto-commit: false
      client-id: example
    listener:
      poll-timeout: 1800000
      concurrency: 1
      ack-mode: manual_immediate
The explanation is as follows
This setup works for sequentially reading one message at a time (max-poll-records: 1) by one listener (concurrency: 1) and requires manual confirmation of message processing in listener (ack-mode: manual_immediate).
We do it like this
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Service;

@Service
public class Consumer {
  private final Logger logger = LoggerFactory.getLogger(Consumer.class);

  // RECEIVED_MESSAGE_KEY and RECEIVED_PARTITION_ID are the pre-3.0 names;
  // Spring Kafka 3.x uses RECEIVED_KEY and RECEIVED_PARTITION instead
  @KafkaListener(topics = {"INPUT_DATA"})
  public void consume(final @Payload String message,
                      final @Header(KafkaHeaders.OFFSET) Integer offset,
                      final @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key,
                      final @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
                      final @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
                      final @Header(KafkaHeaders.RECEIVED_TIMESTAMP) long ts,
                      final Acknowledgment acknowledgment) {
    logger.info(String.format(
        "#### -> Consumed message -> TIMESTAMP: %d\n%s\noffset: %d\nkey: %s\npartition: %d\ntopic: %s",
        ts, message, offset, key, partition, topic));
    // Confirm processing manually, as ack-mode: manual_immediate requires
    acknowledgment.acknowledge();
  }
}
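The interaction between max-poll-records: 1 and manual acknowledgment can be illustrated with a small plain-Java model that does not use Kafka. ManualAckSketch and its methods are hypothetical names. The model assumes a single partition and treats a restart as resuming from the last acknowledged position.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class ManualAckSketch {

    private final List<String> log;      // stands in for one partition
    private final int maxPollRecords;    // mirrors max-poll-records
    private int position = 0;            // next offset to fetch
    private int committed = 0;           // position recorded by the last acknowledgment

    ManualAckSketch(List<String> log, int maxPollRecords) {
        this.log = log;
        this.maxPollRecords = maxPollRecords;
    }

    // Returns at most maxPollRecords records starting at the current position
    List<String> poll() {
        int end = Math.min(log.size(), position + maxPollRecords);
        List<String> batch = new ArrayList<>(log.subList(position, end));
        position = end;
        return batch;
    }

    // Models acknowledgment.acknowledge(): record the current position right away
    void acknowledge() {
        committed = position;
    }

    // Models a restart: reading resumes from the last acknowledged position
    void restart() {
        position = committed;
    }

    int committed() {
        return committed;
    }

    public static void main(String[] args) {
        ManualAckSketch consumer = new ManualAckSketch(Arrays.asList("a", "b", "c"), 1);
        System.out.println(consumer.poll());  // first record
        consumer.acknowledge();
        System.out.println(consumer.poll());  // second record, never acknowledged
        consumer.restart();
        System.out.println(consumer.poll());  // second record is delivered again
    }
}
```

In this model a record that was polled but never acknowledged is delivered again after a restart, which is why the listener calls acknowledge() only once processing has finished.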
properties Settings
Example
We do it like this
spring.kafka.bootstrap-servers=${kafka-service:localhost}:9092
spring.kafka.consumer.group-id=payment-group
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.json.add.type.headers=false
spring.kafka.consumer.properties.spring.json.trusted.packages=*
spring.kafka.listener.ack-mode=manual_immediate
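The bootstrap-servers value uses a placeholder with a default: ${kafka-service:localhost} takes the value of kafka-service if it is defined and falls back to localhost otherwise. The sketch below imitates that behavior in plain Java to show how the final address is formed. PlaceholderSketch and resolve are hypothetical names, and nested placeholders are not handled, so this is not Spring's actual resolver.

```java
import java.util.Collections;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

class PlaceholderSketch {

    // Matches ${name} or ${name:default}; the default may itself contain ':'
    private static final Pattern PLACEHOLDER =
            Pattern.compile("\\$\\{([^:}]+)(?::([^}]*))?\\}");

    // Replaces each placeholder with the value from env, or with its default
    static String resolve(String text, Map<String, String> env) {
        Matcher m = PLACEHOLDER.matcher(text);
        StringBuffer out = new StringBuffer();
        while (m.find()) {
            String name = m.group(1);
            String fallback = m.group(2);
            String value = env.containsKey(name) ? env.get(name) : fallback;
            if (value == null) {
                throw new IllegalArgumentException("Unresolved placeholder: " + name);
            }
            m.appendReplacement(out, Matcher.quoteReplacement(value));
        }
        m.appendTail(out);
        return out.toString();
    }

    public static void main(String[] args) {
        Map<String, String> none = Collections.emptyMap();
        Map<String, String> cluster = Collections.singletonMap("kafka-service", "broker");
        System.out.println(resolve("${kafka-service:localhost}:9092", none));
        System.out.println(resolve("${kafka-service:localhost}:9092", cluster));
    }
}
```

Because only the host sits inside the placeholder here, the port 9092 is appended in both cases.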