28 Şubat 2022 Pazartesi

SpringData DataAccessException Hiyerarşisi

Giriş
Şu satırı dahil ederiz
import org.springframework.dao.DataAccessException;
DataAccessException soyut bir sınıf. Bu sınıftan kalıtan bir sürü alt sınıf daha var. Şeklen şöyle



Bunlardan en önemlisi NonTransientDataAccessException sınıfı. 

NonTransientDataAccessException Sınıfı
Bu da aslında soyut bir sınıfı. Veri tabanındaki işlemde geçici/uçucu olmayan yani non-transient bir hata olduğunu belirtir. Kalıtan en önemli sınıflar şöyle

DataIntegrityViolationException
Null olmaması gerek bir alan null ise fırlatılır

DuplicateKeyException
Primary key hatası varsa vs. fırlatılır

DataRetrievalFailureException
Sorguda erişilmeye çalışılan primary key yoksa fırlatılır

IncorrectResultSetColumnCountException
Açıklaması burada

IncorrectResultSizeDataAccessException
Örneğin tek satır sonuç yerine daha fazla satır dönerse fırlatılır

DataSourceLookupFailureException
InvalidDataAccessResourceUsageException
BadSqlGrammarException
CannotGetJdbcConnectionException
Açıklaması burada






24 Şubat 2022 Perşembe

SpringCloud Zipkin

Zipkin Nedir?
Açıklaması şöyle
Zipkin is a distributed instrumentation and monitoring tool that will allow us to collect service information and perform searches on it. With the aim of finding problems in the latency of them, mainly through evaluating how long it has taken the execution of those services.
application.properties
Açıklaması şöyle
By default, our services will try to send trace messages to localhost:9411. If OpenZipkin runs at a different address, you need to specify it in the settings of each service:
Örnek
Şöyle yaparız
spring: zipkin: base-url: http://<host>:<port>
Örnek
Zipkin'a RabbitMQ ile mesaj göndermek için şöyle yaparız
spring.zipkin.sender.type=rabbit
spring.zipkin.rabbitmq.queue=zipkin
....
Docker Zipkin Sunucusu
Zipkin sunucusu 9411 numaralı portu kullanır. GUI'ye erişmek için şöyle yaparız
http://localhost:9411/
Örnek
Şöyle yaparız
docker run -d -p 9411:9411 openzipkin/zipkin
Docker Compose
"docker-compose up" ile başlatırız
Örnek
Şöyle yaparız
version: "3.1" services: zipkin: image: openzipkin/zipkin:2 ports: - "9411:9411"
Örnek 
Şöyle yaparız
version: "3.9"

services:
  microservices_postgresql:
    image: postgres:latest
    container_name: microservices_postgresql
    expose:
      - "5432"
    ports:
      - "5432:5432"
    restart: always
    environment:
      - POSTGRES_USER=postgres
      - POSTGRES_PASSWORD=postgres
      - POSTGRES_DB=bank_accounts
      - POSTGRES_HOST=5432
    command: -p 5432
    volumes:
      - ./docker_data/microservices_pgdata:/var/lib/postgresql/data
    networks: [ "microservices" ]

  redis:
    image: redis:latest
    container_name: microservices_redis
    ports:
      - "6379:6379"
    restart: always
    networks: [ "microservices" ]

  prometheus:
    image: prom/prometheus:latest
    container_name: prometheus
    ports:
      - "9090:9090"
    command:
      - --config.file=/etc/prometheus/prometheus.yml
    volumes:
      - ./monitoring/prometheus.yml:/etc/prometheus/prometheus.yml:ro
    networks: [ "microservices" ]

  node_exporter:
    container_name: microservices_node_exporter
    restart: always
    image: prom/node-exporter
    ports:
      - '9101:9100'
    networks: [ "microservices" ]

  grafana:
    container_name: microservices_grafana
    restart: always
    image: grafana/grafana
    ports:
      - '3000:3000'
    networks: [ "microservices" ]

  zipkin:
    image: openzipkin/zipkin:latest
    restart: always
    container_name: microservices_zipkin
    ports:
      - "9411:9411"
    networks: [ "microservices" ]


networks:
  microservices:
    name: microservices
Şeklen şöyle

Her hangi bir isteği genişletirsek şöyle






13 Şubat 2022 Pazar

SpringKafka Consumer SeekToCurrentErrorHandler

Giriş
Şu satırı dahil ederiz
import org.springframework.kafka.listener.SeekToCurrentErrorHandler;
Stateful Retry İçindir. Açıklaması şöyle
Stateful retry is not offered by the Java Apache client by itself, but is available as a configuration option out of the box with Spring Kafka, using the SeekToCurrentErrorHandler.
Açıklaması şöyle
An additional feature of the SeekToCurrentErrorHandler is that those events within the batch that have successfully been processed prior to the event that results in a RetryableException being thrown are still able to be successfully marked as consumed, so are not themselves re-delivered too in the next poll.
constructor - Custom ConsumerRecord + BackOff 
Kullanmayın
Örnek - FixedBackOff
Şöyle yaparız
//Stateful retry listener.
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> 
  kafkaStatefulRetryListenerContainerFactory
    ConsumerFactory<String, String> consumerFactory) {

  SeekToCurrentErrorHandler errorHandler = new SeekToCurrentErrorHandler(
    (record, exception) -> {}, 
                           // 4 seconds pause, 4 retries.
                           new FixedBackOff(4000L, 4L));

  ConcurrentKafkaListenerContainerFactory<String, String> factory = 
    new ConcurrentKafkaListenerContainerFactory();
  factory.setConsumerFactory(consumerFactory);
  factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.RECORD);
  factory.setErrorHandler(errorHandler);
  return factory;
}
Örnek - ExponentialBackOff 
Elimizde şöyle bir kod olsun. SeekToCurrentErrorHandler kullanımı ile ilgili bir örnek burada ve burada.
@Configuration
@EnableKafka
public class SpringConfiguration {

