8 Aralık 2020 Salı

SpringKafka Producer application.properties Ayarları

Giriş
Bu dosyada belirtilen key-serializer ve value-serializer @Autowired ile kullanılacak KafkaTemplate sınıfının parametrelerini belirliyor. Bu değerlerle bir DefaultKafkaProducerFactory tanımlanıyor ve KafkaTemplate bunu kullanıyor

bootstrap-servers Alanı
Normalde  spring.kafka.bootstrap-servers şeklinde kullanılır. 
Ezmek istersek 
spring.kafka.consumer.bootstrap-servers 
veya 
spring.kafka.producer.bootstrap-servers şeklinde kullanırız. Açıklaması şöyle
Comma-delimited list of host:port pairs to use for establishing the initial connections to the Kafka cluster. Applies to all components unless overridden.
Serializer Ayarları
Örnek -- StringDeserializer + StringDeserializer
Şöyle yaparız
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
Örnek - StringDeserializer + StringDeserializer
Şöyle yaparız
server: port: 9000
spring:
   kafka:
     consumer:
        bootstrap-servers: localhost:9092
        group-id: group_id
        auto-offset-reset: earliest
        key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
        value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
     producer:
        bootstrap-servers: localhost:9092
        key-serializer: org.apache.kafka.common.serialization.StringSerializer
        value-serializer: org.apache.kafka.common.serialization.StringSerializer
group-id verildiği için consumer kodunda şöyle yaparız
@Service
public class Consumer {

  private final Logger logger = LoggerFactory.getLogger(Producer.class);

  @KafkaListener(topics = "users", groupId = "group_id")
  public void consume(String message) throws IOException {
    logger.info(String.format("#### -> Consumed message -> %s", message));
  }
}
Örnek - StringDeserializer + JsonDeserializer
Şöyle yaparız
spring:
  kafka:
    bootstrap-servers: localhost:9092
    listener:
      ack-mode: manual_immediate
    producer:
      topic: topic-1p
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
      retries: 3
      acks: 1
    consumer:
      groupId: test-group-1
      topic: topic-1p
      enable-auto-commit: false
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      properties:
        spring:
          json:
            trusted:
              packages: 'learn.kafka.model'
Örnek -  StringDeserializer + KafkaAvroDeserializer
Şöyle yaparız. Burada Confluent Avro serializer kullanıldığı için ilave ayarlar da var.
 kafka:
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
    consumer:
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
    bootstrap-servers: localhost:9092
    properties:
      schema:
        registry:
          url: http://localhost:8081
        reflection: true
      value:
        subject:
          name:
            strategy: io.confluent.kafka.serializers.subject.TopicNameStrategy
      auto:
        register:
          schemas: false
      use:
        latest:
          version: false
KafkaTemplate Ayarları
Örnek
Şöyle yaparız
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.template.default-topic=stock-market-data
Açıklaması şöyle
The property spring.kafka.template.default-topic is used to configure the default topic that will be used by the KafkaTemplate for producing messages.
Producer Ayaları
Örnek
Şöyle yaparız
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.producer.properties.spring.json.add.type.headers=false

Örnek - acks
Şöyle yaparız
spring:
  kafka:
    producer:
      bootstrap-servers: 127.0.0.1:9092 //zookeper ip addresses
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
      properties:
        acks: 0
        reconnect:
          backoff:
            ms: 1000
        max:
          block:
            ms: 80
        request:
          timeout:
            ms: 100

Hiç yorum yok:

Yorum Gönder