28 Ocak 2021 Perşembe

SpringKafka Producer DefaultKafkaProducerFactory - KafkaTemplate Yaratmak İçin Gerekir

Giriş
Şu satırları dahil ederiz
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
DefaultKafkaProducerFactory nesnesi, KafkaTemplate nesnesine geçilir. KafkaTemplate ile mesaj gönderebilmek için gerekir. ProducerConfig sınıfında bazı sabitler tanımlıdır. Bunların açıklaması şöyle
BOOTSTRAP_SERVERS_CONFIG: The configuration key for the list of Kafka broker addresses (host:port) the producer uses for its initial connection to the cluster.

KEY_SERIALIZER_CLASS_CONFIG: The configuration key for the serializer class the producer uses for record keys.

VALUE_SERIALIZER_CLASS_CONFIG: The configuration key for the serializer class the producer uses for record values.
BOOTSTRAP_SERVERS_CONFIG Kafka sunucusunun adresini belirtir.

Örnek
BOOTSTRAP_SERVERS_CONFIG + serializers. Şöyle yaparız
@Configuration
public class KafkaProducerConfig {
    
  @Bean
  public ProducerFactory producerFactory(KafkaProperties kafkaProperties) {
    HashMap<String, Object> properties = new HashMap<>();
    properties.putAll(kafkaProperties.buildProducerProperties());
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    StringSerializer keySerializer = new StringSerializer();
    StringSerializer valueSerializer = new StringSerializer();
    return new DefaultKafkaProducerFactory<>(properties, keySerializer, valueSerializer);
  }
    
  @Bean
  public KafkaTemplate kafkaTemplate(ProducerFactory producerFactory) {
    KafkaTemplate kafkaTemplate = new KafkaTemplate<>(producerFactory);
    return kafkaTemplate;
  }
}
Örnek
BOOTSTRAP_SERVERS_CONFIG + serializers. Şöyle yaparız
@Configuration
public class KafkaProducerConfig {
    
  @Bean
  public ProducerFactory producerFactory(KafkaProperties kafkaProperties) {
    HashMap<String, Object> properties = new HashMap<>();
    properties.putAll(kafkaProperties.buildProducerProperties());
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    StringSerializer keySerializer = new StringSerializer();
    StringSerializer valueSerializer = new StringSerializer();
    return new DefaultKafkaProducerFactory<>(properties, keySerializer, valueSerializer);
  }
    
  @Bean
  public KafkaTemplate kafkaTemplate(ProducerFactory producerFactory) {
    KafkaTemplate kafkaTemplate = new KafkaTemplate<>(producerFactory);
    return kafkaTemplate;
  }
}
Örnek
KEY_SERIALIZER_CLASS_CONFIG + VALUE_SERIALIZER_CLASS_CONFIG. Şöyle yaparız
@Configuration
public class KafkaProducerConfig {
  @Autowired
  private KafkaProperties kafkaProperties;

  @Bean
  public Map<String, Object> producerConfigs() {
    Map<String, Object> props = 
      new HashMap<>(kafkaProperties.buildProducerProperties());
    props.put(org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
      StringSerializer.class);
    props.put(org.apache.kafka.clients.producer.ProducerConfig
.VALUE_SERIALIZER_CLASS_CONFIG,
      JsonSerializer.class);
    return props;
  }

  @Bean
  public ProducerFactory<String, Object> producerFactory() {
    return new DefaultKafkaProducerFactory<>(producerConfigs());
  }

  @Bean
  public KafkaTemplate<String, Object> kafkaTemplate() {
    return new KafkaTemplate<>(producerFactory());
  }
}
Örnek - Avro
Avro için SCHEMA_REGISTRY_URL_CONFIG ile birlikte client id, acks, zaman aşımı (timeout) ve retry gibi başka parametreler de atanır. Şöyle yaparız
/**
 * Assembles the settings for a producer that sends String keys and Avro-serialized values.
 *
 * <p>Reads {@code bootstrapServers} and {@code schemaRegistry}, which are defined outside
 * this method.
 *
 * @return a new mutable map of producer settings
 */
@Bean
public Map<String, Object> producerConfigs() {
  Map<String, Object> settings = new HashMap<>();

  // Connection and client identity.
  settings.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
  settings.put(ProducerConfig.CLIENT_ID_CONFIG, "kafka-producer");

  // Serialization: String keys, Avro values backed by the schema registry.
  settings.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
  settings.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
  settings.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistry);

  // Delivery: wait for all acknowledgements and do not retry.
  // NOTE(review): request and delivery timeouts are equal here; Kafka requires
  // delivery.timeout.ms >= linger.ms + request.timeout.ms, so confirm the effective
  // linger.ms of the client version in use is 0.
  settings.put(ProducerConfig.ACKS_CONFIG, "all");
  settings.put(ProducerConfig.RETRIES_CONFIG, "0");
  settings.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "10000");
  settings.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, "10000");

  return settings;
}
Örnek - Avro
Şöyle yaparız
/**
 * Creates the producer factory for {@code PaymentSent} messages with String keys and
 * Avro-serialized values.
 *
 * <p>Automatic schema registration is switched off via
 * {@code KafkaAvroSerializerConfig.AUTO_REGISTER_SCHEMAS}.
 *
 * @param bootstrapServers value of the {@code kafka.bootstrap-servers} property
 * @param schemaRegistryUrl value of the {@code kafka.schema.registry.url} property
 * @return a producer factory built from these settings
 */
@Bean
public ProducerFactory<String, PaymentSent> producerFactory(
  @Value("${kafka.bootstrap-servers}") final String bootstrapServers,
  @Value("${kafka.schema.registry.url}") final String schemaRegistryUrl) {

  final Map<String, Object> settings = new HashMap<>();

  // Broker connection.
  settings.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);

  // Serializers for key and value.
  settings.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
  settings.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);

  // Schema registry access.
  settings.put(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
  settings.put(KafkaAvroSerializerConfig.AUTO_REGISTER_SCHEMAS, false);

  final ProducerFactory<String, PaymentSent> factory =
    new DefaultKafkaProducerFactory<>(settings);
  return factory;
}

Hiç yorum yok:

Yorum Gönder