SpringWebFlux etiketine sahip kayıtlar gösteriliyor. Tüm kayıtları göster
SpringWebFlux etiketine sahip kayıtlar gösteriliyor. Tüm kayıtları göster

9 Kasım 2023 Perşembe

SpringWebFlux Mono.thenReturn metodu

Giriş
İş bittikten sonra farklı bir sonuç dönmek içindir

Örnek
Şöyle yaparız
@Override
public Mono<String> deleteStudent(String id) {
  return studentRepository.deleteById(id)
    .thenReturn("Student deleted successfully!");
}


1 Kasım 2023 Çarşamba

SpringWebFlux Flux NettyServerCustomizer Arayüzü

Giriş
Şu satırı dahil ederiz 
import org.springframework.boot.web.embedded.netty.NettyServerCustomizer;
idleTimeout metodu
Açıklaması şöyle. Eğer bu değer verilmezse bağlantı hiç kapanmayabilir.
Reactor-Netty doesn’t have a default idle connection timeout.
Örnek
Şöyle yaparız
@Configuration
public class NettyServerCustomizerConfig {

  @Bean
  public NettyServerCustomizer nettyServerCustomizer() {
    return httpServer -> httpServer.idleTimeout(Duration.ofMillis(1));
  }
}
aynı şeyi şöyle yaparız
server:
  netty:
    idle-timeout: 1000 # 1 second of idle-timeout


25 Ekim 2023 Çarşamba

SpringWebFlux Sink Sınıfı

Giriş
Şu satırı dahil ederiz 
import reactor.core.publisher.Sink;
Sanırım bir Sink asenkron programlama ile Reactor arasındaki köprüyü sağlıyor. Açıklaması şöyle
Sink allows to programmatically push reactive streams signals.
Örnek
Elimizde şöyle bir kod olsun
public Mono<Void> index(T doc) {
  IndexRequest req = indexRequest(doc);
  return Mono.create(sink -> 
    client.indexAsync(req,RequestOptions.DEFAULT,new IndexActionListener<>(sink)));
}
Burada Elasticsearch HighLevelRestClient kullanılıyor. Bu kütüphane hep listener'lar ile çalışır. Şöyle yaparız
@RequiredArgsConstructor
public class IndexActionListener<T> implements ActionListener<IndexResponse> {

  private final MonoSink<T> sink;

  @Override
  public void onResponse(IndexResponse res) {
    if (res.status().getStatus() < 400) {
      sink.success();
      return;
    }

    sink.error(new RuntimeException(res.toString()));
  }

  @Override
  public void onFailure(Exception e) {
    sink.error(e);
  }
}



23 Ekim 2023 Pazartesi

SpringWebFlux Mono.timeout metodu

Örnek
Şöyle yaparız
webClient.get()
    .uri("/endpoint")
    .retrieve()
    .bodyToMono(String.class)
    .timeout(Duration.ofSeconds(10));

SpringWebFlux Mono.retryWhen metodu

Örnek
Şöyle yaparız
this.retryBackoffSpec = Retry.backoff(maxAttempts, Duration.ofSeconds(backoffSeconds))
.doBeforeRetry(retrySignal -> log.debug("Waiting {} seconds. Retry #{} of {} after exception: {}", backoffSeconds, (retrySignal.totalRetriesInARow()+1), maxAttempts, retrySignal.failure().getLocalizedMessage() )) .onRetryExhaustedThrow((retryBackoffSpec, retrySignal) -> retrySignal.failure()); this.webClient.post().uri(uri) .bodyValue(emailText) .retrieve() … .bodyToMono(CtfdUserResponse.class) .retryWhen(retryBackoffSpec) .block();
Örnek
Şöyle yaparız
WebClient webClient = WebClient.builder()
  .baseUrl("http://example.com")
  .build();

Mono<String> response = webClient.get()
    .uri("/retry-endpoint")
    .retrieve()
    .bodyToMono(String.class)
    // number of retries and backoff configuration
    .retryWhen(Retry.backoff(3, Duration.ofSeconds(1))
      // maximum backoff time
      .maxBackoff(Duration.ofSeconds(10))) 
    // fallback if retries all fail
    .onErrorResume(e -> Mono.just("Fallback response")); 

