3 Ocak 2021 Pazar

SpringKafka Test @EmbeddedKafka Anotasyonu - Integration Test İçindir

Giriş
Şu satırı dahil ederiz
import org.springframework.kafka.test.context.EmbeddedKafka;
Bence TestContainers kullanmaktan daha kolay. Açıklaması şöyle
... we use the @EmbeddedKafka annotation to inject an instance of an EmbeddedKafkaBroker into our tests
Örnek
Açıklaması şöyle
... using Embedded Kafka ensures that the Kafka topics always start empty.
Maven
Şu satırı dahil ederiz
<!-- Test-scoped dependency; pulled in here for the @EmbeddedKafka annotation imported above. -->
<dependency>
  <groupId>org.springframework.kafka</groupId>
  <artifactId>spring-kafka-test</artifactId>
  <scope>test</scope>
</dependency>
Aslında tüm proje şöyle olmalı
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-test</artifactId>
  <scope>test</scope>
</dependency>
<!--
  No explicit <version> on spring-kafka-test: spring-kafka below is already resolved
  through dependency management, and pinning only the test artifact lets the two
  drift apart (a common cause of NoSuchMethodError in embedded-broker tests).
-->
<dependency>
  <groupId>org.springframework.kafka</groupId>
  <artifactId>spring-kafka-test</artifactId>
  <scope>test</scope>
</dependency>
<dependency>
  <groupId>org.springframework.kafka</groupId>
  <artifactId>spring-kafka</artifactId>
</dependency>
bootstrapServersProperty Alanı
Örnek
application.yaml şöyle olsun
# Document 1 - "default" profile: settings for the plain spring-kafka consumer/producer.
spring:
  config:
    activate:
      on-profile: default
  kafka:
    consumer:
      auto-offset-reset: earliest
      group-id: e2econsumer
      properties:
        max.poll.interval.ms: 8000000
        reconnect.backoff.ms: 1000
        reconnect.backoff.max.ms: 10_000
    producer:
      properties:
        reconnect.backoff.ms: 1000
        reconnect.backoff.max.ms: 10_000
# Custom (non-Spring) property; the test class overrides it with
# @TestPropertySource(properties = "test.topic=topic2").
test:
  topic: topic3
---
# Document 2 - "serviceAProfile": Spring Cloud Stream bindings for service A.
# The "receive" function consumes topic1 (receive-in-0) and the results-out-0
# binding publishes to topic2.
spring:
  config:
    activate:
      on-profile: serviceAProfile
  cloud:
    function:
      definition: receive
    stream:
      defaultBinder: kafka
      bindings:
        receive-in-0:
          destination: topic1
          group: service_A
          consumer:
            partitioned: true
        results-out-0:
          destination: topic2
      kafka:
        binder:
          consumer-properties:
            max.poll.interval.ms: 8000000
            reconnect.backoff.ms: 1000
            reconnect.backoff.max.ms: 10_000
          producer-properties:
            reconnect.backoff.ms: 1000
            reconnect.backoff.max.ms: 10_000
# NOTE(review): presumably -1 keeps the services, which are started side by side in
# one JVM, from competing for an HTTP port - confirm.
server:
  port: -1
---
# Document 3 - "serviceBProfile": Spring Cloud Stream bindings for service B.
# The "receive" function consumes topic2, i.e. the output of service A.
spring:
  config:
    activate:
      on-profile: serviceBProfile
  cloud:
    function:
      definition: receive
    stream:
      defaultBinder: kafka
      bindings:
        receive-in-0:
          destination: topic2
          group: service_B
      kafka:
        binder:
          consumer-properties:
            max.poll.interval.ms: 8000000
            reconnect.backoff.ms: 1000
            reconnect.backoff.max.ms: 10_000
          producer-properties:
            reconnect.backoff.ms: 1000
            reconnect.backoff.max.ms: 10_000
# NOTE(review): same assumption as for serviceAProfile - confirm.
server:
  port: -1
