Şu satırı dahil ederiz
import org.springframework.kafka.core.DefaultKafkaProducerFactory;import org.springframework.kafka.core.KafkaTemplate;import org.springframework.kafka.core.ProducerFactory;
BOOTSTRAP_SERVERS_CONFIG: The key for the Kafka bootstrap servers configuration.
KEY_SERIALIZER_CLASS_CONFIG: Serializer class for the producer factory key.
VALUE_SERIALIZER_CLASS_CONFIG: Serializer class for the producer factory value.
BOOTSTRAP_SERVERS_CONFIG Kafka sunucusunun adresini belirtir.
Örnek
BOOTSTRAP_SERVERS_CONFIG + serializers. Şöyle yaparız@Configurationpublic class KafkaProducerConfig {@Beanpublic ProducerFactory producerFactory(KafkaProperties kafkaProperties) {HashMap<String, Object> properties = new HashMap<>();properties.putAll(kafkaProperties.buildProducerProperties());properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");StringSerializer keySerializer = new StringSerializer();StringSerializer valueSerializer = new StringSerializer();return new DefaultKafkaProducerFactory<>(properties, keySerializer, valueSerializer);}@Beanpublic KafkaTemplate kafkaTemplate(ProducerFactory producerFactory) {KafkaTemplate kafkaTemplate = new KafkaTemplate<>(producerFactory);return kafkaTemplate;}}
Örnek
BOOTSTRAP_SERVERS_CONFIG + serializers. Şöyle yaparız
/**
 * Spring configuration that declares a Kafka producer factory and a template built on it.
 *
 * <p>Both beans are parameterized as {@code <String, String>} because the factory is
 * constructed with {@code StringSerializer} instances for key and value.
 */
@Configuration
public class KafkaProducerConfig {

  /**
   * Builds the producer factory from the producer properties exposed by
   * {@code kafkaProperties}, then sets the bootstrap server address explicitly.
   *
   * @param kafkaProperties source of the base producer properties
   * @return a producer factory using String serializers for key and value
   */
  @Bean
  public ProducerFactory<String, String> producerFactory(KafkaProperties kafkaProperties) {
    HashMap<String, Object> properties = new HashMap<>(kafkaProperties.buildProducerProperties());
    // Written after the copy, so this address replaces any value the copied properties carried.
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    return new DefaultKafkaProducerFactory<>(
        properties, new StringSerializer(), new StringSerializer());
  }

  /**
   * Wraps the given producer factory in a {@code KafkaTemplate}.
   *
   * @param producerFactory the factory the template creates its producers from
   * @return a template bound to {@code producerFactory}
   */
  @Bean
  public KafkaTemplate<String, String> kafkaTemplate(
      ProducerFactory<String, String> producerFactory) {
    return new KafkaTemplate<>(producerFactory);
  }
}
Örnek
KEY_SERIALIZER_CLASS_CONFIG + VALUE_SERIALIZER_CLASS_CONFIG. Şöyle yaparızÖrnek - Avro@Configurationpublic class KafkaProducerConfig {@Autowiredprivate KafkaProperties kafkaProperties;@Beanpublic Map<String, Object> producerConfigs() {Map<String, Object> props =new HashMap<>(kafkaProperties.buildProducerProperties());props.put(org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class);props.put(org.apache.kafka.clients.producer.ProducerConfig
.VALUE_SERIALIZER_CLASS_CONFIG,JsonSerializer.class);return props;}@Beanpublic ProducerFactory<String, Object> producerFactory() {return new DefaultKafkaProducerFactory<>(producerConfigs());}@Beanpublic KafkaTemplate<String, Object> kafkaTemplate() {return new KafkaTemplate<>(producerFactory());}}
/**
 * Assembles the producer configuration map for a producer with String keys and
 * Avro-serialized values.
 *
 * <p>Reads {@code bootstrapServers} and {@code schemaRegistry}, which are defined outside
 * this method.
 *
 * @return the producer configuration map
 */
@Bean
public Map<String, Object> producerConfigs() {
  final Map<String, Object> settings = new HashMap<>();

  // Connection and client identity.
  settings.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
  settings.put(ProducerConfig.CLIENT_ID_CONFIG, "kafka-producer");
  settings.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistry);

  // Serialization: String keys, Avro values.
  settings.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
  settings.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);

  // Delivery settings; the numeric values are passed as strings, as in the original.
  // NOTE(review): Kafka expects delivery.timeout.ms >= linger.ms + request.timeout.ms; with
  // both timeouts at 10000 that leaves no room for a non-zero linger.ms - confirm against
  // the client version in use.
  settings.put(ProducerConfig.ACKS_CONFIG, "all");
  settings.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "10000");
  settings.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, "10000");
  settings.put(ProducerConfig.RETRIES_CONFIG, "0");

  return settings;
}
Örnek - Avro
Şöyle yaparız
/**
 * Creates a producer factory for {@code PaymentSent} values with String keys, using the
 * Avro serializer for values.
 *
 * @param bootstrapServers value injected from {@code kafka.bootstrap-servers}
 * @param schemaRegistryUrl value injected from {@code kafka.schema.registry.url}
 * @return a producer factory built from the assembled configuration map
 */
@Bean
public ProducerFactory<String, PaymentSent> producerFactory(
    @Value("${kafka.bootstrap-servers}") final String bootstrapServers,
    @Value("${kafka.schema.registry.url}") final String schemaRegistryUrl) {
  final Map<String, Object> settings = new HashMap<>();

  // Broker address and serializer classes.
  settings.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
  settings.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
  settings.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);

  // Avro serializer options: registry location, and the auto-register flag set to false.
  settings.put(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
  settings.put(KafkaAvroSerializerConfig.AUTO_REGISTER_SCHEMAS, false);

  return new DefaultKafkaProducerFactory<>(settings);
}
Hiç yorum yok:
Yorum Gönder