response.subscribe(result -> System.out.println(result));
Örnek
Şöyle yaparız
Predicate<Throwable> exceptionFilter() {
  return throwable -> throwable instanceof RuntimeException
      || (throwable instanceof WebClientResponseException
          && (throwable.getStatusCode() == HttpStatus.GATEWAY_TIMEOUT
              || throwable.getStatusCode() == HttpStatus.SERVICE_UNAVAILABLE
              || throwable.getStatusCode() == HttpStatus.BAD_GATEWAY));
}

Retry retry = Retry.backoff(3, Duration.ofSeconds(2))
    .jitter(0.7)
    .filter(exceptionFilter())
    .onRetryExhaustedThrow((retrySpec, retrySignal) -> {
      log.error("Service at {} failed to respond, after max attempts of: {}", 
        uri, retrySignal.totalRetries());
      return retrySignal.failure();
    });



SpringWebFlux Transitioning from RestTemplate to WebClient in Spring Boot

GET
Örnek
Şöyle yaparız
RestTemplate restTemplate = new RestTemplate();
ResponseEntity<String> response = restTemplate.getForEntity("http://example.com", String.class);


WebClient webClient = WebClient.create();
Mono<String> response = webClient.get()
    .uri("http://example.com")
    .retrieve()
    .bodyToMono(String.class);
response.subscribe(result -> System.out.println(result));
Handling Errors
Açıklaması şöyle
RestTemplate’s error handling occurs through the ErrorHandler interface, which requires a separate block of code. WebClient streamlines this with more fluent handling.
Örnek
Şöyle yaparız
WebClient webClient = WebClient.create();
webClient.get()
    .uri("http://example.com/some-error-endpoint")
    .retrieve()
    .onStatus(HttpStatus::isError, response -> {
        // Handle error status codes
        return Mono.error(new CustomException("Custom error occurred."));
    })
    .bodyToMono(String.class);
Açıklaması şöyle
The onStatus() method allows for handling specific HTTP statuses directly within the chain of operations, providing a more readable and maintainable approach.
POST
Örnek
Şöyle yaparız
RestTemplate restTemplate = new RestTemplate();
HttpHeaders headers = new HttpHeaders();
headers.setContentType(MediaType.APPLICATION_JSON);
HttpEntity<String> request = new HttpEntity<>("{\"key\":\"value\"}", headers);
ResponseEntity<String> response = restTemplate
  .postForEntity("http://example.com",  request, String.class);

WebClient webClient = WebClient.create();
Mono<String> response = webClient.post()
    .uri("http://example.com")
    .contentType(MediaType.APPLICATION_JSON)
    .bodyValue("{\"key\":\"value\"}")
    .retrieve()
    .bodyToMono(String.class);
Asynchronous Processing




4 Ekim 2023 Çarşamba

SpringWebFlux Mono.concatMap metodu

concatMap  vs flatMap
concatMap girdiyi sırayla işler

Örnek
Elimizde şöyle bir kod olsun
void flatMapVsConcatMap() throws InterruptedException {
   Observable.just(5, 2, 4, 1)
    .flatMap(
      second -> Observable.just("Emit delayed with " + second + " second")
          .delay(second, TimeUnit.SECONDS)
    )
    .subscribe(System.out::println, Throwable::printStackTrace );
    Thread.sleep(15_000);
}
Çıktı şöyle. flatMap sırayı korumadı.
Emit delayed with 1 second
Emit delayed with 2 second
Emit delayed with 4 second
Emit delayed with 5 second
Açıklaması şöyle
flatMap tries to start as many possible.
Elimizde şöyle bir kod olsun
void flatMapVsConcatMap() throws InterruptedException {
  Observable.just(5, 2, 4, 1)
    .concatMap(
      second ->
        Observable.just("Emit delayed with " + second + " second")
          .delay(second, TimeUnit.SECONDS)
    )
    subscribe(System.out::println,Throwable::printStackTrace);

  Thread.sleep(15_000);
}
Çıktı şöyle. concatMap sırayı korudu
Emit delayed with 5 second
Emit delayed with 2 second
Emit delayed with 4 second
Emit delayed with 1 second

