Tuesday, October 31, 2023

Using Protocol Buffers

Maven
1. We add the following dependency
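<!-- Assumption: protobuf-java supplies the runtime for the generated classes; set protobuf.version to the release you use -->
<dependency>
    <groupId>com.google.protobuf</groupId>
    <artifactId>protobuf-java</artifactId>
    <version>${protobuf.version}</version>
</dependency>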
2. We define the proto file in the src/main/proto/ directory
Example
We do it like this:
syntax = "proto3";
package example;

message Product {
    string id = 1;
    string name = 2;
    double price = 3;
}
3. We define the plugin
Example
We do it like this:
<!-- pom.xml configuration for protobuf-maven-plugin -->
<build>
    <plugins>
        <plugin>
            <groupId>org.xolstice.maven.plugins</groupId>
            <artifactId>protobuf-maven-plugin</artifactId>
            <version>0.6.1</version>
            ...
        </plugin>
    </plugins>
</build>
4. We define the REST controller
Example
We do it like this:
@RestController
public class ProductController {

    @PostMapping("/product")
    public Product addProduct(@RequestBody Product product) {
        // Business logic here
        return product;
    }
}
The explanation is as follows:
When a request is received, Spring will automatically handle the deserialization from the Protocol Buffers format to a Product object. Similarly, the response will be automatically serialized to the Protocol Buffers format.
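To make "the Protocol Buffers format" concrete, here is a minimal sketch that encodes the Product message by hand, using the field numbers from the proto file above. The class and method names are made up for illustration. In a real project the classes generated by protoc do this work, and proto3 leaves out fields that hold their default value, which this sketch does not handle.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.charset.StandardCharsets;

// Hand-rolled illustration of the proto3 wire format for the Product message.
// Hypothetical helper, not part of protobuf-java or Spring.
class ProductWireSketch {

    // Writes an unsigned varint: 7 bits per byte, high bit set while more bytes follow.
    static void writeVarint(ByteArrayOutputStream out, int value) {
        while ((value & ~0x7F) != 0) {
            out.write((value & 0x7F) | 0x80);
            value >>>= 7;
        }
        out.write(value);
    }

    // Length-delimited field (wire type 2): tag, byte length, UTF-8 bytes.
    static void writeString(ByteArrayOutputStream out, int fieldNumber, String value) {
        byte[] bytes = value.getBytes(StandardCharsets.UTF_8);
        writeVarint(out, (fieldNumber << 3) | 2);
        writeVarint(out, bytes.length);
        out.write(bytes, 0, bytes.length);
    }

    // 64-bit field (wire type 1): tag, then the IEEE 754 double in little-endian order.
    static void writeDouble(ByteArrayOutputStream out, int fieldNumber, double value) {
        writeVarint(out, (fieldNumber << 3) | 1);
        byte[] bytes = ByteBuffer.allocate(8)
            .order(ByteOrder.LITTLE_ENDIAN)
            .putDouble(value)
            .array();
        out.write(bytes, 0, bytes.length);
    }

    // Encodes id = 1, name = 2, price = 3, matching the field numbers in the proto file.
    static byte[] encodeProduct(String id, String name, double price) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        writeString(out, 1, id);
        writeString(out, 2, name);
        writeDouble(out, 3, price);
        return out.toByteArray();
    }

    public static void main(String[] args) {
        byte[] encoded = encodeProduct("p1", "Pen", 1.5);
        System.out.println(encoded.length + " bytes on the wire");
    }
}
```

Each field is written as a tag (field number shifted left by three bits, combined with the wire type) followed by its payload, which is why the field numbers in the proto file must stay stable once clients depend on them.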



Thursday, October 26, 2023

SpringJMX application.properties

Example
In YAML form (application.yml), we do it like this:
spring:
  jmx:
    enabled: true
    default-domain: <package_name>

Wednesday, October 25, 2023

SpringWebFlux Sink Class

Introduction
We include the following line:
import reactor.core.publisher.MonoSink;
I think a sink provides the bridge between callback-based asynchronous programming and Reactor. The explanation is as follows:
Sink allows to programmatically push reactive streams signals.
Example
Suppose we have the following code:
public Mono<Void> index(T doc) {
  IndexRequest req = indexRequest(doc);
  return Mono.create(sink -> 
    client.indexAsync(req,RequestOptions.DEFAULT,new IndexActionListener<>(sink)));
}
Here the Elasticsearch RestHighLevelClient is used. This library always works with listeners. We do it like this:
@RequiredArgsConstructor
public class IndexActionListener<T> implements ActionListener<IndexResponse> {

  private final MonoSink<T> sink;

  @Override
  public void onResponse(IndexResponse res) {
    if (res.status().getStatus() < 400) {
      sink.success();
      return;
    }

    sink.error(new RuntimeException(res.toString()));
  }

  @Override
  public void onFailure(Exception e) {
    sink.error(e);
  }
}
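The same bridging idea can be tried without Reactor or Elasticsearch on the classpath. The sketch below is a JDK-only analogy, not the approach from the post: CompletableFuture stands in for MonoSink, and the Listener interface and indexAsync method are hypothetical stand-ins for ActionListener and client.indexAsync.

```java
import java.util.concurrent.CompletableFuture;

class CallbackBridgeSketch {

    // Hypothetical listener contract, standing in for ActionListener.
    interface Listener<R> {
        void onResponse(R response);
        void onFailure(Exception e);
    }

    // Hypothetical callback-based client call; the status code stands in for the response.
    static void indexAsync(int status, Listener<Integer> listener) {
        new Thread(() -> listener.onResponse(status)).start();
    }

    // Bridge: create the future, hand a listener to the client, complete it from the callbacks.
    static CompletableFuture<Void> index(int status) {
        CompletableFuture<Void> future = new CompletableFuture<>();
        indexAsync(status, new Listener<Integer>() {
            @Override
            public void onResponse(Integer res) {
                if (res < 400) {
                    future.complete(null); // plays the role of sink.success()
                } else {
                    // plays the role of sink.error(...)
                    future.completeExceptionally(new RuntimeException("status " + res));
                }
            }

            @Override
            public void onFailure(Exception e) {
                future.completeExceptionally(e);
            }
        });
        return future;
    }

    public static void main(String[] args) {
        index(201).join();
        System.out.println("indexed");
        index(500)
            .exceptionally(e -> {
                System.out.println("failed: " + e.getMessage());
                return null;
            })
            .join();
    }
}
```

In both versions the caller gets a single asynchronous handle back, and the listener is the only place that knows how to translate callbacks into success or error signals.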



Tuesday, October 24, 2023

SpringBatch StepBuilder Dealing With Errors

faultTolerant method
Example
We do it like this:
Step personProcessStep(String someParam) throws Exception {
  return stepBuilderFactory.get("personProcessStep")
    .<PersonInput, PersonOutput>chunk(1)
    .reader(new PersonReader(someParam))
    .faultTolerant()
    .skipPolicy(new DataDuplicateSkipper())
    .processor(new PersonProcessor())
    .writer(new PersonWriter())
    .build();
}
listener method - Logging bad records
Example - onReadError
Suppose we have the following code:
public class TitleItemListenner implements ItemReadListener<Title> {

  private static final Logger logger = LoggerFactory.getLogger(TitleItemListenner.class);

  @Override
  public void onReadError(Exception ex) {
    if (ex instanceof FlatFileParseException parseException) {
      // Include the line number and the raw input of the record that failed to parse
      String message = "An error occurred while reading line "
                    + parseException.getLineNumber()
                    + ". Details of the bad input:\n"
                    + parseException.getInput()
                    + "\n";

      logger.error(message, parseException);
    } else {
      logger.error("An error occurred", ex);
    }
  }
}
We do it like this:
// Step that loads the content of the CSV with titleCsvReader
// and writes it via titleItemWriter
@Bean 
public Step loadCsv(JobRepository jobRepository, 
                    PlatformTransactionManager transactionManager, 
                    FlatFileItemReader<Title> titleCsvReader){

  return new StepBuilder("load csv",jobRepository)
    .<Title,Title>chunk(10,transactionManager)
    .reader(titleCsvReader)
    .writer(titleItemWriter())
    .faultTolerant()
    .skipLimit(100)
    .skip(Exception.class)
    .listener(new TitleItemListenner()) // add the listener that logs bad records
    .build();
}
Example - afterStep
Suppose we have the following code:
@Component
public class StepCompletionNotificationListener implements StepListener {

  private static final Logger log =
    LoggerFactory.getLogger(StepCompletionNotificationListener.class);

  @BeforeStep
  public void beforeStep(StepExecution stepExecution){
    log.info("Step {} started at {}",
      stepExecution.getStepName(),stepExecution.getStartTime());
  }

  @AfterStep
  public ExitStatus afterStep(StepExecution stepExecution){
    log.info("Step {} ended at {}",
      stepExecution.getStepName(),stepExecution.getEndTime());
    // Compare with equals(): two ExitStatus values are not guaranteed to be the same object
    return ExitStatus.COMPLETED.equals(stepExecution.getExitStatus()) ?
      ExitStatus.COMPLETED : ExitStatus.FAILED;
  }
}
We do it like this:
@Bean
public Step stepRealNewsReport(
  @Qualifier("readerNewsReportByRealStatus") ItemReader<NewsReport> itemReader, 
  @Qualifier("writerRealNewsReportTable") ItemWriter<RealNews> itemWriter, 
  StepCompletionNotificationListener listener) {

  return stepBuilderFactory
    .get("stepRealNewsReport")
    .listener(listener)
    .<NewsReport, RealNews>chunk(50)
    .reader(itemReader)
    .processor(realNewsProcessor())
    .writer(itemWriter)
    .build();
}
Example - afterStep
Suppose we have the following code:
public class EmptyInputFailer implements StepExecutionListener {
  @Override
  public ExitStatus afterStep(StepExecution stepExecution) {
    // Fail the step when the reader produced no items
    if (stepExecution.getReadCount() > 0) {
      return stepExecution.getExitStatus();
    }
    return ExitStatus.FAILED;
  }
}
Example
We do it like this:
// Step that loads the content of the CSV with titleCsvReader
// and writes it via titleItemWriter
@Bean
public Step loadCsv(JobRepository jobRepository, 
                    PlatformTransactionManager transactionManager, 
                    FlatFileItemReader<Title> titleCsvReader){

  return new StepBuilder("load csv",jobRepository)
    .<Title,Title>chunk(10,transactionManager)
    .reader(titleCsvReader)
    .writer(titleItemWriter())
    .listener(new EmptyInputFailer())
    .build();
}
retryLimit method
The explanation is as follows:
Spring Batch provides comprehensive support for error handling and retrying. Developers can configure retry policies for each step in the job, enabling the application to recover from transient errors such as network failures or database timeouts.
Example
We do it like this:
@Bean
public Step stockStatsStep() {
  return stepBuilderFactory.get("stockStatsStep")
    .<StockPrice, StockStats>chunk(100)
    .reader(stockPriceItemReader())
    .processor(stockPriceItemProcessor())
    .writer(stockStatsItemWriter())
    .faultTolerant()
    .retryLimit(3)
    .retry(MyException.class)
    .build();
}

public class MyException extends Exception {

  public MyException(String message) {
   super(message);
  }
}
The explanation is as follows:
In this implementation, we have configured the `stockStatsStep` step to be fault-tolerant with a retry limit of 3. We have also specified that we want to retry the step in case of an exception of type `MyException`.

If an exception occurs during the processing of a chunk, the step will be retried up to the configured limit. If the retry limit is exceeded, the step will fail, and the job will stop.

We have defined a custom exception called `MyException` that extends the `Exception` class. This exception can be thrown by the `StockPriceItemProcessor` or any other component in the step that needs to be retried.

You can customize these implementations based on your specific error handling and retry requirements. Spring Batch provides many other features and options for error handling, such as skip and recovery, which can also be configured for each step in the job.
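The retry behaviour described above can be sketched in plain Java. This is only an illustration of the idea, not Spring Batch's implementation: the class, the exception, and the callWithRetry helper are hypothetical, and the sketch assumes the limit counts the first attempt as well.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;

class RetryLimitSketch {

    // Checked exception marking a transient, retryable failure (plays the role of MyException).
    static class TransientException extends Exception {
        TransientException(String message) {
            super(message);
        }
    }

    // Calls the task up to maxAttempts times; only TransientException triggers another attempt.
    static <T> T callWithRetry(Callable<T> task, int maxAttempts) throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        TransientException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return task.call();
            } catch (TransientException e) {
                last = e; // retryable: loop again if attempts remain
            }
            // any other exception type propagates immediately
        }
        throw last; // limit exhausted: this is where the step would fail
    }

    public static void main(String[] args) throws Exception {
        AtomicInteger calls = new AtomicInteger();
        String result = callWithRetry(() -> {
            if (calls.incrementAndGet() < 3) {
                throw new TransientException("temporary failure");
            }
            return "ok";
        }, 3);
        System.out.println(result + " after " + calls.get() + " attempts");
    }
}
```

Exceptions that are not registered as retryable bypass the loop entirely, which mirrors how only the types passed to retry(...) are retried in the step configuration.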
skip method - Skipping records
The explanation is as follows:
skip : Determine the exception (exception class) to skip

skipLimit : The number of times you want to make a skip

noSkip : Identify the exception you don’t want to skip
Example
We do it like this:
// Step that loads the content of the CSV with titleCsvReader
// and writes it via titleItemWriter
@Bean 
public Step loadCsv(JobRepository jobRepository, 
                    PlatformTransactionManager transactionManager, 
                    FlatFileItemReader<Title> titleCsvReader){

  return new StepBuilder("load csv",jobRepository)
    .<Title,Title>chunk(10,transactionManager)
    .reader(titleCsvReader)
    .writer(titleItemWriter())
    .faultTolerant() // tell Spring Batch that this step may encounter errors
    .skip(Exception.class) // skip every Exception
    .noSkip(FlatFileParseException.class) // but do not skip this one
    .skipLimit(20) // the maximum number of skips allowed for Exception.class
    .build();
}
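How skip, noSkip, and skipLimit interact can be illustrated with a small plain-Java loop. This is a sketch of the semantics as I understand them, not Spring Batch code, and the processAll helper and its parameters are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

class SkipLimitSketch {

    // Processes every item; failures of a skippable type are dropped until skipLimit is reached.
    static <I, O> List<O> processAll(List<I> items,
                                     Function<I, O> processor,
                                     Class<? extends Exception> skippable,
                                     Class<? extends Exception> notSkippable,
                                     int skipLimit) {
        List<O> out = new ArrayList<>();
        int skipped = 0;
        for (I item : items) {
            try {
                out.add(processor.apply(item));
            } catch (RuntimeException e) {
                boolean canSkip = skippable.isInstance(e) && !notSkippable.isInstance(e);
                if (!canSkip || skipped >= skipLimit) {
                    throw e; // not skippable, or the limit is used up: the step would fail
                }
                skipped++; // bad record is dropped and processing continues
            }
        }
        return out;
    }

    public static void main(String[] args) {
        Function<String, Integer> parse = Integer::parseInt;
        // "x" raises NumberFormatException, which is skippable here
        List<Integer> parsed = processAll(List.of("1", "x", "3"), parse,
            RuntimeException.class, IllegalStateException.class, 1);
        System.out.println(parsed);
    }
}
```

With a limit of 1 the single bad record is dropped and the other two are kept; a second bad record, or any IllegalStateException, would make the call fail.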


SpringBatch KafkaItemWriter Class

Example
We do it like this:
@Configuration
public class TitleBatchConfig {

  private final KafkaTemplate<String,Title> template;

  @Bean
  public KafkaItemWriter<String,Title> titleItemWriter(){
    return new KafkaItemWriterBuilder<String,Title>()
      .kafkaTemplate(this.template)
      .itemKeyMapper(Title::id)
      .build();
  }
  ...
}


Monday, October 23, 2023

SpringWebFlux Mono.timeout method

Example
We do it like this:
webClient.get()
    .uri("/endpoint")
    .retrieve()
    .bodyToMono(String.class)
    .timeout(Duration.ofSeconds(10));

SpringWebFlux Mono.retryWhen method

Example
We do it like this:
this.retryBackoffSpec = Retry.backoff(maxAttempts, Duration.ofSeconds(backoffSeconds))
  .doBeforeRetry(retrySignal -> log.debug(
    "Waiting {} seconds. Retry #{} of {} after exception: {}",
    backoffSeconds, (retrySignal.totalRetriesInARow()+1), maxAttempts,
    retrySignal.failure().getLocalizedMessage()))
  .onRetryExhaustedThrow((retryBackoffSpec, retrySignal) -> retrySignal.failure());

this.webClient.post().uri(uri)
  .bodyValue(emailText)
  .retrieve()
  …
  .bodyToMono(CtfdUserResponse.class)
  .retryWhen(retryBackoffSpec)
  .block();
Example
We do it like this:
WebClient webClient = WebClient.builder()
  .baseUrl("http://example.com")
  .build();

Mono<String> response = webClient.get()
    .uri("/retry-endpoint")
    .retrieve()
    .bodyToMono(String.class)
    // number of retries and backoff configuration
    .retryWhen(Retry.backoff(3, Duration.ofSeconds(1))
      // maximum backoff time
      .maxBackoff(Duration.ofSeconds(10))) 
    // fallback if retries all fail
    .onErrorResume(e -> Mono.just("Fallback response")); 

response.subscribe(result -> System.out.println(result));
Example
We do it like this:
Predicate<Throwable> exceptionFilter() {
  // Note: WebClientResponseException is itself a RuntimeException,
  // so the first clause already matches every such response error
  return throwable -> throwable instanceof RuntimeException
      || (throwable instanceof WebClientResponseException ex
          && (ex.getStatusCode().value() == HttpStatus.GATEWAY_TIMEOUT.value()
              || ex.getStatusCode().value() == HttpStatus.SERVICE_UNAVAILABLE.value()
              || ex.getStatusCode().value() == HttpStatus.BAD_GATEWAY.value()));
}

Retry retry = Retry.backoff(3, Duration.ofSeconds(2))
    .jitter(0.7)
    .filter(exceptionFilter())
    .onRetryExhaustedThrow((retrySpec, retrySignal) -> {
      log.error("Service at {} failed to respond, after max attempts of: {}", 
        uri, retrySignal.totalRetries());
      return retrySignal.failure();
    });
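To get a feel for the delays that an exponential backoff with jitter produces, the arithmetic can be sketched without Reactor. This is a simplified model under my own assumptions (doubling per retry, a cap at the maximum, a symmetric jitter window); Reactor's own implementation differs in detail, for example in how it bounds the jittered value. The class and method names are hypothetical.

```java
import java.time.Duration;

class BackoffScheduleSketch {

    // Base delay before retry number retryIndex (0-based):
    // minBackoff * 2^retryIndex, capped at maxBackoff.
    static Duration baseDelay(Duration minBackoff, Duration maxBackoff, int retryIndex) {
        long factor = 1L << Math.min(retryIndex, 30);
        Duration delay = minBackoff.multipliedBy(factor);
        return delay.compareTo(maxBackoff) > 0 ? maxBackoff : delay;
    }

    // Window in which a jittered delay may fall: base minus/plus base * jitterFactor.
    static Duration[] jitterWindow(Duration base, double jitterFactor) {
        long spread = Math.round(base.toMillis() * jitterFactor);
        return new Duration[] { base.minusMillis(spread), base.plusMillis(spread) };
    }

    public static void main(String[] args) {
        Duration min = Duration.ofSeconds(2);
        Duration max = Duration.ofSeconds(10); // assumed cap for this illustration
        for (int retry = 0; retry < 3; retry++) {
            Duration base = baseDelay(min, max, retry);
            Duration[] window = jitterWindow(base, 0.7);
            System.out.println("retry " + (retry + 1) + ": base " + base
                + ", jittered between " + window[0] + " and " + window[1]);
        }
    }
}
```

A large jitter factor such as 0.7 spreads retries from many clients widely around the base delay, which helps avoid all of them hitting a recovering service at the same moment.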



SpringWebFlux Transitioning from RestTemplate to WebClient in Spring Boot

GET
Example
We do it like this:
// With RestTemplate (blocking)
RestTemplate restTemplate = new RestTemplate();
ResponseEntity<String> response = restTemplate.getForEntity("http://example.com", String.class);

// With WebClient (non-blocking)
WebClient webClient = WebClient.create();
Mono<String> response = webClient.get()
    .uri("http://example.com")
    .retrieve()
    .bodyToMono(String.class);
response.subscribe(result -> System.out.println(result));
Handling Errors
The explanation is as follows:
RestTemplate’s error handling occurs through the ResponseErrorHandler interface, which requires a separate block of code. WebClient streamlines this with more fluent handling.
Example
We do it like this:
WebClient webClient = WebClient.create();
webClient.get()
    .uri("http://example.com/some-error-endpoint")
    .retrieve()
    .onStatus(HttpStatus::isError, response -> {
        // Handle error status codes
        return Mono.error(new CustomException("Custom error occurred."));
    })
    .bodyToMono(String.class);
The explanation is as follows:
The onStatus() method allows for handling specific HTTP statuses directly within the chain of operations, providing a more readable and maintainable approach.
POST
Example
We do it like this:
// With RestTemplate (blocking)
RestTemplate restTemplate = new RestTemplate();
HttpHeaders headers = new HttpHeaders();
headers.setContentType(MediaType.APPLICATION_JSON);
HttpEntity<String> request = new HttpEntity<>("{\"key\":\"value\"}", headers);
ResponseEntity<String> response = restTemplate
  .postForEntity("http://example.com",  request, String.class);

// With WebClient (non-blocking)
WebClient webClient = WebClient.create();
Mono<String> response = webClient.post()
    .uri("http://example.com")
    .contentType(MediaType.APPLICATION_JSON)
    .bodyValue("{\"key\":\"value\"}")
    .retrieve()
    .bodyToMono(String.class);
Asynchronous Processing




Thursday, October 19, 2023

SpringData JPA @Lock Annotation - Scheduling in a Distributed Environment with PESSIMISTIC_WRITE

Introduction
Using LockModeType = PESSIMISTIC_WRITE produces SQL like the following (the exact form depends on the database dialect).
'select id from table where id = ? for update wait 5'
Example
We do it like this:
interface WidgetRepository extends Repository<Widget, Long> {

  @Lock(LockModeType.PESSIMISTIC_WRITE)
  Widget findOne(Long id);
}
Timeout 
The explanation is as follows:
To lock entities pessimistically, set the lock mode to PESSIMISTIC_READ, PESSIMISTIC_WRITE, or PESSIMISTIC_FORCE_INCREMENT.

If a pessimistic lock cannot be obtained, but the locking failure doesn’t result in a transaction rollback, a LockTimeoutException is thrown.

Pessimistic Locking Timeouts

The length of time in milliseconds the persistence provider should wait to obtain a lock on the database tables may be specified using the javax.persistence.lock.timeout property. If the time it takes to obtain a lock exceeds the value of this property, a LockTimeoutException will be thrown, but the current transaction will not be marked for rollback. If this property is set to 0, the persistence provider should throw a LockTimeoutException if it cannot immediately obtain a lock.

If javax.persistence.lock.timeout is set in multiple places, the value will be determined in the following order:

1. The argument to one of the EntityManager or Query methods.
2. The setting in the @NamedQuery annotation.
3. The argument to the Persistence.createEntityManagerFactory method.
4. The value in the persistence.xml deployment descriptor.
Scheduling in a Distributed Environment
Example
Suppose we have the following code:
@Repository
public interface TaskLockRepository extends JpaRepository<TaskLockEntity, String> {

  @Lock(LockModeType.PESSIMISTIC_WRITE)
  @QueryHints({
    @QueryHint(name = "jakarta.persistence.lock.timeout", value = "1000")
  })
  Optional<TaskLockEntity> findByTaskIdAndLastExecutionLessThan(String taskId,
    long timestamp);
}
Let the database table be as follows:
CREATE TABLE task_lock
(
    task_id         VARCHAR(64)       NOT NULL,
    last_execution   bigint DEFAULT 0 NOT NULL,
    PRIMARY KEY (task_id)
);
INSERT INTO task_lock (task_id) VALUES ('scrap_website');
INSERT INTO task_lock (task_id) VALUES ('move_to_cold');
We use this code as follows:
@Configuration
@EnableScheduling
public class TaskScheduler {

  private final TaskLockRepository taskLockRepository;
  // Other dependencies

  @Transactional
  @Scheduled(fixedDelay = 5, timeUnit = TimeUnit.MINUTES)
  public void scrapSourceWebsite() {
    long currentTime = System.currentTimeMillis();
    long scheduledRate = Duration.of(1, ChronoUnit.HOURS).toMillis(); // 1h
    taskLockRepository.findByTaskIdAndLastExecutionLessThan("scrap_website",
      currentTime - scheduledRate)
      .ifPresent(scrapingTask -> {
        // Execute scraping task...
        scrapingTask.setLastExecution(System.currentTimeMillis());
      });
  }
  // Other tasks
}
1. Here, thanks to @Transactional, the scrap_website row is locked if its last_execution value is more than 1 hour old.
2. Because LockModeType.PESSIMISTIC_WRITE is used, only one instance in the distributed environment can lock this row.
3. If scrapingTask throws an exception, the transaction is rolled back, so the row does not stay locked.
4. If the service crashes, the transaction is rolled back by the database instead, so the row does not stay locked.
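The four points above can be mimicked in memory to see the control flow without a database. In this sketch, which is only an analogy under my own assumptions, a ReentrantLock plays the role of the row lock and a field plays the role of the last_execution column; the class and method names are hypothetical.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

class TaskLockSketch {

    // Stands in for one row of task_lock: the row lock plus the last_execution column.
    private final ReentrantLock rowLock = new ReentrantLock();
    private long lastExecution = 0;

    // Mirrors the repository call: wait for the row lock, then run only if the task is due.
    boolean runIfDue(long now, long periodMillis, long lockTimeoutMillis, Runnable task) {
        boolean locked;
        try {
            locked = rowLock.tryLock(lockTimeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        if (!locked) {
            return false; // another instance holds the row (lock timeout)
        }
        try {
            if (lastExecution >= now - periodMillis) {
                return false; // already executed within the period
            }
            task.run(); // if this throws, lastExecution stays unchanged, like a rollback
            lastExecution = now;
            return true;
        } finally {
            rowLock.unlock(); // like commit or rollback releasing the row lock
        }
    }

    public static void main(String[] args) {
        TaskLockSketch row = new TaskLockSketch();
        long hour = TimeUnit.HOURS.toMillis(1);
        long now = System.currentTimeMillis();
        System.out.println(row.runIfDue(now, hour, 1000, () -> System.out.println("scraping")));
        System.out.println(row.runIfDue(now, hour, 1000, () -> System.out.println("scraping")));
    }
}
```

The first call runs the task and records the execution time; the second call with the same timestamp finds the task not yet due and does nothing, which is what a second application instance would observe after the first one commits.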





Monday, October 9, 2023

Swagger @Hidden Annotation

Introduction
We include the following line:
import io.swagger.v3.oas.annotations.Hidden;
Example
To hide every endpoint of a controller, we do it like this:
@RestController
@Hidden
public class HiddenController {
    @GetMapping("/hiddenEndpoint")
    public String hiddenEndpoint() {
        return "This endpoint will be hidden.";
    }
}
Example
To hide a single endpoint, we do it like this:
@RestController
public class MixedVisibilityController {
    @GetMapping("/publicEndpoint")
    public String publicEndpoint() {
        return "This is a public endpoint.";
    }

    @Hidden
    @GetMapping("/privateEndpoint")
    public String privateEndpoint() {
        return "This is a private endpoint.";
    }
}

Wednesday, October 4, 2023

SpringWebFlux Flux.concatMap method

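concatMap vs flatMap
concatMap processes the input in order. The examples here use RxJava's Observable; Reactor's Flux.flatMap and Flux.concatMap differ in the same way.

Example
Suppose we have the following code: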
void flatMapVsConcatMap() throws InterruptedException {
   Observable.just(5, 2, 4, 1)
    .flatMap(
      second -> Observable.just("Emit delayed with " + second + " second")
          .delay(second, TimeUnit.SECONDS)
    )
    .subscribe(System.out::println, Throwable::printStackTrace );
    Thread.sleep(15_000);
}
The output is as follows. flatMap did not preserve the order.
Emit delayed with 1 second
Emit delayed with 2 second
Emit delayed with 4 second
Emit delayed with 5 second
The explanation is as follows:
flatMap tries to start as many as possible.
Suppose we have the following code:
void flatMapVsConcatMap() throws InterruptedException {
  Observable.just(5, 2, 4, 1)
    .concatMap(
      second ->
        Observable.just("Emit delayed with " + second + " second")
          .delay(second, TimeUnit.SECONDS)
    )
    .subscribe(System.out::println, Throwable::printStackTrace);

  Thread.sleep(15_000);
}
The output is as follows. concatMap preserved the order.
Emit delayed with 5 second
Emit delayed with 2 second
Emit delayed with 4 second
Emit delayed with 1 second
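The difference between the two outputs can be reproduced with the JDK alone. The sketch below is an analogy rather than RxJava or Reactor code: a ScheduledExecutorService plays the delayed sources, and the class and method names are hypothetical. Delays are in milliseconds to keep the run short.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

class OrderingSketch {

    // Blocks until the delayed task has finished.
    static void await(Future<?> future) {
        try {
            future.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }

    // flatMap-like: start every delayed task immediately, record results as they finish.
    static List<Integer> mergeOrder(List<Integer> delaysMillis) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        List<Integer> finished = new ArrayList<>();
        List<Future<?>> pending = new ArrayList<>();
        for (int delay : delaysMillis) {
            pending.add(scheduler.schedule(
                () -> { finished.add(delay); }, delay, TimeUnit.MILLISECONDS));
        }
        pending.forEach(OrderingSketch::await);
        scheduler.shutdown();
        return finished;
    }

    // concatMap-like: start the next delayed task only after the previous one has finished.
    static List<Integer> concatOrder(List<Integer> delaysMillis) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        List<Integer> finished = new ArrayList<>();
        for (int delay : delaysMillis) {
            await(scheduler.schedule(
                () -> { finished.add(delay); }, delay, TimeUnit.MILLISECONDS));
        }
        scheduler.shutdown();
        return finished;
    }

    public static void main(String[] args) {
        List<Integer> delays = List.of(50, 20, 40, 10);
        System.out.println("flatMap-like:   " + mergeOrder(delays));
        System.out.println("concatMap-like: " + concatOrder(delays));
    }
}
```

The flatMap-like variant finishes in roughly the time of the longest delay but emits in completion order, while the concatMap-like variant keeps the input order at the cost of waiting for the sum of all delays.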

Monday, October 2, 2023

SpringCloud Kubernetes

Maven
We add the following dependencies:
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter</artifactId>
</dependency>

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-kubernetes-fabric8-leader</artifactId>
  <version>3.0.2</version>
</dependency>
<dependency>
  <groupId>org.awaitility</groupId>
  <artifactId>awaitility</artifactId>
  <version>3.1.2</version>
  <scope>test</scope>
</dependency>
Example - Leader Election
The explanation is as follows. In other words, Spring Integration is actually used under the hood, and the results are stored in a ConfigMap.
The Spring Cloud Kubernetes leader election mechanism implements the leader election API of Spring Integration using a Kubernetes ConfigMap.

Multiple application instances compete for leadership, but leadership will only be granted to one. When granted leadership, a leader application receives an OnGrantedEvent application event with leadership Context. Applications periodically attempt to gain leadership, with leadership granted to the first caller. A leader will remain a leader until either it is removed from the cluster, or it yields its leadership. When leadership removal occurs, the previous leader receives OnRevokedEvent application event. After removal, any instances in the cluster may become the new leader, including the old leader.
We do it like this:
spring.cloud.kubernetes.leader.role=world
# Configmap to which leader election metadata will be saved
spring.cloud.kubernetes.leader.config-map-name=my-config-map
Suppose we have the following code:
import org.springframework.integration.leader.Context;
import org.springframework.stereotype.Service;

@Service
public class ManagerContext {
  private Context context;


  public Context getContext() {
    return context;
  }

  public void setContext(Context context) {
    this.context = context;
  }
}
To check for leadership, we do it like this:
@Component
public class ScheduledTasks {

  private ManagerContext managerContext;
  public ScheduledTasks(ManagerContext managerContext){
    this.managerContext = managerContext;
  }
  ...
  private boolean isLeader() {
    // Logic to check if this instance is the leader
    // Modify the logic to check real leadership
    return managerContext.getContext() != null; 
  }
}
The code that handles the leader events is as follows:
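@RestController
@RequestMapping("/leader")
public class LeaderController {

 // Assumption: the local host name (java.net.InetAddress) identifies this instance in the messages below
 private final String host;

 @Value("${spring.cloud.kubernetes.leader.role}")
 private String role;

 private Context context;

 private final ManagerContext managerContext;

 public LeaderController(ManagerContext managerContext) throws UnknownHostException {
  this.host = InetAddress.getLocalHost().getHostName();
  this.managerContext = managerContext;
 }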

 /**
  * Return a message whether this instance is a leader or not.
  * @return info
  */
 @GetMapping
 public String getInfo() {
  if (this.context == null) {
   return String.format("I am '%s' but I am not a leader of the '%s'", this.host, this.role);
  }
  return String.format("I am '%s' and I am the leader of the '%s'", this.host, this.role);
 }

 /**
  * PUT request to try and revoke a leadership of this instance. If the instance is not
  * a leader, leadership cannot be revoked. Thus "HTTP Bad Request" response. If the
  * instance is a leader, it must have a leadership context instance which can be used
  * to give up the leadership.
  * @return info about leadership
  */
 @PutMapping
 public ResponseEntity<String> revokeLeadership() {
  if (this.context == null) {
   String message = String.format("Cannot revoke leadership because '%s' is not a leader", this.host);
   return ResponseEntity.badRequest().body(message);
  }
  this.context.yield();
  String message = String.format("Leadership revoked for '%s'", this.host);
  return ResponseEntity.ok(message);
 }

 /**
  * Handle a notification that this instance has become a leader.
  * @param event on granted event
  */
 @EventListener
 public void handleEvent(OnGrantedEvent event) {
  System.out.println(String.format("'%s' leadership granted", event.getRole()));
  this.context = event.getContext();
  managerContext.setContext(this.context);
 }

 /**
  * Handle a notification that this instance's leadership has been revoked.
  * @param event on revoked event
  */
 @EventListener
 public void handleEvent(OnRevokedEvent event) {
  System.out.println(String.format("'%s' leadership revoked", event.getRole()));
  this.context = null;
  managerContext.setContext(null);
 }

}
In the Kubernetes environment, a Role, a RoleBinding, and a ConfigMap must be created.
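The election rules quoted earlier (leadership goes to the first caller, a leader stays leader until it yields, and afterwards any instance may win, including the old leader) can be tried out with a small in-memory model. This is only a sketch of those rules: a plain field plays the role of the ConfigMap entry, and the class and method names are hypothetical rather than part of Spring Cloud Kubernetes.

```java
class LeaderElectionSketch {

    // Stands in for the ConfigMap entry that records the current leader of a role.
    private String leader;

    // Leadership is granted to the first caller; later callers fail until it is released.
    synchronized boolean tryAcquire(String candidate) {
        if (leader != null) {
            return false;
        }
        leader = candidate;
        return true;
    }

    // Like yielding through the leadership context: only the current leader can give it up.
    synchronized boolean yieldLeadership(String candidate) {
        if (!candidate.equals(leader)) {
            return false;
        }
        leader = null;
        return true;
    }

    synchronized String currentLeader() {
        return leader;
    }

    public static void main(String[] args) {
        LeaderElectionSketch election = new LeaderElectionSketch();
        System.out.println("pod-a acquires: " + election.tryAcquire("pod-a"));
        System.out.println("pod-b acquires: " + election.tryAcquire("pod-b"));
        System.out.println("pod-a yields:   " + election.yieldLeadership("pod-a"));
        System.out.println("pod-b acquires: " + election.tryAcquire("pod-b"));
        System.out.println("leader is now:  " + election.currentLeader());
    }
}
```

In the real setup the granted and revoked transitions arrive as OnGrantedEvent and OnRevokedEvent, which is why the controller above only stores or clears the context inside its event listeners.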



SpringMVC ErrorController Interface - Custom ErrorController

Introduction
We include the following line (this is the package used since Spring Boot 2; Spring Boot 1.x had the interface in org.springframework.boot.autoconfigure.web):
import org.springframework.boot.web.servlet.error.ErrorController;
The explanation is as follows:
The WhiteLabel error handling system is built using the ErrorController, which is a part of the Spring Boot framework. By default, this function is taken over by the BasicErrorController, which is responsible for the white-label error-handling features.
Example
We do it like this:
import org.springframework.boot.web.servlet.error.ErrorController;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;

@Controller
public class CustomErrorController implements ErrorController {

  @RequestMapping("/error")
  public String handleError() {
    // Your custom error handling logic goes here
    return "error"; // Return the name of your custom error page
  }

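  // Required by older Spring Boot versions; newer versions of ErrorController no longer declare this method
  public String getErrorPath() {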
    return "/error";
  }
}
We create an error.html file in one of the following directories:
src/main/resources/public
src/main/resources/static
src/main/resources/templates

<!DOCTYPE html>
<html lang="en">
<head>
   <meta charset="UTF-8">
   <meta name="viewport" content="width=device-width, initial-scale=1.0">
   <title>Error Page</title>
</head>
<body>
   <h1>Oops! Something went wrong.</h1>
   <p>We're sorry, but an error occurred.</p>
</body>
</html>