Tuesday, August 30, 2022

SpringIntegration Redis RedisLockRegistry Class

Introduction
The explanation is as follows. Instead of letting @RedisLockable release the lock automatically, it can be used to release the lock from code
To release the lock early, you can use the LockRegistry interface provided by Spring Integration. You can inject the LockRegistry bean and obtain a lock on the same key used by the @RedisLockable annotation.
constructor - RedisConnectionFactory + registryKey
Example
We do it like this
private static final String LOCK_NAME = "lock";

@Bean(destroyMethod = "destroy")
public RedisLockRegistry redisLockRegistry(
  RedisConnectionFactory redisConnectionFactory) {
    return new RedisLockRegistry(redisConnectionFactory, LOCK_NAME);
}
constructor - RedisConnectionFactory + registryKey + expireAfter
The explanation is as follows
When creating a RedisLockRegistry instance, you need to provide three parameters: `connectionFactory`, `registryKey`, and `expireAfter`.

The `connectionFactory` parameter is used to generate the redisTemplate instance field within the RedisLockRegistry. This redisTemplate facilitates interaction with Redis using string values.

The `registryKey` parameter serves as a prefix for key names. These key names are formed by combining the `registryKey` and the key’s specific value, separated by a colon. For example, if `registryKey` is set to “Spring” and you intend to lock a key with the value “Integration”, the actual key used to communicate with Redis would be “Spring:Integration”.

The `expireAfter` parameter, of type long, determines the Time To Live (TTL) for the lock status data inserted into Redis by RedisLock instances.
Example
We do it like this
import java.time.Duration;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.integration.redis.util.RedisLockRegistry;
import org.springframework.integration.support.locks.ExpirableLockRegistry;

@Configuration
public class RedisDistributedLockConfiguration {

  // bean name referenced later with @Qualifier; the value here is an assumption
  private static final String LOCK_REGISTRY_BEAN = "redisLockRegistry";
  private static final String LOCK_REGISTRY_REDIS_KEY = "MY_REDIS_KEY";
  private static final Duration RELEASE_TIME_DURATION = Duration.ofSeconds(30);

  @Bean(LOCK_REGISTRY_BEAN)
  public ExpirableLockRegistry lockRegistry(RedisConnectionFactory redisConnectionFactory) {
    return new RedisLockRegistry(redisConnectionFactory, LOCK_REGISTRY_REDIS_KEY,
      RELEASE_TIME_DURATION.toMillis());
  }
}
expireUnusedOlderThan Method
The explanation is as follows
In some situations, you might need to acquire a lock but not release it. For example, you have a @Scheduled task that runs every 15 seconds on each instance and you don't want it to run more often than once per 15 seconds.
To do that you can get a lock and exit from the method without releasing it. In such cases, I suggest calling lockRegistry.expireUnusedOlderThan(TTL) each time before obtaining a lock (actually it is better to call it in all cases). This method removes old, unreleased locks from the locks map and prevents the situation where one instance holds a map with old locks and all threads of this instance (except the one which acquired the lock) cannot acquire it.
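A minimal sketch of this pattern, assuming the ExpirableLockRegistry bean defined above, that scheduling is enabled with @EnableScheduling, and a hypothetical lock key named scheduled-job:
import java.util.concurrent.locks.Lock;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.integration.support.locks.ExpirableLockRegistry;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

@Component
public class ScheduledJob {

  @Autowired
  private ExpirableLockRegistry lockRegistry;

  @Scheduled(fixedRate = 15_000)
  public void run() {
    // Drop stale, unreleased locks from the local map before trying to acquire
    lockRegistry.expireUnusedOlderThan(15_000);

    Lock lock = lockRegistry.obtain("scheduled-job"); // hypothetical lock key
    if (lock.tryLock()) {
      // do the work; the lock is deliberately not released so the job
      // cannot run again anywhere until the Redis TTL expires
    }
  }
}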
obtain Method
The explanation is as follows
Each LockRegistry object has a random id of type UUID and contains a map of locks (lock name / lock object), called locks, that are held by the current instance. When we call lockRegistry.obtain(lockKey), lockRegistry first checks whether the map contains this lock (in other words, whether the current instance has a lock with the same name acquired at this moment); if so, that lock is returned. Otherwise, lockRegistry checks whether Redis contains the key named "registry key:lock name" and whether its value equals the id of the lockRegistry object. If so, it returns this lock; otherwise it tries to create the key "registry key:lock name" in Redis. When you have obtained a lock and call lock.tryLock(), basically the same steps are performed as in lockRegistry.obtain(lockKey): first the locks map is checked, then Redis.
We do it like this
import java.util.concurrent.locks.Lock;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.integration.support.locks.ExpirableLockRegistry;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class LockService implements ILockService {

  @Qualifier(LOCK_REGISTRY_BEAN)
  @Autowired
  private ExpirableLockRegistry lockRegistry;

  @Override
  public boolean update(UpdateRequest request) {
    Lock lock = lockRegistry.obtain(request.getId());
    boolean success = lock.tryLock();

    if (!success) {
      return false;
    }

    try {
      // ...
      // update a shared resource
      // ...
      return true;
    } finally {
      // always release the lock, even if the update fails
      lock.unlock();
    }
  }
}
Example
We do it like this
@Service
public class MyService {

  @Autowired
  private RedisLockRegistry redisLockRegistry;

  @RedisLockable(key = "my-lock-key", leaseTime = 60_000, waitTime = 5_000)
  public void doSomethingWithLock() throws InterruptedException {
    Lock lock = redisLockRegistry.obtain("my-lock-key");
    try {
      // Do something with the lock
    } finally {
       lock.unlock();
    }
  }
}
setRedisLockType Method
The explanation is as follows
... there can be configured 2 types of locks (both reentrant ones)
- RedisLockType.SPIN_LOCK - the lock is acquired by a periodic loop (100 ms) checking whether the lock can be acquired. This is the default.
- RedisLockType.PUB_SUB_LOCK - the lock is acquired via a Redis pub-sub subscription.

The pub-sub mode is preferred: less network chatter between the client and the Redis server, and better performance, since the lock is acquired immediately when the subscription is notified about unlocking in the other process. However, Redis does not support pub-sub over Master/Replica connections (for example in an AWS ElastiCache environment), therefore the busy-spin mode is chosen as the default so that the registry works in any environment.
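To switch to the pub-sub mode we can set the lock type on the registry; a minimal sketch, assuming a Spring Integration version that provides setRedisLockType and reusing the LOCK_NAME constant from the first example:
@Bean(destroyMethod = "destroy")
public RedisLockRegistry redisLockRegistry(RedisConnectionFactory redisConnectionFactory) {
  RedisLockRegistry registry = new RedisLockRegistry(redisConnectionFactory, LOCK_NAME);
  // Acquire locks via Redis pub-sub notifications instead of the default busy-spin;
  // keep SPIN_LOCK for Master/Replica setups (e.g. AWS ElastiCache) where pub-sub
  // is not supported
  registry.setRedisLockType(RedisLockRegistry.RedisLockType.PUB_SUB_LOCK);
  return registry;
}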



SpringCloud AWS Secrets Manager - Retrieves Values from AWS Secrets Manager

Introduction
We can view the AWS secret values under env via the actuator.
http://localhost:8080/actuator/env
Its usage is very similar to SpringCloud AWS Systems Manager

Maven
We include these lines
<dependencyManagement>
  <dependencies>
    <dependency>
      <groupId>io.awspring.cloud</groupId>
      <artifactId>spring-cloud-aws-dependencies</artifactId>
      <version>3.0.0-M2</version>
      <type>pom</type>
      <scope>import</scope>
    </dependency>
  </dependencies>
</dependencyManagement>

<dependency>
  <groupId>io.awspring.cloud</groupId>
  <artifactId>spring-cloud-aws-starter-secrets-manager</artifactId>
</dependency>
Gradle
We include these lines
dependencies {
  implementation 'org.springframework.boot:spring-boot-starter'
  implementation('io.awspring.cloud:spring-cloud-starter-aws-secrets-manager-config:2.4.2')
}
application.yaml File
spring.config.import pulls the parameters from AWS and adds them to Spring's environment properties

Example
With Maven we do it like this
<dependency>
  <groupId>com.amazonaws</groupId>
  <artifactId>aws-java-sdk-secretsmanager</artifactId>
</dependency>
<dependency>
  <groupId>com.amazonaws.secretsmanager</groupId>
  <artifactId>aws-secretsmanager-jdbc</artifactId>
  <version>1.0.5</version>
</dependency>
With application.properties we do it like this
spring.datasource.url = (name/path of your secrets manager)
spring.datasource.username = (name/path of your secrets manager)
spring.datasource.driver-class-name = com.amazonaws.secretsmanager.sql.AWSSecretsManagerMySQLDriver
Example - Other Secret Type

Example
Let the yaml file be like this. We defined key/value pairs in 3 different secrets
spring:
  config:
    import:
      - aws-secretsmanager:dev/my-app/database-creds
      - aws-secretsmanager:dev/my-app/oauth-creds
      - optional:aws-secretsmanager:dev/my-app/some-other-creds
We can access any of these pairs with @Value. We do it like this
@SpringBootApplication
public class SpringBootAwsSecretsApplication implements CommandLineRunner {

  @Value("${dbUser}")
  private String dbUser;

  @Value("${dbPassword}")
  private String dbPassword;
  ...
}
Example
Let's create a secret like this
aws secretsmanager create-secret \
  --name /secret/db-credential \
  --secret-string '{"dbuser": "user1", "dbpassword": "password"}'
We do it like this. The secret name is specified with spring.config.import. The dbuser and dbpassword variables inside the secret are used. The AWS profile name used is "personal"
# actuator configuration
management:
  endpoints:
    web:
      exposure:
        include:
        - env
spring:
  datasource:
    url: jdbc:mysql://localhost:3306/database
    username: ${dbuser}
    password: ${dbpassword}
  jpa:
    hibernate:
      ddl-auto: create

#  AWS configuration
  cloud:
    aws:
      secretsmanager:
        region: eu-central-1
          
      credentials:
        profile:
          name: personal
  config:
    import:
       - aws-secretsmanager:/secret/db-credential
       - optional:aws-secretsmanager:/secrets/optional-secret
Unit Testing with Localstack
To run Localstack with Docker compose we do it like this
version: "3.8"

services:
  localstack:
    image: localstack/localstack
    ports:
      - "4566:4566"            # LocalStack endpoint
    environment:
      - DOCKER_HOST=unix:///var/run/docker.sock
      - DEFAULT_REGION=eu-central-1
    volumes:
      - ./localstack-script:/docker-entrypoint-initaws.d
      - /var/run/docker.sock:/var/run/docker.sock
Inside localstack-script/script.sh we do it like this. awslocal is provided by localstack and is a wrapper around the aws command
awslocal secretsmanager create-secret \
  --name /secret/spring-boot-app \
  --secret-string '{"property1": "property1-value", "property2": "property2-value"}'

awslocal secretsmanager create-secret \
  --name /secret/db-credential \
  --secret-string '{"dbuser": "user1", "dbpassword": "password"}'
For the test, the application configuration file is as follows
spring:
  cloud:
    aws:
      secretsmanager:
        region: eu-central-1
        endpoint: http://localhost:4566
      credentials:
        access-key: none
        secret-key: none
Inside the test we do it like this. Here @DynamicPropertySource is not used; @Testcontainers manages the lifecycle of the static @Container field
@Testcontainers
@SpringBootTest
@AutoConfigureMockMvc
class ApplicationIT {

  @Autowired
  MockMvc mockMvc;

  @Container
  private static LocalStackContainer localStackContainer = 
    new LocalStackContainer(DockerImageName.parse("localstack/localstack"))
      .withCopyFileToContainer(MountableFile.forClasspathResource("script.sh"),
                    "/docker-entrypoint-initaws.d/")
      .withServices(LocalStackContainer.Service.SECRETSMANAGER);

  @BeforeAll
  static void beforeAll() throws IOException, InterruptedException {
    System.setProperty("spring.cloud.aws.secretsmanager.endpoint",
      localStackContainer.getEndpointOverride(
        LocalStackContainer.Service.SECRETSMANAGER).toString());

    System.setProperty("spring.cloud.aws.secretsmanager.region", 
      localStackContainer.getRegion());
    System.setProperty("spring.cloud.aws.credentials.access-key", "none");
    System.setProperty("spring.cloud.aws.credentials.secret-key", "none");
  }
  ...
 
}
   







SpringBoot Actuator - Metrics Endpoint for the JVM

Introduction
The JVM metric names are as follows
{
  "names": [
    ...
    "jvm.buffer.count",
    "jvm.buffer.memory.used",
    "jvm.buffer.total.capacity",
    "jvm.classes.loaded",
    "jvm.classes.unloaded",
    "jvm.gc.live.data.size",
    "jvm.gc.max.data.size",
    "jvm.gc.memory.allocated",
    "jvm.gc.memory.promoted",
    "jvm.gc.overhead",
    "jvm.gc.pause",
    "jvm.memory.committed",
    "jvm.memory.max",
    "jvm.memory.usage.after.gc",
    "jvm.memory.used",
    "jvm.threads.daemon",
    "jvm.threads.live",
    "jvm.threads.peak",
    "jvm.threads.states",
    ...
Example
To see some of the values we do it like this
http://localhost:8080/actuator/metrics/jvm.memory.max
http://localhost:8080/actuator/metrics/jvm.memory.used
The explanation for some of them is as follows
jvm.memory.max - the max heap/non heap size the application can occupy
jvm.memory.used - the amount of memory that is currently occupied by Java objects
jvm.memory.committed - the amount of memory guaranteed to be available for use by JVM
jvm.buffer.memory.used - temporary buffer cache
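The endpoint also accepts tag filters, so the heap and non-heap parts of jvm.memory.used can be inspected separately (assuming the standard area tag that Micrometer attaches to the JVM memory metrics):
http://localhost:8080/actuator/metrics/jvm.memory.used?tag=area:heap
http://localhost:8080/actuator/metrics/jvm.memory.used?tag=area:nonheap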
Example
The figure is as follows. Here the max, used and committed fields are shown. The committed value is slightly higher than used, while max jumped at one point. These fields do not show the total memory used by the Java application. The explanation is here





Saturday, August 27, 2022

SpringMVC RestTemplate Error Handling

Introduction
The explanation is as follows
By default, the RestTemplate will throw one of these exceptions in the case of an HTTP error:

HttpClientErrorException – in the case of HTTP status 4xx
HttpServerErrorException – in the case of HTTP status 5xx
UnknownHttpStatusCodeException – in the case of an unknown HTTP status
All of these exceptions are extensions of RestClientResponseException.
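A minimal sketch of what catching these looks like on the caller side, assuming a restTemplate field and an slf4j log are available (the URL is just a placeholder):
try {
  ResponseEntity<String> response =
    restTemplate.getForEntity("http://localhost:8080/exchangerate", String.class);
} catch (HttpClientErrorException e) {
  // HTTP 4xx, e.g. 401 Unauthorized or 404 Not Found
  log.warn("Client error {}: {}", e.getStatusCode(), e.getResponseBodyAsString());
} catch (HttpServerErrorException e) {
  // HTTP 5xx
  log.warn("Server error {}", e.getStatusCode());
} catch (UnknownHttpStatusCodeException e) {
  // Non-standard HTTP status
  log.warn("Unknown status {}", e.getRawStatusCode());
}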
The figure is as follows


The class that causes these exceptions to be thrown is DefaultResponseErrorHandler. For example, if we send a GET to a URL that does not exist, we get this result
Response body
{
  "timestamp": 1661608573837,
  "status": 500,
  "error": "Internal Server Error",
  "path": "/exchangerate"
}
If we send an invalid apiKey we get this result
{
  "timestamp": 1661632797098,
  "status": 500,
  "error": "Internal Server Error",
  "path": "/exchangerate"
}
If we send invalid Json we get this result. That is, the name of the thrown exception and the exception message are also included.
{
 "timestamp": 1500597044204,
 "status": 400,
 "error": "Bad Request",
 "exception": "org.springframework.http.converter.HttpMessageNotReadableException",
 "message": "JSON parse error: Unrecognized token 'three': was expecting ('true', 'false' or 'null'); nested exception is com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'aaa': was expecting ('true', 'false' or 'null')\n at [Source: java.io.PushbackInputStream@cba7ebc; line: 4, column: 17]",
 "path": "/birds"
}
1. Catching the RestClientResponseException Class
Example - ResponseEntity
To change the output, we need to catch this exception and return a ResponseEntity object of our own. We do it like this
@ExceptionHandler(RestClientResponseException.class)
public ResponseEntity<String> restClientResponseExceptionHandler(
  RestClientResponseException exception) {

  return ResponseEntity.status(exception.getRawStatusCode())
    .body(exception.getResponseBodyAsString());
}
If we send an invalid apiKey we get 401 and this time the output is as follows
{"message":"Invalid authentication credentials"}
Example - Our Own Class
Suppose we have code like this
@Data
public class ErrorResponse {

  int status;
  String message;
  String path;
}
We do it like this
@ExceptionHandler(RestClientResponseException.class)
public ResponseEntity<ErrorResponse> restClientResponseExceptionHandler(
  RestClientResponseException exception, 
  HttpServletRequest httpServletRequest ) {

  ErrorResponse errorResponse = new ErrorResponse();
  errorResponse.setStatus(exception.getRawStatusCode());
  errorResponse.setMessage(exception.getResponseBodyAsString());
  errorResponse.setPath(httpServletRequest.getRequestURI());
  return new ResponseEntity<>(errorResponse,
                              HttpStatus.valueOf(exception.getRawStatusCode()));
}
We get 401 and the output is as follows
{
  "status": 401,
  "message": "{\"message\":\"Invalid authentication credentials\"}",
  "path": "/exchangerate"
}
2. Our Own Error Handler Class
Here the goal is to change the exception that gets thrown. We have two options
1. Implement the ResponseErrorHandler interface (a sketch follows below)
2. Extend the DefaultResponseErrorHandler class
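Example - Implementing ResponseErrorHandler
For the first option a minimal sketch would look like this; the RestClientException thrown here is just an illustration, any exception type of our own could be used instead
import java.io.IOException;
import java.nio.charset.StandardCharsets;

import org.springframework.http.client.ClientHttpResponse;
import org.springframework.util.StreamUtils;
import org.springframework.web.client.ResponseErrorHandler;
import org.springframework.web.client.RestClientException;

public class MyResponseErrorHandler implements ResponseErrorHandler {

  @Override
  public boolean hasError(ClientHttpResponse response) throws IOException {
    return response.getStatusCode().is4xxClientError()
        || response.getStatusCode().is5xxServerError();
  }

  @Override
  public void handleError(ClientHttpResponse response) throws IOException {
    // Translate the HTTP error into the exception we want to propagate
    throw new RestClientException("Call failed with status " + response.getStatusCode()
        + ": " + StreamUtils.copyToString(response.getBody(), StandardCharsets.UTF_8));
  }
}
The handler is then registered with restTemplate.setErrorHandler(new MyResponseErrorHandler()).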

Example - Extending DefaultResponseErrorHandler
We do it like this.
restTemplate.setErrorHandler(new DefaultResponseErrorHandler() {
  @Override
  public boolean hasError(ClientHttpResponse response) throws IOException {
    try {
      // Do your stuff
      return super.hasError(response);
    } catch (Exception e) {
      // ...
      return true;
    }
  }

  @Override
  public void handleError(ClientHttpResponse response) throws IOException {
    try {
      // Do your stuff
      super.handleError(response);
    } catch (Exception e) {
      // ...
      throw e;
    }
  }
});


Tuesday, August 23, 2022

SpringSecurity OAuth2 ResourceServerConfigurerAdapter Class - For Our Own OAuth2 Resource Server

Example
We do it like this
@EnableWebSecurity
public class OAuth2ResourceServerSecurityConfiguration {


  @Value("${spring.security.oauth2.resourceserver.jwt.jwk-set-uri}")
  private String jwkSetUri;

  @Value("${spring.security.oauth2.resourceserver.jwt.jws-algorithm}")
  private String jwsAlgorithm;

  private static final String COUNTRIES_RESOURCE_PATH = "countries";

  @Bean
  public SecurityFilterChain securityFilterChain(HttpSecurity http) throws Exception {

    http
      .authorizeHttpRequests((authorize) -> authorize
        .mvcMatchers(HttpMethod.GET, "/" + COUNTRIES_RESOURCE_PATH + "/**")
        .hasAnyRole(Role.MANAGER.getValue(), Role.USER.getValue())
        .mvcMatchers(HttpMethod.POST, "/" + COUNTRIES_RESOURCE_PATH + "/**")
        .hasAnyRole(Role.MANAGER.getValue())
        .mvcMatchers(HttpMethod.PUT, "/" + COUNTRIES_RESOURCE_PATH + "/**")
        .hasAnyRole(Role.MANAGER.getValue())
        .mvcMatchers(HttpMethod.DELETE, "/" + COUNTRIES_RESOURCE_PATH + "/**")
        .hasAnyRole(Role.MANAGER.getValue())
        .anyRequest().authenticated()
      ).sessionManagement(config -> config.sessionCreationPolicy(SessionCreationPolicy.STATELESS)).csrf().disable()
      .oauth2ResourceServer((oauth2) -> oauth2.jwt(
        jwt -> jwt.jwtAuthenticationConverter(jwtAuthenticationConverter())));

    return http.build();
  }

  @Bean
  public JwtDecoder jwtDecoder() {
    return NimbusJwtDecoder.withJwkSetUri(jwkSetUri)
        .jwsAlgorithm(SignatureAlgorithm.from(jwsAlgorithm)).build();
  }

  private Converter<Jwt, ? extends AbstractAuthenticationToken> jwtAuthenticationConverter() {
    JwtAuthenticationConverter jwtConverter = new JwtAuthenticationConverter();
    jwtConverter.setJwtGrantedAuthoritiesConverter(new ResourceRoleConverter());
    return jwtConverter;
  }
}
ResourceRoleConverter is as follows
public class ResourceRoleConverter implements Converter<Jwt, Collection<GrantedAuthority>> {

  private static final Logger LOG = LoggerFactory.getLogger(ResourceRoleConverter.class);
  private static final String REALM_ACCESS = "realm_access";
  private static final String ROLES = "roles";

  @Override
  public Collection<GrantedAuthority> convert(Jwt jwt) {
    try {
      List<String> roles = JSONObjectUtils.getStringList(
        (Map<String, Object>) jwt.getClaims().get(REALM_ACCESS), ROLES);
      return roles.stream()
        .map(roleName -> "ROLE_" + roleName)
        .map(SimpleGrantedAuthority::new)
        .collect(Collectors.toList());
    } catch (ParseException e) {
      LOG.error(e.getMessage(), e);
      throw new RuntimeException("Error while trying to get user roles");
    }
  }
}


Monday, August 22, 2022

SpringBoot Actuator - Http Trace Endpoint

Introduction
The explanation is as follows
We can trace the application requests also using Actuator. But in recent versions of Spring Boot, it is disabled by default. So, we can enable that by adding the below config to application.properties.
management.trace.http.enabled=true
In addition, a bean of type HttpTraceRepository needs to be created.

To see the output we do it like this
http://localhost:8080/actuator/httptrace

HttpTraceRepository Interface
The explanation is as follows.
HTTP Tracing can be enabled by providing a bean of type HttpTraceRepository in your application's configuration. For convenience, Spring Boot offers an InMemoryHttpTraceRepository that stores traces for the last 100 request-response exchanges
Example
We do it like this
import org.springframework.boot.actuate.trace.http.HttpTraceRepository;
import org.springframework.boot.actuate.trace.http.InMemoryHttpTraceRepository;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class HttpTraceActuatorConfiguration {

  @Bean
  public HttpTraceRepository httpTraceRepository() {
    return new InMemoryHttpTraceRepository();
  }
}




Sunday, August 21, 2022

SpringBoot Test @Sql Annotation - Used in Integration Tests

Introduction
When using Testcontainers, the database schema must be created. The explanation of the options is as follows
Create database schema
There are several ways to create database schema: 

1. Let Hibernate generate and execute DDL automatically;
2. Use DB versioning scripts (Flyway or Liquibase);
3. Create schema.sql DDL manually and run it using Spring Boot settings.
Then the database must be filled with data. The explanation of the options is as follows
To fill the database with test data, there are three main options: 

1. Insert data in the test code;
2. Use @Sql annotation for tests;
3. Use data.sql file and run it using Spring Boot settings.
The first option is good if you have a very small test dataset, literally a couple of tables and a couple of records for each table. When the database grows, we need to update tons of code: copy common data, keep track of references etc.

The @Sql annotation is great to run SQL scripts to fill the database with test-specific data. For every test we can create small maintainable scripts, and we can see their names right above the test methods. That's convenient.

As for the data.sql file – we can use it together with schema.sql. It can be used to fill the database with common reference data like countries, cities, etc.
Example
We do it like this
import org.springframework.test.context.jdbc.Sql;

@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
public class OrderApiIntegrationTest {

  @LocalServerPort
  private int port;

  @Autowired
  private TestRestTemplate restTemplate;

  @Autowired
  private OrderRepository orderRepository;

  @Autowired
  private OrderService orderService;

  private static HttpHeaders headers;

  private final ObjectMapper objectMapper = new ObjectMapper();

  @BeforeAll
  public static void init() {
    headers = new HttpHeaders();
    headers.setContentType(MediaType.APPLICATION_JSON);
  }

  @Test
  @Sql(statements = "INSERT INTO orders(id, buyer, price, qty) VALUES (22, 'john', 24.0, 1)", 
    executionPhase = Sql.ExecutionPhase.BEFORE_TEST_METHOD)
  @Sql(statements = "DELETE FROM orders WHERE id='22'", 
    executionPhase = Sql.ExecutionPhase.AFTER_TEST_METHOD)
  public void testOrdersList() {
    HttpEntity<String> entity = new HttpEntity<>(null, headers);
    ResponseEntity<List<Order>> response = restTemplate.exchange(
      createURLWithPort(), HttpMethod.GET, entity, 
      new ParameterizedTypeReference<List<Order>>(){});

    List<Order> orderList = response.getBody();
    assert orderList != null;
    assertEquals(response.getStatusCodeValue(), 200);
    assertEquals(orderList.size(), orderService.getOrders().size());
    assertEquals(orderList.size(), orderRepository.findAll().size());
  }

  private String createURLWithPort() {
    return "http://localhost:" + port + "/api/orders";
  }
}



Tuesday, August 9, 2022

SpringKafka Consumer Reactor Kafka

Introduction
First we need to create a KafkaReceiver bean. We do it like this
protected Map<String, Object> kafkaConsumerProperties() {
  Map<String, Object> kafkaPropertiesMap = new HashMap<>();
  kafkaPropertiesMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
  kafkaPropertiesMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
  kafkaPropertiesMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
  kafkaPropertiesMap.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, StringDeserializer.class);
  kafkaPropertiesMap.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, KafkaAvroDeserializer.class);
  ...
  return kafkaPropertiesMap;
}

protected ReceiverOptions<K, V> kafkaReceiverOptions() {
  ReceiverOptions<K, V> options = ReceiverOptions.create(kafkaConsumerProperties());
  return options.pollTimeout(Duration.ofMillis(pollTimeout))
    .subscription(List.of(consumerTopicName));
}

@Bean
KafkaReceiver<K, V> kafkaReceiver() {
  return KafkaReceiver.create(kafkaReceiverOptions());
}
Here ErrorHandlingDeserializer is used. One of the KafkaReceiver object's receive(), receiveAtmostOnce(), receiveAutoAck(), receiveExactlyOnce() methods must be called. We do it like this
@EventListener(ApplicationStartedEvent.class)
public Disposable startKafkaConsumer() {
  return kafkaReceiver.receive()
    .doOnError(error -> log.error("Error receiving event, will retry", error))
    .retryWhen(Retry.fixedDelay(Long.MAX_VALUE, Duration.ofMinutes(1)))
    .doOnNext(record -> log.debug("Received event: key {}", record.key()))
    .concatMap(this::handleEvent)
    .subscribe(record -> record.receiverOffset().acknowledge());
}
Three operators can be used for event handling. The explanation is as follows
Choice of Event Handling Operator and Acknowledgement
Once an event is received, it must be processed by the application and subsequently acknowledged. This sequencing provides at-least-once delivery semantics - other types of semantics (at-most-once, exactly-once) may cause the pipeline to look differently. The pattern we propose delegates the responsibility of event handling to a separate method called handleEvent, which always returns the ReceiverRecord used by the subscriber to acknowledge the offset (this method is described in detail in the next section). However, the operator that we choose to call this method has a critical impact on the behavior of the consumer. Let's analyze three different options:

flatMap - this operator applies the provided mapper function to create inner publishers to which it then subscribes eagerly. Provided that these inner publishers are non-blocking, they will be subscribed to in parallel, and the elements produced downstream are not guaranteed to preserve the order in which the original elements (the Kafka events) were received from upstream. In the case of our Kafka consumer, this means that the Kafka events will be processed in parallel, and the offsets will be committed as each event is handled and passed downstream. But whenever one offset is committed, it implicitly commits all the lower offsets. Imagine that the processing of one event finishes and its offset is committed, but later on, the processing of another event with a lower offset fails: the second event will not be re-processed since we already implicitly committed its offset. This can be problematic, especially in cases where at-least-once semantics is required, and it's an important consideration to keep in mind when deciding to use flatMap (Reactor Kafka has recently implemented an out-of-order commits feature to mitigate precisely this issue)

flatMapSequential - much like flatMap, this operator subscribes to the inner publishers eagerly; however, the difference here is that flatMapSequential will publish elements downstream in the same order in which they were originally received from upstream (this is done by delaying publishing if needed, to preserve the order). The fact that events will still be processed in parallel can come with performance benefits in scenarios where this does not impact correctness, e.g., where events refer to distinct entities and can be processed in any order. In addition, the preservation of the sequence will ensure that the offsets are committed in order and thus avoid the problem described above. Of course, deferring the offset commit also increases the risk of duplicate event processing in case the consumer crashes, which is something the application must be prepared to handle (e.g., by ensuring the event processing is idempotent)

concatMap - unlike the two previous operators, concatMap creates and subscribes to the inner publishers sequentially. This is extremely important in scenarios where the events must be processed in the exact order in which they were read from the partition.
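As a variation on the consumer above, when events may be processed in parallel but the offsets should still be committed in order, the concatMap call could be replaced with flatMapSequential; a minimal sketch, with the concurrency of 4 chosen arbitrarily:
@EventListener(ApplicationStartedEvent.class)
public Disposable startKafkaConsumer() {
  return kafkaReceiver.receive()
    .doOnError(error -> log.error("Error receiving event, will retry", error))
    .retryWhen(Retry.fixedDelay(Long.MAX_VALUE, Duration.ofMinutes(1)))
    // process up to 4 events in parallel, but publish them downstream (and
    // therefore acknowledge offsets) in the order they were received
    .flatMapSequential(this::handleEvent, 4)
    .subscribe(record -> record.receiverOffset().acknowledge());
}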
Then the event handling code is as follows. If there is no error at all, the processEvent() method is called
/*
  This method will handle the received event and then re-publish it regardless of the 
  result.
  The method must never return an error signal as that will terminate the main consumer 
  pipeline.
 */
private Mono<ReceiverRecord<String, Event>> handleEvent(
  ReceiverRecord<String, Event> record) {

  return Mono.just(record)
    .map(KafkaDeserializerUtils::extractDeserializerError)
    .<Event> handle((result, sink) -> {
      if (result.getT2().isPresent() && Objects.nonNull(result.getT1().value())) {
        // Deserialization error processing
        log.error("Deserialization error encountered", result.getT2().get());
      } else {
        // Publish the event value downstream
        sink.next(result.getT1().value());
      }
    })
    .flatMap(businessLayerService::processEvent)
    .doOnError(ex -> log.warn("Error processing event: key {}", record.key(), ex))
    .onErrorResume(ex -> Mono.empty())
    .doOnNext(processed -> log.debug("Successfully processed event: key {}", record.key()))
    .then(Mono.just(record));
}