24 Ağustos 2023 Perşembe

SpringWebFlux Flux.limitRate metodu - Backpressure İçindir

Giriş
Her seferinde kaynaktan en fazla kaç tane nesne çekileceğini belirtir

Örnek
Şöyle yaparız
// Simulate a data source emitting a continuous stream of data at a high rate
Flux<Integer> dataSource = Flux.range(1, Integer.MAX_VALUE);
  
// Process the data with back pressure using limitRate operator
dataSource
  .limitRate(10) // Control the number of elements emitted per request (back pressure)
  .doOnNext(data -> { // Simulate processing delay
    try {
      Thread.sleep(100);
    }
    catch (InterruptedException e) {
      e.printStackTrace();
    }
    System.out.println("Processed: " + data);
  })
  .subscribe();
Açıklaması şöyle
In this example, the Flux.range(1, Integer.MAX_VALUE) simulates a continuous stream of data. The limitRate(10) operator specifies that the data processing should be limited to 10 elements per request, effectively controlling the back pressure.

As a result, you’ll notice that the data processing rate is limited, and the doOnNext method will print  "Processed: <data>" with a slight delay between each processed element

SpringWebFlux Flux.switchIfEmpty metodu

Giriş
Mono.switchIfEmpty ile kardeştir. Flux boş ise switchIfEmpty() ile belirtilen sonucu döner

Örnek
Şöyle yaparız
Flux<Integer> dataFlux = Flux.fromIterable(List.of()).
  switchIfEmpty(Mono.just(0)) 
  .collectList();

20 Ağustos 2023 Pazar

SpringWebFlux WebTestClient Sınıfı

Giriş
Açıklaması şöyle
Reactive Spring provides WebTestClient to write down integration tests for API endpoints.
get metodu
Örnek
Şöyle yaparız
@Autowired
private WebTestClient webTestClient; @Test void find_all_patients(){ long patientCount = patientRepository.findAll().count().block(); webTestClient.get().uri("/v1/patients") .exchange().expectStatus().isEqualTo(HttpStatus.ACCEPTED) .expectBody() .jsonPath("$").isArray() .jsonPath("$.size()").isEqualTo(patientCount); }
expectStatus metodu
isCreated()isOk()isNotFound() gibi sonuçlar döner

Örnek
Şöyle yaparız
@Test
public void testGetAllUsers() {
    webTestClient.get().uri("/users")
            .exchange()
            .expectStatus().isOk()
            .expectBodyList(User.class);
}

@Test
public void testGetUserById() {
    webTestClient.get().uri("/users/{id}", "user-id")
            .exchange()
            .expectStatus().isOk()
            .expectBody(User.class);
}

@Test
public void testCreateUser() {
    User newUser = new User("new-user-id", "New User");

    webTestClient.post().uri("/users")
            .bodyValue(newUser)
            .exchange()
            .expectStatus().isCreated()
            .expectBody(User.class)
            .isEqualTo(newUser);
}
Örnek
Şöyle yaparız
@Test
public void testGetUserById_NotFound() {
  webTestClient.get().uri("/users/nonexistent-id")
    .exchange()
    .expectStatus().isNotFound();
}

SpringWebFlux StepVerifier Sınıfı

Giriş
Açıklaması şöyle
When testing reactive components, ensure you cover error scenarios using StepVerifier to verify the behavior of your reactive streams in response to different types of errors.
Örnek
Şöyle yaparız
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;

public class ReactiveStreamTest {
  @Test
  public void testFlux() {
    Flux<Integer> numbers = Flux.just(1, 2, 3, 4, 5);

    StepVerifier.create(numbers)
      .expectNext(1, 2, 3, 4, 5)
      .verifyComplete();
  }
  @Test
  public void testTransformations() {
    Flux<Integer> numbers = Flux.just(1, 2, 3, 4, 5);

    StepVerifier.create(numbers
      .filter(number -> number % 2 == 0)
      .map(evenNumber -> evenNumber * 2)
    )
    .expectNext(4, 8)
    .verifyComplete();
  }
  @Test
  public void testWithError() {
    Flux<Integer> numbers = Flux.just(1, 2, 3)
      .concatWith(Flux.error(new RuntimeException("Oops! An error occurred.")));

    StepVerifier.create(numbers)
      .expectNext(1, 2, 3)
      .expectError(RuntimeException.class)
      .verify();
  }
}

