İş bittikten sonra farklı bir sonuç dönmek içindir
Örnek
Şöyle yaparız
@Override
public Mono<String> deleteStudent(String id) {
return studentRepository.deleteById(id)
.thenReturn("Student deleted successfully!");
}
@Override
public Mono<String> deleteStudent(String id) {
return studentRepository.deleteById(id)
.thenReturn("Student deleted successfully!");
}
import org.springframework.boot.web.embedded.netty.NettyServerCustomizer;
Reactor-Netty doesn’t have a default idle connection timeout.
@Configurationpublic class NettyServerCustomizerConfig {@Beanpublic NettyServerCustomizer nettyServerCustomizer() {return httpServer -> httpServer.idleTimeout(Duration.ofMillis(1));}}
server: netty: idle-timeout: 1000 # 1 second of idle-timeout
import reactor.core.publisher.Sink;
Sink allows to programmatically push reactive streams signals.
public Mono<Void> index(T doc) { IndexRequest req = indexRequest(doc); return Mono.create(sink -> client.indexAsync(req,RequestOptions.DEFAULT,new IndexActionListener<>(sink))); }
@RequiredArgsConstructor public class IndexActionListener<T> implements ActionListener<IndexResponse> { private final MonoSink<T> sink; @Override public void onResponse(IndexResponse res) { if (res.status().getStatus() < 400) { sink.success(); return; } sink.error(new RuntimeException(res.toString())); } @Override public void onFailure(Exception e) { sink.error(e); } }
webClient.get().uri("/endpoint").retrieve().bodyToMono(String.class).timeout(Duration.ofSeconds(10));
this.retryBackoffSpec = Retry.backoff(maxAttempts, Duration.ofSeconds(backoffSeconds)).doBeforeRetry(retrySignal -> log.debug("Waiting {} seconds. Retry #{} of {} after exception: {}", backoffSeconds, (retrySignal.totalRetriesInARow()+1), maxAttempts, retrySignal.failure().getLocalizedMessage() )) .onRetryExhaustedThrow((retryBackoffSpec, retrySignal) -> retrySignal.failure()); this.webClient.post().uri(uri) .bodyValue(emailText) .retrieve() … .bodyToMono(CtfdUserResponse.class) .retryWhen(retryBackoffSpec) .block();
WebClient webClient = WebClient.builder().baseUrl("http://example.com").build();Mono<String> response = webClient.get().uri("/retry-endpoint").retrieve().bodyToMono(String.class)// number of retries and backoff configuration.retryWhen(Retry.backoff(3, Duration.ofSeconds(1))// maximum backoff time.maxBackoff(Duration.ofSeconds(10)))// fallback if retries all fail.onErrorResume(e -> Mono.just("Fallback response"));response.subscribe(result -> System.out.println(result));
Predicate<Throwable> exceptionFilter() { return throwable -> throwable instanceof RuntimeException || (throwable instanceof WebClientResponseException && (throwable.getStatusCode() == HttpStatus.GATEWAY_TIMEOUT || throwable.getStatusCode() == HttpStatus.SERVICE_UNAVAILABLE || throwable.getStatusCode() == HttpStatus.BAD_GATEWAY)); } Retry retry = Retry.backoff(3, Duration.ofSeconds(2)) .jitter(0.7) .filter(exceptionFilter()) .onRetryExhaustedThrow((retrySpec, retrySignal) -> { log.error("Service at {} failed to respond, after max attempts of: {}", uri, retrySignal.totalRetries()); return retrySignal.failure(); });
RestTemplate restTemplate = new RestTemplate();ResponseEntity<String> response = restTemplate.getForEntity("http://example.com", String.class);WebClient webClient = WebClient.create();Mono<String> response = webClient.get().uri("http://example.com").retrieve().bodyToMono(String.class);response.subscribe(result -> System.out.println(result));
RestTemplate’s error handling occurs through the ErrorHandler interface, which requires a separate block of code. WebClient streamlines this with more fluent handling.
WebClient webClient = WebClient.create(); webClient.get() .uri("http://example.com/some-error-endpoint") .retrieve() .onStatus(HttpStatus::isError, response -> { // Handle error status codes return Mono.error(new CustomException("Custom error occurred.")); }) .bodyToMono(String.class);
The onStatus() method allows for handling specific HTTP statuses directly within the chain of operations, providing a more readable and maintainable approach.
RestTemplate restTemplate = new RestTemplate(); HttpHeaders headers = new HttpHeaders(); headers.setContentType(MediaType.APPLICATION_JSON); HttpEntity<String> request = new HttpEntity<>("{\"key\":\"value\"}", headers); ResponseEntity<String> response = restTemplate .postForEntity("http://example.com", request, String.class); WebClient webClient = WebClient.create(); Mono<String> response = webClient.post() .uri("http://example.com") .contentType(MediaType.APPLICATION_JSON) .bodyValue("{\"key\":\"value\"}") .retrieve() .bodyToMono(String.class);
void flatMapVsConcatMap() throws InterruptedException {Observable.just(5, 2, 4, 1).flatMap(second -> Observable.just("Emit delayed with " + second + " second").delay(second, TimeUnit.SECONDS)).subscribe(System.out::println, Throwable::printStackTrace );Thread.sleep(15_000);}
Emit delayed with 1 secondEmit delayed with 2 secondEmit delayed with 4 secondEmit delayed with 5 second
flatMap tries to start as many possible.
void flatMapVsConcatMap() throws InterruptedException { Observable.just(5, 2, 4, 1) .concatMap( second -> Observable.just("Emit delayed with " + second + " second") .delay(second, TimeUnit.SECONDS) ) subscribe(System.out::println,Throwable::printStackTrace); Thread.sleep(15_000); }
Emit delayed with 5 secondEmit delayed with 2 secondEmit delayed with 4 secondEmit delayed with 1 second
// Simulate a data source emitting a continuous stream of data at a high rate Flux<Integer> dataSource = Flux.range(1, Integer.MAX_VALUE); // Process the data with back pressure using limitRate operator dataSource .limitRate(10) // Control the number of elements emitted per request (back pressure) .doOnNext(data -> { // Simulate processing delay try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("Processed: " + data); }) .subscribe();
In this example, the Flux.range(1, Integer.MAX_VALUE) simulates a continuous stream of data. The limitRate(10) operator specifies that the data processing should be limited to 10 elements per request, effectively controlling the back pressure.As a result, you’ll notice that the data processing rate is limited, and the doOnNext method will print "Processed: <data>" with a slight delay between each processed element
Flux<Integer> dataFlux = Flux.fromIterable(List.of()).switchIfEmpty(Mono.just(0)).collectList();
Reactive Spring provides WebTestClient to write down integration tests for API endpoints.
@Autowiredprivate WebTestClient webTestClient; @Test void find_all_patients(){ long patientCount = patientRepository.findAll().count().block(); webTestClient.get().uri("/v1/patients") .exchange().expectStatus().isEqualTo(HttpStatus.ACCEPTED) .expectBody() .jsonPath("$").isArray() .jsonPath("$.size()").isEqualTo(patientCount); }
@Test public void testGetAllUsers() { webTestClient.get().uri("/users") .exchange() .expectStatus().isOk() .expectBodyList(User.class); } @Test public void testGetUserById() { webTestClient.get().uri("/users/{id}", "user-id") .exchange() .expectStatus().isOk() .expectBody(User.class); } @Test public void testCreateUser() { User newUser = new User("new-user-id", "New User"); webTestClient.post().uri("/users") .bodyValue(newUser) .exchange() .expectStatus().isCreated() .expectBody(User.class) .isEqualTo(newUser); }
@Test public void testGetUserById_NotFound() { webTestClient.get().uri("/users/nonexistent-id") .exchange() .expectStatus().isNotFound(); }
When testing reactive components, ensure you cover error scenarios using StepVerifier to verify the behavior of your reactive streams in response to different types of errors.
import org.junit.jupiter.api.Test;import reactor.core.publisher.Flux;import reactor.test.StepVerifier;public class ReactiveStreamTest {@Testpublic void testFlux() {Flux<Integer> numbers = Flux.just(1, 2, 3, 4, 5);StepVerifier.create(numbers).expectNext(1, 2, 3, 4, 5).verifyComplete();}@Testpublic void testTransformations() {Flux<Integer> numbers = Flux.just(1, 2, 3, 4, 5);StepVerifier.create(numbers.filter(number -> number % 2 == 0).map(evenNumber -> evenNumber * 2)).expectNext(4, 8).verifyComplete();}@Testpublic void testWithError() {Flux<Integer> numbers = Flux.just(1, 2, 3).concatWith(Flux.error(new RuntimeException("Oops! An error occurred.")));StepVerifier.create(numbers).expectNext(1, 2, 3).expectError(RuntimeException.class).verify();}}
Project Reactor provides several operators to manage errors effectively within reactive streams:1. onErrorResume and onErrorReturn: These operators allow you to provide fallback values or alternative streams in case of an error. This can help prevent the entire stream from failing and provide a more graceful degradation.2. doOnError: This operator lets you execute specific actions when an error occurs, such as logging the error or cleaning up resources. It doesn't interfere with the error propagation itself.3. retry and retryWhen: These operators enable you to automatically retry an operation a specified number of times or based on a certain condition. This can be helpful for transient errors.
public Mono<User> getUserById(String id) { return userRepository.findById(id) .onErrorResume(throwable -> { log.error("Error occurred while fetching user by id: {}", id, throwable); return Mono.just(new User("default", "Default User")); }); }
webClient.get() .uri("/endpoint") .retrieve() .bodyToMono(String.class) .doOnError(e -> log.error("Error occurred", e)) .onErrorResume(e -> Mono.just("Fallback value"));
The zip operator combines elements from two or more reactive streams into pairs, tuples, or other custom objects. It's useful when you need to process elements from multiple streams together.
Flux<Integer> numbers = Flux.just(1, 2, 3); Flux<String> letters = Flux.just("A", "B", "C"); Flux<String> combined = Flux.zip(numbers, letters, (number, letter) -> number + letter);
Flux<String> endpoint1 = webClient.get() .uri("/endpoint1").retrieve().bodyToMono(String.class); Flux<String> endpoint2 = webClient.get() .uri("/endpoint2").retrieve().bodyToMono(String.class); Flux<String> result = Flux.zip( endpoint1, endpoint2, (res1, res2) -> res1 + " " + res2);
Flux<String> stringFlux = Flux.just("Hello", "World", "from", "IntelliJ IDEA").map(s -> {if (s.equals("World")) throw new RuntimeException("An error occurred");else return s.toUpperCase();});stringFlux.onErrorReturn("Error occurred in the stream.").subscribe(System.out::println);
HELLO Error occurred in the stream.
Flux<Integer> numbers = Flux.just(1, 2, 3, 4, 5) .concatWith(Flux.error(new RuntimeException("Oops! An error occurred."))) .map(number -> 10 / (number - 3)) // This will cause an ArithmeticException .doOnError(throwable -> System.err.println("Error occurred: " + throwable.getMessage())) .onErrorReturn(-1); // Provide a fallback value in case of an error numbers.subscribe( value -> System.out.println("Received: " + value), error -> System.err.println("Subscriber error: " + error.getMessage()) );
A hot publisher in Spring WebFlux is a publisher that emits data to all subscribers as soon as it is available. This is in contrast to a cold publisher, which emits data to subscribers only when they subscribe.
With Hot Publishers, there will be only one data producer. All Subscribers listen to the data produced by the single data producer. The data is shared.Imagine a TV station. It does not matter if there is no one to watch the program. It will be emitted regardless. Watchers can start watching anytime they want. But all watchers get the same info at any given moment.Watchers would lose the content if they joined late. The same is with the Hot Publishers.
Flux<String> netFlux = Flux.fromStream(ReactiveJavaTutorial::getVideo) .delayElements(Duration.ofSeconds(2)) .share(); // turn the cold publisher into a hot publisher
Yani publisher, Backpressure kavramını desteklemez, aboneyi beklemez ve veri üretmeye devam eder.However, it is important to note that not all publishers respect backpressure. .... These are often termed as 'hot' publishers. Dealing with these publishers requires specific strategies, such as buffering, dropping, or sampling data.
Flux.range(1, 100).onBackpressureBuffer(10).subscribe(new BaseSubscriber<Integer>() {@Overrideprotected void hookOnSubscribe(Subscription subscription) {request(10);}@Overrideprotected void hookOnNext(Integer value) {// process the valueSystem.out.println(value);if (value % 10 == 0) {request(10);}}});
In this code, the onBackpressureBuffer operator is used to create a buffer that holds up to 10 items. If the buffer is full, the application will throw a BufferOverflowException.
Basically, there are two general directions for writing tests for the reactive chain:1. Directly call the methods to test and receive the outcome by invoking block() on the resulting Mono or Flux.2. Wrap these calls with the StepVerifier class provided by the Reactor framework.
@Testpublic void verifyGetByNameReturnsEntryForSuccessfulRequestUsingStepVerifier() {InventoryEntry expectedEntry = new InventoryEntry(42, "...", 5);mockBackEnd.enqueue(assembleResponse(expectedEntry));StepVerifier.create(cut.getByName(expectedEntry.name())).assertNext(entry -> assertThat(entry).isEqualTo(expectedEntry)).verifyComplete();}
@Test public void testFluxStream() { Flux<Integer> flux = Flux.just(1, 2, 3, 4, 5); StepVerifier.create(flux) .expectNext(1) .expectNext(2) .expectNext(3) .expectNext(4) .expectNext(5) .verifyComplete(); }
It reads the "multipart/form-data" requests to a stream of Parts
final var partReader = new DefaultPartHttpMessageReader();partReader.setStreaming(false);WebClient webClient = WebClient.builder().build();ResponseEntity<Flux<Part>> request = webClient.get().uri("...").accept(MediaType.MULTIPART_MIXED).retrieve().toEntityFlux((inputMessage, context) ->partReader.read(ResolvableType.forType(byte[].class), inputMessage, Map.of())).block();byte[] image = null;List<Part> parts = request.getBody().collectList().block();for (Part part : parts) {// access individual parts hereSystem.out.println(part.headers());if (part.headers().get("Content-Type").get(0).equals("image/jpeg")) {image = DataBufferUtils.join(part.content()).map(dataBuffer -> {byte[] bytes = new byte[dataBuffer.readableByteCount()];dataBuffer.read(bytes);DataBufferUtils.release(dataBuffer);return bytes;}).block();}}return image;
part.headers() will give you the headers and part.content() will give you the actual content in DataBuffer format.
We iterate the list of parts, and determine which one is an image by peeking into the headers. Once we get that, we use the DataBufferUtils , a helper class to convert the buffer into byte array format.
And that’s it, extracting images/files from multipart data is now a walk in the park thanks to Spring Flux !
WebClient webClient = WebClient.create(); webClient.get() .uri("http://example.com/stream") .accept(MediaType.TEXT_EVENT_STREAM) // for Server-Sent Events (SSE) .retrieve() .bodyToFlux(String.class) // convert the response body to a Flux .subscribe(data -> System.out.println("Received: " + data));
WebClient webClient = WebClient.builder().build();ResponseEntity<Flux<Part>> request = webClient.get().uri("...").accept(MediaType.MULTIPART_MIXED).retrieve().toEntityFlux((inputMessage, context) ->partReader.read(ResolvableType.forType(byte[].class), inputMessage, Map.of())).block();
@GetMapping(path = "/streaming", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
@ResponseBody
public Flux<Something> streamSomething() {
return WebClient.create()
.get().uri("http://example.org/resource")
.retrieve().bodyToFlux(Something.class)
.delaySubscription(Duration.ofSeconds(5))
.repeat();
}
WebClient webClient = WebClient.builder().baseUrl(baseUrl).build();
webClient.post().uri(uri)
.contentType(MediaType.APPLICATION_JSON_UTF8)
.accept(MediaType.APPLICATION_JSON_UTF8)
.header(HttpHeaders.AUTHORIZATION, "Basic " + Base64Utils
.encodeToString((plainCreds)
.getBytes(Charset.defaultCharset())))
.body(BodyInserters.fromObject(body)).retrieve()
.bodyToFlux(EmployeeInfo.class)
.doOnError(throwable -> {
...
}).subscribe(new Consumer<EmployeeInfo>() {
@Override
public void accept(EmployeeInfo employeeInfo) {
...
}
});
import org.springframework.web.reactive.function.BodyInserters;import org.springframework.web.reactive.function.client.WebClient;@Componentpublic class WebClientHelper {private WebClient webClient;public WebClientHelper() {webClient = WebClient.create();}}
public <T> Flux<T> performGetToFlux(URI uri, MultiValueMap<String, String> params,
Class<? extends T> clazzResponse){return webClient.get().uri(uriBuilder -> uriBuilder.scheme(uri.getScheme()).host(uri.getHost()).port(uri.getPort()).path(uri.getPath()).queryParams(params).build()).exchange().flatMapMany(clientResponse -> clientResponse.bodyToFlux(clazzResponse));}
@RestController@RequestMapping("/employeeClient")public class EmployeeClientController {@Value("${employee.server.host}")private String employeeHost;@Autowiredprivate WebClientHelper webClientHelper;@GetMappingpublic Flux<EmployeeModel> getEmployeeList(){return webClientHelper.performGetToFlux(URI.create(...), null, EmployeeModel.class);}}