Şu satırı dahil ederiz
import org.springframework.kafka.test.context.EmbeddedKafka;
Bence TestContainers kullanmaktan daha kolay. Açıklaması şöyle
... we use the @EmbeddedKafka annotation to inject an instance of an EmbeddedKafkaBroker into our tests
Örnek
Açıklaması şöyle
... using Embedded Kafka ensures that the Kafka topics always start empty.
Maven
Şu satırı dahil ederiz
<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka-test</artifactId><scope>test</scope></dependency>
Aslında tüm proje şöyle olmalı
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka-test</artifactId> <version>2.8.2</version> <scope>test</scope> </dependency> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency>
bootstrapServersProperty Alanı
Örnek
application.yaml şöyle olsun
spring: config: activate: on-profile: default kafka: consumer: auto-offset-reset: earliest group-id: e2econsumer properties: max.poll.interval.ms: 8000000 reconnect.backoff.ms: 1000 reconnect.backoff.max.ms: 10_000 producer: properties: reconnect.backoff.ms: 1000 reconnect.backoff.max.ms: 10_000 test: topic: topic3 --- spring: config: activate: on-profile: serviceAProfile cloud: function: definition: receive stream: defaultBinder: kafka bindings: receive-in-0: destination: topic1 group: service_A consumer: partitioned: true results-out-0: destination: topic2 kafka: binder: consumer-properties: max.poll.interval.ms: 8000000 reconnect.backoff.ms: 1000 reconnect.backoff.max.ms: 10_000 producer-properties: reconnect.backoff.ms: 1000 reconnect.backoff.max.ms: 10_000 server: port: -1 --- spring: config: activate: on-profile: serviceBProfile cloud: function: definition: receive stream: defaultBinder: kafka bindings: receive-in-0: destination: topic2 group: service_B kafka: binder: consumer-properties: max.poll.interval.ms: 8000000 reconnect.backoff.ms: 1000 reconnect.backoff.max.ms: 10_000 producer-properties: reconnect.backoff.ms: 1000 reconnect.backoff.max.ms: 10_000 server: port: -1
serviceAProfile topic1'den okuyup, topic2'ye yazıyor. serviceBProfile topic2'yi dinliyor.
Açıklaması şöyle
DirtiesContextTests — Indicates that the associated class modifies the application context as messages will be added to the topic dirtying the embedded Kafka application context.
EmbeddedKafka(partitions = 1, bootstrapServersProperty = “spring.kafka.bootstrap-servers”}) — Enables embedded kafka to start running on a random free port assigned by os.
Şöyle yaparız
@SpringBootTest @EmbeddedKafka(partitions = 1, bootstrapServersProperty = "spring.kafka.bootstrap-servers", topics = { "topic1", "topic2" }) @DirtiesContext @TestPropertySource(properties = "test.topic=topic2") public class ListeningTopic2IT { private static ConfigurableApplicationContext serviceAContext = null; private static ConfigurableApplicationContext serviceBContext = null; @BeforeEach public void startServices() throws InterruptedException { Executors.newSingleThreadExecutor().execute(() -> { SpringApplication application = new SpringApplication(com.demo.servicea.Application.class); application.setAdditionalProfiles("serviceAProfile"); serviceAContext = application.run(); }); Executors.newSingleThreadExecutor().execute(() -> { SpringApplication application = new SpringApplication(com.demo.serviceb.Application.class); application.setAdditionalProfiles("serviceBProfile"); serviceBContext = application.run(); }); while (serviceAContext == null || !serviceAContext.isRunning()) { System.out.println("Waiting for service A to start, already waited for seconds:" ); Thread.sleep(1000); } while (serviceBContext == null || !serviceBContext.isRunning()) { System.out.println("Waiting for service B to start, already waited for seconds:" ); Thread.sleep(1000); } } @AfterEach public void cleanTestState() { serviceAContext.close(); serviceBContext.close(); } @Autowired private KafkaConsumer consumer; @Autowired private KafkaProducer producer; @Test public void messageSentOnTopic1CapturedOnTopic2() throws InterruptedException, IOException { String payload = "{\"message\" : \"message on topic 1\"}"; this.producer.send("topic1", payload); consumer.getLatch().await(1, TimeUnit.MINUTES); assertThat(consumer.getLatch().getCount(), equalTo(0L)); byte[] recordBytes = consumer.getRecordValue(); assertNotNull(recordBytes); ObjectMapper mapper = new ObjectMapper(); mapper.registerModule(new JavaTimeModule()); MessageEvent messageEvent = mapper.readValue(recordBytes, MessageEvent.class); assertNotNull(messageEvent); assertEquals("message on topic 1", messageEvent.getMessage()); } }
brokerProperties Alanı
Örnek - Listener'ı Test Eder
Açıklaması şöyle
The @EmbeddedKafka notation starts up a clean Kafka instance as part of the Spring test context. The brokerProperties sets a non-standard port for running the Embedded Kafka. In the application-test.properties file, property spring.kafka.bootstrap-servers=localhost:19092 configures the Spring context to use this address for Kafka producers and consumers.The test is reliable because using Embedded Kafka ensures that the Kafka topics always start empty.
test altındaki application.properties şöyledir
spring.kafka.consumer.group-id=reader-1spring.kafka.bootstrap-servers=localhost:19092
Elimizde şöyle bir test kodu olsun. Meter sınıfı Kafka'ya mesaj gönderir. Test listener'ın azalttığı sayaç 20 saniye içinde 0 olursa başarılıdır.
@ExtendWith(SpringExtension.class)@SpringBootTest@ActiveProfiles("test")@EmbeddedKafka(partitions = 2,
brokerProperties = { "listeners=PLAINTEXT://localhost:19092", "port=19092" })@DirtiesContextpublic class SmartMeterIntegrationTests {private CountDownLatch latch;@Autowiredprivate Meter meter;@Autowiredprivate TestListener listener;@Testpublic void shouldSendReadings() throws InterruptedException {latch = new CountDownLatch(3);listener.setLatch(latch);meter.start();assertTrue(latch.await(20, TimeUnit.SECONDS));meter.stop();}}
Topicleri dinleyen şöyle bir listener kod olsun. Her kafka mesajı geldiğince sayaç bir azaltılır
import org.apache.kafka.clients.consumer.ConsumerRecord;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.kafka.annotation.KafkaListener;import org.springframework.stereotype.Service;import java.util.concurrent.CountDownLatch;import static org.junit.jupiter.api.Assertions.assertEquals;@Servicepublic class TestListener {private CountDownLatch latch ;@KafkaListener(topics = "meter.reading")public void listen(ConsumerRecord<String,String> cr) {String[] values = cr.value().split(":");assertEquals(3, values.length);if (latch != null) {latch.countDown();}}public void setLatch(CountDownLatch latch) {this.latch = latch;}}
topics Alanı
Açıklaması şöyle
As can be seen, the topics used by the test are defined as an annotation parameter. The topics are required to be created upfront ..., even if they are configured as auto.create.topics.enable: true (which is the default) they are never automatically created. If they are not present the application will throw the following error:MissingSourceTopicException: One or more source topics were missing during rebalance
Örnek - Listener ve Producer Test Ediliyor
Şöyle yaparız. Burada orders ve payments isimli iki topic yaratılıyor.
@SpringBootTest@EmbeddedKafka(topics = {"orders", "payments"})public class PaymentServiceTest {@Autowiredprivate Producer<Order> orderProducer;@Autowiredprivate Consumer<Payment> paymentConsumer;@Testpublic void shouldProcessPaymentForValidOrder() {var validOrder = Order.withCustomerId(123).withItems(Item.withId("af12da"));orderProducer.send(validOrder);var payment = paymentConsumer.poll();assertThat(payment.getPaymentStatus()).isEqualTo("OK");assertThat(payment.getPaymentTransactionId()).isNotNull();}}
Hiç yorum yok:
Yorum Gönder