SpringWebFlux Mono.onErrorResume metodu - Fallback Value

Giriş
Açıklaması şöyle
Project Reactor provides several operators to manage errors effectively within reactive streams:

1. onErrorResume and onErrorReturn: These operators allow you to provide fallback values or alternative streams in case of an error. This can help prevent the entire stream from failing and provide a more graceful degradation.
2. doOnError: This operator lets you execute specific actions when an error occurs, such as logging the error or cleaning up resources. It doesn't interfere with the error propagation itself.
3. retry and retryWhen: These operators enable you to automatically retry an operation a specified number of times or based on a certain condition. This can be helpful for transient errors.
Kısaca onErrorResume exception'ı loglar ve varsayılan bir sonuç döndürür

Örnek
Şöyle yaparız
public Mono<User> getUserById(String id) {
  return userRepository.findById(id)
    .onErrorResume(throwable -> {
      log.error("Error occurred while fetching user by id: {}", id, throwable);
      return Mono.just(new User("default", "Default User"));
    });
}
Örnek
Şöyle yaparız
webClient.get()
    .uri("/endpoint")
    .retrieve()
    .bodyToMono(String.class)
    .doOnError(e -> log.error("Error occurred", e))
    .onErrorResume(e -> Mono.just("Fallback value"));

SpringWebFlux Flux.zip metodu

Giriş
Açıklaması şöyle
The zip operator combines elements from two or more reactive streams into pairs, tuples, or other custom objects. It's useful when you need to process elements from multiple streams together.
Örnek
Şöyle yaparız
Flux<Integer> numbers = Flux.just(1, 2, 3);
Flux<String> letters = Flux.just("A", "B", "C");
Flux<String> combined = Flux.zip(numbers, letters, (number, letter) -> number + letter);
Örnek
Şöyle yaparız
Flux<String> endpoint1 = webClient.get()
  .uri("/endpoint1").retrieve().bodyToMono(String.class);
Flux<String> endpoint2 = webClient.get()
  .uri("/endpoint2").retrieve().bodyToMono(String.class);

Flux<String> result = Flux.zip(
  endpoint1, 
  endpoint2, 
  (res1, res2) -> res1 + " " + res2);

5 Temmuz 2023 Çarşamba

SpringWebFlux Flux.onErrorReturn metodu - Exception Olursa Yeni Bir Sonuç Döner

Giriş
Flux kapatılır

Örnek
Şöyle yaparız
Flux<String> stringFlux = Flux.just("Hello", "World", "from", "IntelliJ IDEA")
    .map(s -> {
        if (s.equals("World")) throw new RuntimeException("An error occurred");
        else return s.toUpperCase();
    });

stringFlux
    .onErrorReturn("Error occurred in the stream.")
    .subscribe(System.out::println);
Çıktı şöyle
HELLO
Error occurred in the stream.
Örnek
Şöyle yaparız
Flux<Integer> numbers = Flux.just(1, 2, 3, 4, 5)
  .concatWith(Flux.error(new RuntimeException("Oops! An error occurred.")))
  .map(number -> 10 / (number - 3)) // This will cause an ArithmeticException

  .doOnError(throwable -> System.err.println("Error occurred: " + throwable.getMessage()))

  .onErrorReturn(-1); // Provide a fallback value in case of an error

numbers.subscribe(
  value -> System.out.println("Received: " + value),
  error -> System.err.println("Subscriber error: " + error.getMessage())
);

6 Haziran 2023 Salı

SpringWebFlux Flux Hot Publisher

Giriş
Açıklaması şöyle
A hot publisher in Spring WebFlux is a publisher that emits data to all subscribers as soon as it is available. This is in contrast to a cold publisher, which emits data to subscribers only when they subscribe.
Açıklaması şöyle
With Hot Publishers, there will be only one data producer. All Subscribers listen to the data produced by the single data producer. The data is shared.

