Introduction
The producer counterpart of this class is DefaultKafkaProducerFactory. Some explanatory notes follow.
- ConsumerFactory bean is required only because of “CustomMessage”. You don’t have to create this bean explicitly if your value is of type string.
- Unlike many tutorials online, DO NOT HARD CODE properties in “ConsumerFactory”. Instead, build it using “kafkaProperties.buildConsumerProperties()” and enhance it based on your needs. This will allow you to control your consumer from application.yml.
- Hot Tip: Most of the tutorials online create the ConcurrentKafkaListenerContainerFactory bean explicitly. You don’t need to do so; Spring Boot will do it for you as soon as you specify the concurrency property for the consumer.
Maven
We include the following dependency.
<dependency>
  <groupId>org.springframework.kafka</groupId>
  <artifactId>spring-kafka</artifactId>
</dependency>
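constructor - KafkaProperties
KafkaProperties is a Spring Boot class; its buildConsumerProperties() method returns the Map<String, Object> that is passed to the constructor.
Example
We do it like this.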
@Bean
public ConsumerFactory<String, CustomMessage> consumerFactory(KafkaProperties kafkaProperties) {
  return new DefaultKafkaConsumerFactory<>(kafkaProperties.buildConsumerProperties());
}
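The advice above, to start from kafkaProperties.buildConsumerProperties() and then adjust only what is needed, can be sketched roughly as follows. This is a minimal, JDK-only sketch under stated assumptions: the base map stands in for whatever Spring Boot would build from application.yml, and the class name ConsumerPropsSketch, the method enhance, and the sample values are all hypothetical. The keys are the plain strings behind ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, GROUP_ID_CONFIG and VALUE_DESERIALIZER_CLASS_CONFIG.

```java
import java.util.HashMap;
import java.util.Map;

final class ConsumerPropsSketch {

    // Copies the base properties (a stand-in for kafkaProperties.buildConsumerProperties())
    // and overrides only the value deserializer, leaving every other setting untouched.
    static Map<String, Object> enhance(Map<String, Object> base, String valueDeserializerClass) {
        Map<String, Object> props = new HashMap<>(base);
        props.put("value.deserializer", valueDeserializerClass);
        return props;
    }

    public static void main(String[] args) {
        // Hypothetical values that would normally come from application.yml
        Map<String, Object> base = new HashMap<>();
        base.put("bootstrap.servers", "localhost:9092");
        base.put("group.id", "demo-group");

        Map<String, Object> props = enhance(base,
                "org.springframework.kafka.support.serializer.JsonDeserializer");
        System.out.println(props);
    }
}
```

In a real configuration class the resulting map would be handed to new DefaultKafkaConsumerFactory<>(props), so that everything not overridden in code stays controllable from application.yml.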
constructor - Map + Deserializer + Deserializer
Its signature is as follows.
DefaultKafkaConsumerFactory(Map<String, Object> configs,
                            Deserializer<K> keyDeserializer,
                            Deserializer<V> valueDeserializer)
Example
We do it like this.
@Bean
public ConsumerFactory<String, MessageADto> xxxConsumerFactory() {
  return new DefaultKafkaConsumerFactory<>(consumerConfigs(),
    new StringDeserializer(),
    new JsonDeserializer<>(MessageADto.class));
}

private Map<String, Object> consumerConfigs() {
  Map<String, Object> props = new HashMap<>();
  ...
  return props;
}
Properties Examples
The following can be used for KEY_DESERIALIZER_CLASS_CONFIG and VALUE_DESERIALIZER_CLASS_CONFIG:
- ByteArrayDeserializer
- ErrorHandlingDeserializer
- JsonDeserializer
- StringDeserializer
The explanation for ErrorHandlingDeserializer is as follows.
In case you want to receive already deserialized records directly in your listener then you might need to set an ErrorHandlingDeserializer for the cases where a deserialization exception occurs before Spring gets the record.
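The idea behind a delegating, error-tolerant deserializer can be modelled in a few lines. The sketch below is not Spring's implementation: SafeDeserializerSketch, Result and wrap are hypothetical names, and a plain Function stands in for Kafka's Deserializer interface. It only illustrates the pattern of catching the delegate's failure and passing the error onward, instead of letting the exception escape before the listener is reached.

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Function;

final class SafeDeserializerSketch {

    // Holds either a deserialized value or the exception raised by the delegate.
    static final class Result<T> {
        final T value;
        final Exception error;

        Result(T value, Exception error) {
            this.value = value;
            this.error = error;
        }
    }

    // Wraps a delegate so that a failure becomes a Result with a null value
    // and the captured exception, rather than a thrown exception.
    static <T> Function<byte[], Result<T>> wrap(Function<byte[], T> delegate) {
        return bytes -> {
            try {
                return new Result<>(delegate.apply(bytes), null);
            } catch (RuntimeException e) {
                return new Result<>(null, e);
            }
        };
    }

    public static void main(String[] args) {
        // Hypothetical delegate: parses the payload as a UTF-8 encoded integer
        Function<byte[], Integer> intDelegate =
                bytes -> Integer.parseInt(new String(bytes, StandardCharsets.UTF_8));
        Function<byte[], Result<Integer>> safe = wrap(intDelegate);

        Result<Integer> ok = safe.apply("42".getBytes(StandardCharsets.UTF_8));
        Result<Integer> bad = safe.apply("oops".getBytes(StandardCharsets.UTF_8));
        System.out.println(ok.value + " / failed: " + (bad.error != null));
    }
}
```

With Spring Kafka itself nothing like this has to be written by hand: ErrorHandlingDeserializer is set as the key or value deserializer, and the real deserializer is named through ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS and ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS.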
Example
We do it like this.
@Bean
public Map<String, Object> consumerConfigs() {
  Map<String, Object> props = new HashMap<>();
  props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
  // A consumer is configured with deserializers, not serializers
  props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
  props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
  return props;
}
Example - Avro
We do it like this. Here the key is a string and the value is Avro, so the Avro schema registry URL is specified as well.
@Bean
public ConsumerFactory<String, Object> consumerFactory(
    @Value("${kafka.bootstrap-servers}") String bootstrapServers,
    @Value("${kafka.schema.registry.url}") String schemaRegistryUrl) {
  Map<String, Object> config = new HashMap<>();
  config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
  config.put(ConsumerConfig.GROUP_ID_CONFIG, "demo-consumer-group");
  // ErrorHandlingDeserializer wraps the real deserializers for both key and value
  config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
  config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
  // Delegates: String for the key, Avro for the value
  config.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, StringDeserializer.class);
  config.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, KafkaAvroDeserializer.class);
  config.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
  config.put(KafkaAvroDeserializerConfig.AUTO_REGISTER_SCHEMAS, false);
  // Deserialize into generated SpecificRecord classes instead of GenericRecord
  config.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);
  return new DefaultKafkaConsumerFactory<>(config);
}
Example
@Bean
fun consumerFactory(): ConsumerFactory<String, FeedItem> {
  val configProps = HashMap<String, Any>()
  configProps[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = bootstrapServers
  configProps[ConsumerConfig.GROUP_ID_CONFIG] = "feedme"
  configProps[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
  configProps[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = JsonDeserializer::class.java
  configProps[JsonDeserializer.TRUSTED_PACKAGES] = "co.orders.feedme.feed.domain"
  return DefaultKafkaConsumerFactory(configProps)
}