Wednesday, 19 August 2020

SpringKafka Consumer: The DefaultKafkaConsumerFactory Class

Introduction
The producer counterpart of this class is DefaultKafkaProducerFactory. Some notes on the class follow.
- ConsumerFactory bean is required only because of “CustomMessage”. You don’t have to create this bean explicitly if your value is of type string.

- Unlike many tutorials online, DO NOT HARD CODE properties in “ConsumerFactory”. Instead, build it using “kafkaProperties.buildConsumerProperties()” and enhance it based on your needs. This will allow you to control your consumer from application.yml.

- Hot Tip: Most of the tutorials online create the ConcurrentKafkaListenerContainerFactory bean explicitly. You don’t need to do so; Spring Boot will do it for you as soon as you specify the concurrency property for the consumer.
Maven
We include the following dependency.
<dependency>
  <groupId>org.springframework.kafka</groupId>
  <artifactId>spring-kafka</artifactId>
</dependency>
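constructor - KafkaProperties
KafkaProperties is a Spring Boot class that holds the spring.kafka.* settings. DefaultKafkaConsumerFactory has no constructor that takes it directly; the map returned by buildConsumerProperties() is passed to the Map constructor.
Example
We do it as follows.
@Bean
public ConsumerFactory<String, CustomMessage> consumerFactory(
    KafkaProperties kafkaProperties) {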
  return new DefaultKafkaConsumerFactory<>(kafkaProperties.buildConsumerProperties());
}
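The notes in the introduction suggest starting from kafkaProperties.buildConsumerProperties() and then enhancing the result. A minimal sketch of that idea is given below. It assumes Spring Boot's Kafka auto-configuration is on the classpath (the Maven snippet above lists only spring-kafka); the class name ConsumerFactoryConfig, the nested CustomMessage placeholder with its text field, and the max.poll.records value are illustrative assumptions, not taken from the original example.

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.serializer.JsonDeserializer;

@Configuration
public class ConsumerFactoryConfig {

  // Placeholder for the application's own payload type (hypothetical shape).
  public static class CustomMessage {
    public String text;
  }

  @Bean
  public ConsumerFactory<String, CustomMessage> consumerFactory(
      KafkaProperties kafkaProperties) {
    // Start from whatever spring.kafka.consumer.* defines in application.yml.
    // Copying into a new map keeps the changes local to this factory.
    Map<String, Object> props =
        new HashMap<>(kafkaProperties.buildConsumerProperties());

    // Illustrative enhancement: set a value only if configuration left it out.
    props.putIfAbsent(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);

    // Deserializer instances passed here are used for keys and values.
    return new DefaultKafkaConsumerFactory<>(props,
        new StringDeserializer(),
        new JsonDeserializer<>(CustomMessage.class));
  }
}
```

Because the map is seeded from KafkaProperties, settings such as bootstrap servers and group id can still be changed in application.yml without touching this class.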
constructor - Map + Deserializer + Deserializer
Its signature is as follows.
DefaultKafkaConsumerFactory(Map<String, Object> configs,
            Deserializer<K> keyDeserializer,
            Deserializer<V> valueDeserializer)
Example
We do it as follows.
@Bean
public ConsumerFactory<String, MessageADto> xxxConsumerFactory() {
  return new DefaultKafkaConsumerFactory<>(consumerConfigs(),
    new StringDeserializer(), 
    new JsonDeserializer<>(MessageADto.class));
}


private Map<String, Object> consumerConfigs() {
  Map<String, Object> props = new HashMap<>();
  ...
  return props;
}
Property Examples
The following can be used as KEY_DESERIALIZER_CLASS_CONFIG and VALUE_DESERIALIZER_CLASS_CONFIG:
- ByteArrayDeserializer
- ErrorHandlingDeserializer
- JsonDeserializer
- StringDeserializer

An explanation of ErrorHandlingDeserializer follows.
In case you want to receive already deserialized records directly in your listener then you might need to set an ErrorHandlingDeserializer for the cases where a deserialization exception occurs before Spring gets the record.
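The idea behind that quote can be illustrated without Kafka at all: a wrapper calls the real deserializer and turns a failure into a value the caller can inspect, instead of letting the exception escape. The sketch below is a plain-Java illustration of that pattern only; it is not Spring's ErrorHandlingDeserializer implementation, and the names SafeDeserializerSketch, Result, wrap and parseInt are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Function;

public class SafeDeserializerSketch {

  /** Outcome of one attempt: either a value or the exception that was caught. */
  public static final class Result<T> {
    public final T value;
    public final Exception error;

    Result(T value, Exception error) {
      this.value = value;
      this.error = error;
    }

    public boolean failed() {
      return error != null;
    }
  }

  /** Wraps a delegate so a bad payload yields a failed Result, not an exception. */
  public static <T> Function<byte[], Result<T>> wrap(Function<byte[], T> delegate) {
    return bytes -> {
      try {
        return new Result<>(delegate.apply(bytes), null);
      } catch (RuntimeException e) {
        // The failure is captured so the caller can decide what to do with it.
        return new Result<>(null, e);
      }
    };
  }

  /** Hypothetical delegate: parses the payload as a decimal integer. */
  public static Integer parseInt(byte[] bytes) {
    return Integer.valueOf(new String(bytes, StandardCharsets.UTF_8));
  }

  public static void main(String[] args) {
    Function<byte[], Result<Integer>> safe = wrap(SafeDeserializerSketch::parseInt);
    Result<Integer> ok = safe.apply("42".getBytes(StandardCharsets.UTF_8));
    Result<Integer> bad = safe.apply("not-a-number".getBytes(StandardCharsets.UTF_8));
    System.out.println(ok.value + " failed=" + ok.failed());
    System.out.println(bad.value + " failed=" + bad.failed());
  }
}
```

In Spring Kafka the same role is played by configuring ErrorHandlingDeserializer as the key or value deserializer and naming the real deserializer as its delegate, as the Avro example further down does.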

Example
We do it as follows. Since this is a consumer, the ConsumerConfig keys and deserializer classes are used.
@Bean
public Map<String, Object> consumerConfigs() {
  Map<String, Object> props = new HashMap<>();
  props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
  props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
  props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
  return props;
}
Example - Avro
We do it as follows. Here the key is a string and the value is Avro, so the Avro schema registry URL is also specified.
@Bean
public ConsumerFactory<String, Object> consumerFactory(
    @Value("${kafka.bootstrap-servers}") String bootstrapServers, 
    @Value("${kafka.schema.registry.url}") String schemaRegistryUrl) {

  // Typed map instead of a raw Map to avoid unchecked warnings
  Map<String, Object> config = new HashMap<>();
  config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
  config.put(ConsumerConfig.GROUP_ID_CONFIG, "demo-consumer-group");

  config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
    ErrorHandlingDeserializer.class);
  config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
    ErrorHandlingDeserializer.class);

  config.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, 
    StringDeserializer.class);
  config.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, 
    KafkaAvroDeserializer.class);
  config.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, 
    schemaRegistryUrl);
  config.put(KafkaAvroDeserializerConfig.AUTO_REGISTER_SCHEMAS, 
    false);
  config.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, 
    true);
  
  return new DefaultKafkaConsumerFactory<>(config);
}
Example
We do it as follows (in Kotlin). Here the key is a string and the value is JSON. To keep the deserializer from accepting arbitrary JSON types, trusted packages are defined for it.
@Bean
fun consumerFactory(): ConsumerFactory<String, FeedItem> {
  val configProps = HashMap<String, Any>()
  configProps[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = bootstrapServers
  configProps[ConsumerConfig.GROUP_ID_CONFIG] = "feedme"
  configProps[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = 
    StringDeserializer::class.java
  configProps[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] =
    JsonDeserializer::class.java
  configProps[JsonDeserializer.TRUSTED_PACKAGES] = "co.orders.feedme.feed.domain"
  return DefaultKafkaConsumerFactory(configProps)
}
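The constants used in these examples are only names for plain string keys, which is why the same settings can be moved out of code and into configuration. The sketch below rebuilds the JSON example's properties with literal keys and needs no Kafka classes to run; the class name ConsumerPropertyKeys and the localhost:9092 address are illustrative assumptions.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class ConsumerPropertyKeys {

  /** Builds the JSON example's settings with literal keys instead of constants. */
  public static Map<String, Object> jsonConsumerProps(String bootstrapServers) {
    Map<String, Object> props = new LinkedHashMap<>();
    // ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG
    props.put("bootstrap.servers", bootstrapServers);
    // ConsumerConfig.GROUP_ID_CONFIG
    props.put("group.id", "feedme");
    // ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG (a class name string is accepted)
    props.put("key.deserializer",
        "org.apache.kafka.common.serialization.StringDeserializer");
    // ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG
    props.put("value.deserializer",
        "org.springframework.kafka.support.serializer.JsonDeserializer");
    // JsonDeserializer.TRUSTED_PACKAGES
    props.put("spring.json.trusted.packages", "co.orders.feedme.feed.domain");
    return props;
  }

  public static void main(String[] args) {
    // Placeholder broker address, for illustration only.
    jsonConsumerProps("localhost:9092")
        .forEach((key, value) -> System.out.println(key + "=" + value));
  }
}
```

With Spring Boot, keys like these are usually supplied under spring.kafka.consumer.* in application.yml, with extra keys such as spring.json.trusted.packages going under spring.kafka.consumer.properties.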