Imagine a TV station. It does not matter if there is no one to watch the program. It will be emitted regardless. Watchers can start watching anytime they want. But all watchers get the same info at any given moment. 

Watchers would lose the content if they joined late. The same is with the Hot Publishers.
Örnek
Şöyle yaparız
Flux<String> netFlux = Flux.fromStream(ReactiveJavaTutorial::getVideo)
  .delayElements(Duration.ofSeconds(2))
  .share(); // turn the cold publisher into a hot publisher
Backpressure 
Açıklaması şöyle
However, it is important to note that not all publishers respect backpressure. .... These are often termed as 'hot' publishers. Dealing with these publishers requires specific strategies, such as buffering, dropping, or sampling data.
Yani publisher, Backpressure kavramını desteklemez, aboneyi beklemez ve veri üretmeye devam eder.

Örnek
Şöyle yaparız. Böylece araya bir buffer koyulur
Flux.range(1, 100)
  .onBackpressureBuffer(10)
  .subscribe(new BaseSubscriber<Integer>() {
    @Override
    protected void hookOnSubscribe(Subscription subscription) {
      request(10);
    }

    @Override
    protected void hookOnNext(Integer value) {
      // process the value
      System.out.println(value);
      if (value % 10 == 0) {
        request(10);
      }
    }
});
Açıklaması şöyle
In this code, the onBackpressureBuffer operator is used to create a buffer that holds up to 10 items. If the buffer is full, the application will throw a BufferOverflowException.


28 Nisan 2023 Cuma

SpringWebFlux WebClient Test

Giriş
Açıklaması şöyle
Basically, there are two general directions for writing tests for the reactive chain:

1. Directly call the methods to test and receive the outcome by invoking block() on the resulting Mono or Flux.
2. Wrap these calls with the StepVerifier class provided by the Reactor framework.
StepVerifier
Bir Flux veya Mono nesnesini test etmek içindir. Testing sonunda verifyComplete() veya bir türevinin çağrısını yapmak gerekir

assertNext metodu
Örnek
Şöyle yaparız
@Test
public void verifyGetByNameReturnsEntryForSuccessfulRequestUsingStepVerifier() {
    InventoryEntry expectedEntry = new InventoryEntry(42, "...", 5);

    mockBackEnd.enqueue(assembleResponse(expectedEntry));

    StepVerifier
            .create(cut.getByName(expectedEntry.name()))
            .assertNext(entry -> assertThat(entry).isEqualTo(expectedEntry))
            .verifyComplete();
}
expectNext metodu
Örnek
Şöyle yaparız
@Test
public void testFluxStream() {
  Flux<Integer> flux = Flux.just(1, 2, 3, 4, 5);

  StepVerifier.create(flux)
    .expectNext(1)
    .expectNext(2)
    .expectNext(3)
    .expectNext(4)
    .expectNext(5)
    .verifyComplete();
}

26 Aralık 2022 Pazartesi

SpringWebFlux DefaultPartHttpMessageReader Sınıfı

Giriş
multipart/form-data cevapları okuyabilmemizi sağlar. Açıklaması şöyle
It reads the "multipart/form-data" requests to a stream of Parts
content metodu
Örnek
Şöyle yaparız. Burada block() metodu kullanıldığı için setStreaming(false) yapılıyor
final var partReader = new DefaultPartHttpMessageReader();
partReader.setStreaming(false); 

WebClient webClient = WebClient.builder().build();
ResponseEntity<Flux<Part>> request = webClient
  .get()
  .uri("...")
  .accept(MediaType.MULTIPART_MIXED)
  .retrieve()
  .toEntityFlux((inputMessage, context) ->
    partReader
      .read(ResolvableType.forType(byte[].class), inputMessage, Map.of()))
  .block();

byte[] image = null;
    
List<Part> parts = request.getBody().collectList().block();

for (Part part : parts) {
  // access individual parts here
  System.out.println(part.headers());
  if (part.headers().get("Content-Type").get(0).equals("image/jpeg")) {
    image = DataBufferUtils.join(part.content())
      .map(dataBuffer -> {
        byte[] bytes = new byte[dataBuffer.readableByteCount()];
        dataBuffer.read(bytes);
        DataBufferUtils.release(dataBuffer);
        return bytes;
      }).block();
  }
}

