Giriş
Şu satırı dahil ederiz
import org.springframework.data.transaction.ChainedTransactionManager;
Açıklaması şöyle
The traditional approach, using ChainedKafkaTransactionManager, has been deprecated in 2021.
Örnek
Açıklaması şöyle
First of all, we need to enable Spring Kafka's Kafka Transaction Manager. We can do it by simply setting the transactional id property in our application.properties:By setting this property, Spring Boot will automatically configure the Kafka Transaction Manager.
Şöyle yaparız
spring.kafka.producer.transaction-id-prefix=tx-
Sonra şöyle yaparız
@KafkaListener(id = "group1", topics = "topic1") @Transactional("transactionManager") public void listen1(String in) { log.info("Received from topic1: {}", in); log.info("Sending to topic2: {}", in.toUpperCase()); kafkaTemplate.send("topic2", in.toUpperCase()); log.info("Writing to database: {}", in); demoRepository.save(DemoEntity.builder() .name(in) .timestamp(System.currentTimeMillis()) .build() ); }
Açıklaması şöyle
The trick here is giving transactionManager as the value for the @Transactional annotation. This is because there will be two transaction managers available: transactionManager and kafkaTransactionManager.The transactionManager bean is an instance of JpaTransactionManager while the kafkaTransactionManager bean is an instance of KafkaTransactionManager.And the reason why we want to give the transactionManager as the value for the @Transactional annotation is because our KafkaMessageListenerContainer is already creating transactions for us on consuption. Whenever a new message comes in, it will automatically begin a Kafka transaction before it starts running our method.Therefore, all we have to do is tell Spring Boot to, before our method is run, to also begin a transaction, but at this time, for the JpaTransactionManager.
Hiç yorum yok:
Yorum Gönder