serviceAProfile topic1'den okuyup, topic2'ye yazıyor. serviceBProfile topic2'yi dinliyor.
Açıklaması şöyle
@DirtiesContext — Indicates that the associated test class modifies the application context: messages are added to the topics, which dirties the embedded Kafka application context.
@EmbeddedKafka(partitions = 1, bootstrapServersProperty = "spring.kafka.bootstrap-servers") — Enables embedded Kafka to start running on a random free port assigned by the OS; the broker address is published under the spring.kafka.bootstrap-servers property.
Şöyle yaparız
/**
 * End-to-end test that starts service A and service B inside the test JVM against the
 * embedded Kafka broker, sends a message to {@code topic1} and expects it to show up on
 * {@code topic2}.
 *
 * <p>Both services are launched on background threads with their own profile
 * ({@code serviceAProfile} / {@code serviceBProfile}); the test thread polls until both
 * application contexts report that they are running.
 */
@SpringBootTest
@EmbeddedKafka(partitions = 1,
        bootstrapServersProperty = "spring.kafka.bootstrap-servers",
        topics = { "topic1", "topic2" })
@DirtiesContext
@TestPropertySource(properties = "test.topic=topic2")
public class ListeningTopic2IT {
    /** Upper bound for each service start-up; without it a failed boot would hang the build. */
    private static final int STARTUP_TIMEOUT_SECONDS = 120;

    // volatile: assigned on the launcher threads, polled from the test thread.
    private static volatile ConfigurableApplicationContext serviceAContext = null;
    private static volatile ConfigurableApplicationContext serviceBContext = null;

    /**
     * Boots both services in parallel and blocks until each context is running.
     *
     * @throws InterruptedException if the test thread is interrupted while waiting
     * @throws IllegalStateException if a service is not running within
     *         {@link #STARTUP_TIMEOUT_SECONDS} seconds
     */
    @BeforeEach
    public void startServices() throws InterruptedException {
        java.util.concurrent.ExecutorService launcher = Executors.newFixedThreadPool(2);

        launcher.execute(() -> {
            SpringApplication application =
              new SpringApplication(com.demo.servicea.Application.class);
            application.setAdditionalProfiles("serviceAProfile");
            serviceAContext = application.run();
        });

        launcher.execute(() -> {
            SpringApplication application =
              new SpringApplication(com.demo.serviceb.Application.class);
            application.setAdditionalProfiles("serviceBProfile");
            serviceBContext = application.run();
        });

        // Already-submitted tasks still run to completion; this only lets the
        // worker threads terminate afterwards instead of leaking.
        launcher.shutdown();

        int waitedSeconds = 0;
        while (!isRunning(serviceAContext)) {
            if (waitedSeconds >= STARTUP_TIMEOUT_SECONDS) {
                throw new IllegalStateException(
                    "Service A did not start within " + STARTUP_TIMEOUT_SECONDS + " seconds");
            }
            System.out.println(
                "Waiting for service A to start, already waited for seconds: " + waitedSeconds);
            Thread.sleep(1000);
            waitedSeconds++;
        }

        waitedSeconds = 0;
        while (!isRunning(serviceBContext)) {
            if (waitedSeconds >= STARTUP_TIMEOUT_SECONDS) {
                throw new IllegalStateException(
                    "Service B did not start within " + STARTUP_TIMEOUT_SECONDS + " seconds");
            }
            System.out.println(
                "Waiting for service B to start, already waited for seconds: " + waitedSeconds);
            Thread.sleep(1000);
            waitedSeconds++;
        }
    }

    /** Returns {@code true} when the given context exists and reports itself as running. */
    private static boolean isRunning(ConfigurableApplicationContext context) {
        return context != null && context.isRunning();
    }

    /**
     * Closes whichever service contexts were created and clears the references so a
     * later start-up never mistakes a closed context for a fresh one.
     */
    @AfterEach
    public void cleanTestState() {
        ConfigurableApplicationContext contextA = serviceAContext;
        if (contextA != null) {
            contextA.close();
            serviceAContext = null;
        }
        ConfigurableApplicationContext contextB = serviceBContext;
        if (contextB != null) {
            contextB.close();
            serviceBContext = null;
        }
    }

    @Autowired
    private KafkaConsumer consumer;

    @Autowired
    private KafkaProducer producer;

