Monday, 15 February 2021

The Flux.generate method

Introduction
The explanation is as follows
- Accepts a Consumer<SynchronousSink<T>>
- Consumer is invoked again and again based on the downstream demand
- Consumer can emit only one element at the max with an optional complete/error signal.
- Publisher produces elements based on the downstream demand
- We can get the reference of SynchronousSink. But it might not be really useful as we could emit only one element
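The points above can be sketched with a generator that emits at most one element per invocation and finishes with a complete signal. This is only a minimal sketch: the class name GenerateCompleteSketch and the method firstN are hypothetical names chosen for illustration, and block() is used only to collect the result synchronously.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Flux;
import reactor.core.publisher.SynchronousSink;

// Hypothetical helper class, not part of the original post.
class GenerateCompleteSketch {

    // Emits 0, 1, ..., limit - 1 and then completes.
    static List<Integer> firstN(int limit) {
        AtomicInteger counter = new AtomicInteger();
        Flux<Integer> flux = Flux.generate((SynchronousSink<Integer> sink) -> {
            int value = counter.getAndIncrement();
            if (value >= limit) {
                sink.complete();   // terminal signal, no element in this invocation
            } else {
                sink.next(value);  // exactly one element in this invocation
            }
        });
        // block() only serves to gather the elements synchronously in this sketch.
        return flux.collectList().block();
    }

    public static void main(String[] args) {
        System.out.println(firstN(5)); // prints [0, 1, 2, 3, 4]
    }
}
```

Each invocation of the Consumer calls either next or complete, so the one-element-per-invocation rule is never violated.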
generate method - SynchronousSink
Example
We do it like this
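import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Flux;
import reactor.core.publisher.SynchronousSink;

AtomicInteger atomicInteger = new AtomicInteger();

//Flux generate sequence
Flux<Integer> integerFlux = Flux.generate((SynchronousSink<Integer> synchronousSink) -> {
    System.out.println("Flux generate");
    synchronousSink.next(atomicInteger.getAndIncrement());
});

//observer
//delayElements delivers on a separate scheduler thread, so the main thread
//must be kept alive (for example with Thread.sleep) to see the output
integerFlux.delayElements(Duration.ofMillis(50))
        .subscribe(i -> System.out.println("First consumed ::" + i));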
The explanation is as follows. A certain number of elements are generated up front through the SynchronousSink; after that, new elements are added as the consuming side requests them.
The generate method emits elements based on the demand from the downstream. It generates 32 elements first and buffers them. As the downstream starts processing elements and the buffer size drops below the threshold, it emits a few more elements. This process is repeated again and again. At some point, if the observer stops processing elements, Flux.generate will also stop emitting elements. So the generate method is aware of the downstream observer's processing speed.
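The demand-driven behavior described above can be observed directly by requesting elements by hand. The following is a minimal sketch, assuming a BaseSubscriber that requests nothing on subscription. The class name GenerateDemandSketch and the method invocationsAfterRequests are hypothetical. The sketch uses explicit request calls instead of delayElements, so it does not reproduce the initial batch of 32 from the example above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import org.reactivestreams.Subscription;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.SynchronousSink;

// Hypothetical helper class, not part of the original post.
class GenerateDemandSketch {

    // Returns how many times the generator has run after each step.
    static List<Integer> invocationsAfterRequests() {
        AtomicInteger invocations = new AtomicInteger();
        Flux<Integer> flux = Flux.generate((SynchronousSink<Integer> sink) ->
                sink.next(invocations.incrementAndGet()));

        BaseSubscriber<Integer> subscriber = new BaseSubscriber<Integer>() {
            @Override
            protected void hookOnSubscribe(Subscription subscription) {
                // Deliberately request nothing here; demand is signalled manually below.
            }
        };
        flux.subscribe(subscriber);

        List<Integer> counts = new ArrayList<>();
        counts.add(invocations.get()); // no demand yet
        subscriber.request(2);         // demand for two elements
        counts.add(invocations.get());
        subscriber.request(3);         // demand for three more
        counts.add(invocations.get());
        subscriber.cancel();           // stop the otherwise infinite sequence
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(invocationsAfterRequests()); // prints [0, 2, 5]
    }
}
```

The generator runs once per requested element and not at all while there is no demand.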
Example - Trying to Emit More Than One Element
This cannot be done. Suppose we have the following code
AtomicInteger atomicInteger = new AtomicInteger();
Flux<Integer> integerFlux = Flux.generate((SynchronousSink<Integer> synchronousSink) -> {
    System.out.println("Flux generate");
    synchronousSink.next(atomicInteger.getAndIncrement());
    //second next() call in the same invocation is not allowed
    synchronousSink.next(atomicInteger.getAndIncrement());
});
We get the following error as output
Flux generate
[ERROR] (main) Operator called default onErrorDropped - reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.IllegalStateException: More than one call to onNext
reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.IllegalStateException: More than one call to onNext
Caused by: java.lang.IllegalStateException: More than one call to onNext
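The ErrorCallbackNotImplemented wrapper in the log above indicates that the subscriber had no error callback. As a minimal sketch, assuming the same faulty generator, the error can be received by passing an error consumer to subscribe. The class name GenerateDoubleNextSketch and the method captureErrors are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Flux;
import reactor.core.publisher.SynchronousSink;

// Hypothetical helper class, not part of the original post.
class GenerateDoubleNextSketch {

    // Subscribes with an error consumer and returns the errors it received.
    static List<Throwable> captureErrors() {
        AtomicInteger counter = new AtomicInteger();
        Flux<Integer> flux = Flux.generate((SynchronousSink<Integer> sink) -> {
            sink.next(counter.getAndIncrement());
            sink.next(counter.getAndIncrement()); // second call in the same invocation is rejected
        });

        List<Throwable> errors = new ArrayList<>();
        // The second argument is the error callback that was missing in the log above.
        flux.subscribe(value -> { }, errors::add);
        return errors;
    }

    public static void main(String[] args) {
        for (Throwable error : captureErrors()) {
            System.out.println("Received error: " + error);
        }
    }
}
```

To emit several values, let each invocation of the generator emit one of them, as in the first example.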