return image;
Açıklaması şöyle
part.headers() will give you the headers and part.content() will give you the actual content in DataBuffer format.

We iterate the list of parts, and determine which one is an image by peeking into the headers. Once we get that, we use the DataBufferUtils , a helper class to convert the buffer into byte array format.

And that’s it, extracting images/files from multipart data is now a walk in the park thanks to Spring Flux !

SpringWebFlux WebClient Flux Dönüşümü

1. bodyToFlux metodu
Örnek
Şöyle yaparız
WebClient webClient = WebClient.create();
webClient.get()
    .uri("http://example.com/stream")
    .accept(MediaType.TEXT_EVENT_STREAM) // for Server-Sent Events (SSE)
    .retrieve()
    .bodyToFlux(String.class) // convert the response body to a Flux
    .subscribe(data -> System.out.println("Received: " + data));

2. toEntityFlux metodu
Örnek
Şöyle yaparız
WebClient webClient = WebClient.builder().build();
ResponseEntity<Flux<Part>> request = webClient
  .get()
  .uri("...")
  .accept(MediaType.MULTIPART_MIXED)
  .retrieve()
  .toEntityFlux((inputMessage, context) ->
    partReader
      .read(ResolvableType.forType(byte[].class), inputMessage, Map.of()))
  .block();
Örnek
Şöyle yaparız.
@GetMapping(path = "/streaming", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
@ResponseBody
public Flux<Something> streamSomething() {
  return WebClient.create()
    .get().uri("http://example.org/resource")
    .retrieve().bodyToFlux(Something.class)
    .delaySubscription(Duration.ofSeconds(5))
    .repeat();
}
Örnek
Şöyle yaparız.
WebClient webClient = WebClient.builder().baseUrl(baseUrl).build();

webClient.post().uri(uri)
  .contentType(MediaType.APPLICATION_JSON_UTF8)
  .accept(MediaType.APPLICATION_JSON_UTF8)
  .header(HttpHeaders.AUTHORIZATION, "Basic " + Base64Utils
  .encodeToString((plainCreds)
  .getBytes(Charset.defaultCharset())))
  .body(BodyInserters.fromObject(body)).retrieve()
  .bodyToFlux(EmployeeInfo.class)
  .doOnError(throwable -> {
    ...
  }).subscribe(new Consumer<EmployeeInfo>() {
    @Override
    public void accept(EmployeeInfo employeeInfo) {
      ...
    }
}); 
3. exchange + flatMap Yöntemi
Örnek
Elimizde şöyle bir kod olsun. Bu kod hem Mono hem de Flux dönebiliyor. Hem get() hem de put() işlemi için exchange() çağrısı yapıyor. Kod bir  spring bean. İskeleti şöyle
import org.springframework.web.reactive.function.BodyInserters;
import org.springframework.web.reactive.function.client.WebClient;

@Component
public class WebClientHelper {

  private WebClient webClient;

  public WebClientHelper() {
    webClient = WebClient.create();
  }
}
Flux dönen metodlar şöyle. Burada uri() metodu önemli. Gönderilecek uri'ye verilecek parametreler burada belirtiliyor.
public <T> Flux<T> performGetToFlux(URI uri, MultiValueMap<String, String> params,
Class<? extends T> clazzResponse){
  return webClient.get()
        .uri(uriBuilder -> uriBuilder
            .scheme(uri.getScheme())
            .host(uri.getHost())
            .port(uri.getPort())
            .path(uri.getPath())
            .queryParams(params)
            .build()
        )
        .exchange()
        .flatMapMany(clientResponse -> clientResponse.bodyToFlux(clazzResponse));
}
Bu kodu kullanmak için şöyle yaparız
@RestController
@RequestMapping("/employeeClient")
public class EmployeeClientController {

  @Value("${employee.server.host}")
  private String employeeHost;

  @Autowired
  private WebClientHelper webClientHelper;

  @GetMapping
  public Flux<EmployeeModel> getEmployeeList(){
    return webClientHelper.performGetToFlux(URI.create(...), null, EmployeeModel.class);
  }
}