  @Bean
  public SeekToCurrentErrorHandler eh() {
    long initialMillis = 500;
    long factor = 2;
    long maxElapsedTimeSecs = 60;
    ExponentialBackOff backoff = new ExponentialBackOff(initialMillis, factor);
    backoff.setMaxElapsedTime(maxElapsedTimeSecs*1000);

    BiConsumer<ConsumerRecord<?, ?>, Exception> recoverer = (rec, exc) ->  {
      // TODO In the final app do something more useful here
      logger.error("* Maximum retry policy has been reached {} - acknowledging and
        proceeding *", rec);
    };

    SeekToCurrentErrorHandler eh = new SeekToCurrentErrorHandler(recoverer, backoff);
    eh.setCommitRecovered(true);
    return eh;
  }
  ...
}
Örnek - FixedBackOff 
Şöyle yaparız
//Stateful retry listener.
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> 
  kafkaStatefulRetryListenerContainerFactory
    ConsumerFactory<String, String> consumerFactory) {

  SeekToCurrentErrorHandler errorHandler = new SeekToCurrentErrorHandler(
    (record, exception) -> {}, 
                           // 4 seconds pause, 4 retries.
                           new FixedBackOff(4000L, 4L));

  ConcurrentKafkaListenerContainerFactory<String, String> factory = 
    new ConcurrentKafkaListenerContainerFactory();
  factory.setConsumerFactory(consumerFactory);
  factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.RECORD);
  factory.setErrorHandler(errorHandler);
  return factory;
}
Örnek - ExponentialBackOff 
Elimizde şöyle bir kod olsun. SeekToCurrentErrorHandler kullanımı ile ilgili bir örnek burada ve burada.
@Configuration
@EnableKafka
public class SpringConfiguration {

  @Bean
  public SeekToCurrentErrorHandler eh() {
    long initialMillis = 500;
    long factor = 2;
    long maxElapsedTimeSecs = 60;
    ExponentialBackOff backoff = new ExponentialBackOff(initialMillis, factor);
    backoff.setMaxElapsedTime(maxElapsedTimeSecs*1000);

    BiConsumer<ConsumerRecord<?, ?>, Exception> recoverer = (rec, exc) ->  {
      // TODO In the final app do something more useful here
      logger.error("* Maximum retry policy has been reached {} - acknowledging and
        proceeding *", rec);
    };

    SeekToCurrentErrorHandler eh = new SeekToCurrentErrorHandler(recoverer, backoff);
    eh.setCommitRecovered(true);
    return eh;
  }
  ...
}
Şöyle yaparız.
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String>
  kafkaListenerContainerFactory(RetryTemplate retryTemplate) {
  ConcurrentKafkaListenerContainerFactory<String, String> factory =
    new ConcurrentKafkaListenerContainerFactory<>();
  factory.setConsumerFactory(consumerFactory());
  factory.setMissingTopicsFatal(missingTopicsFatal);//True in prod, false otherwise

  factory.getContainerProperties().setAckMode
    (ContainerProperties.AckMode.MANUAL_IMMEDIATE);
  factory.getContainerProperties().setSyncCommitTimeout(Duration.ofSeconds(60));
  factory.setStatefulRetry(true);
  factory.setErrorHandler(eh());
  return factory;
}

constructor - Sadece BackOff 
Örnek - FixedBackOff
Şöyle yaparız. Eğer hata varsa tekrar işlemeye çalışır. Her hata arasında 60 saniye bekler.
@Bean
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<byte[], byte[]>>
kafkaListenerContainerFactory() {
  ConcurrentKafkaListenerContainerFactory<byte[], byte[]> factory =
new ConcurrentKafkaListenerContainerFactory<>();
  factory.setErrorHandler(new SeekToCurrentErrorHandler(
new FixedBackOff(60000L))); // per listener
  factory.setConsumerFactory(consumerFactory());
  return factory;
}

Şöyle yaparız.
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String>
  kafkaListenerContainerFactory(RetryTemplate retryTemplate) {
  ConcurrentKafkaListenerContainerFactory<String, String> factory =
    new ConcurrentKafkaListenerContainerFactory<>();
  factory.setConsumerFactory(consumerFactory());
  factory.setMissingTopicsFatal(missingTopicsFatal);//True in prod, false otherwise

  factory.getContainerProperties().setAckMode
    (ContainerProperties.AckMode.MANUAL_IMMEDIATE);
  factory.getContainerProperties().setSyncCommitTimeout(Duration.ofSeconds(60));
  factory.setStatefulRetry(true);
  factory.setErrorHandler(eh());
  return factory;
}
Örnek FixedBackOff
Şöyle yaparız. Eğer hata varsa tekrar işlemeye çalışır. Her hata arasında 60 saniye bekler.
@Bean
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<byte[], byte[]>>
kafkaListenerContainerFactory() {
  ConcurrentKafkaListenerContainerFactory<byte[], byte[]> factory =
new ConcurrentKafkaListenerContainerFactory<>();
  factory.setErrorHandler(new SeekToCurrentErrorHandler(
new FixedBackOff(60000L))); // per listener
  factory.setConsumerFactory(consumerFactory());
  return factory;
}