    /**
     * Sends a JSON payload to {@code topic1} and verifies that the test consumer
     * receives a record whose message field matches the payload.
     */
    @Test
    public void messageSentOnTopic1CapturedOnTopic2() throws InterruptedException, IOException {
        String payload = "{\"message\" : \"message on topic 1\"}";
        this.producer.send("topic1", payload);

        // Wait up to a minute for the consumer's latch, then assert it reached zero.
        consumer.getLatch().await(1, TimeUnit.MINUTES);
        assertThat(consumer.getLatch().getCount(), equalTo(0L));

        byte[] recordBytes = consumer.getRecordValue();
        assertNotNull(recordBytes);

        ObjectMapper mapper = new ObjectMapper();
        mapper.registerModule(new JavaTimeModule());

        MessageEvent messageEvent = mapper.readValue(recordBytes, MessageEvent.class);
        assertNotNull(messageEvent);
        assertEquals("message on topic 1", messageEvent.getMessage());
    }
}

brokerProperties Alanı
Örnek - Listener'ı Test Eder
Açıklaması şöyle
The @EmbeddedKafka annotation starts up a clean Kafka instance as part of the Spring test context. The brokerProperties attribute sets a non-standard port for running the embedded Kafka broker. In the application-test.properties file, the property spring.kafka.bootstrap-servers=localhost:19092 configures the Spring context to use this address for Kafka producers and consumers.

The test is reliable because using Embedded Kafka ensures that the Kafka topics always start empty.
test altındaki application.properties şöyledir
# Default consumer group id for the Kafka consumers in the test context.
spring.kafka.consumer.group-id=reader-1
# Must match the listener address configured in @EmbeddedKafka(brokerProperties = ...), i.e. port 19092.
spring.kafka.bootstrap-servers=localhost:19092
Elimizde şöyle bir test kodu olsun. Meter sınıfı Kafka'ya mesaj gönderir. Test listener'ın azalttığı sayaç 20 saniye içinde 0 olursa başarılıdır.
@ExtendWith(SpringExtension.class)
@SpringBootTest
@ActiveProfiles("test")
@EmbeddedKafka(partitions = 2,
brokerProperties = { "listeners=PLAINTEXT://localhost:19092", "port=19092" })
@DirtiesContext
public class SmartMeterIntegrationTests {

  private CountDownLatch latch;

  @Autowired
  private Meter meter;

  @Autowired
  private TestListener listener;

  @Test
  public void shouldSendReadings() throws InterruptedException {
    latch = new CountDownLatch(3);
    listener.setLatch(latch);
    meter.start();
    assertTrue(latch.await(20, TimeUnit.SECONDS));
    meter.stop();
  }
}
Topic'leri dinleyen şöyle bir listener kodu olsun. Her Kafka mesajı geldiğinde sayaç bir azaltılır.
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

import java.util.concurrent.CountDownLatch;

import static org.junit.jupiter.api.Assertions.assertEquals;

@Service
public class TestListener {

  private CountDownLatch latch ;

  @KafkaListener(topics = "meter.reading")
  public void listen(ConsumerRecord<String,String> cr) {
    String[] values = cr.value().split(":");
    assertEquals(3, values.length);
    if (latch != null) {
      latch.countDown();
    }
  }

  public void setLatch(CountDownLatch latch) {
    this.latch = latch;
  }
}
topics Alanı
Açıklaması şöyle
As can be seen, the topics used by the test are defined as an annotation parameter. The topics are required to be created upfront ..., even if they are configured as auto.create.topics.enable: true (which is the default) they are never automatically created. If they are not present the application will throw the following error:

MissingSourceTopicException: One or more source topics were missing during rebalance
Örnek - Listener ve Producer Test Ediliyor
Şöyle yaparız. Burada orders ve payments isimli iki topic yaratılıyor.
@SpringBootTest
@EmbeddedKafka(topics = {"orders", "payments"})
public class PaymentServiceTest {

  @Autowired
  private Producer<Order> orderProducer;
  
  @Autowired
  private Consumer<Payment> paymentConsumer;
  
  @Test
  public void shouldProcessPaymentForValidOrder() {
    var validOrder = Order
      .withCustomerId(123)
      .withItems(Item.withId("af12da"));
    
    orderProducer.send(validOrder);
    
    var payment = paymentConsumer.poll();
    assertThat(payment.getPaymentStatus()).isEqualTo("OK");
    assertThat(payment.getPaymentTransactionId()).isNotNull();
  }
}

Hiç yorum yok:

Yorum Gönder