15 Şubat 2021 Pazartesi

Flux.create metodu - Harici Kaynaklardan Gelen Asenkron Veri İçindir

Giriş
Şu satırı dahil ederizz
import reactor.core.publishe.Flux;
Cold publisher yaratır.  Harici kaynaklardan asenkron olarak gelen veriyi bir FluxSink'e yazarak tüketilmesini sağlar. Açıklaması şöyle. Flux.create() multi-threaded şekilde çalışır. Eğer harici kaynak tek ise yani  tek bir thread tarafından veri dolduruluyorsa Flux.push() kullanılabilir.
- Accepts a Consumer<FluxSink<T>>
- Consumer is invoked only once per subscriber
- Consumer can emit 0..N elements immediately
- Publisher is not aware of downstream state. So we need to provide Overflow strategy as an additional parameter
- We can get the reference of FluxSink using which we could keep on emitting elements as and when required using multiple threads.
Bu metod tarafından yaratılan FluxSink, tüketen taraf yavaş kalırsa bile umursamaz.  Açıklaması şöyle
The downstream will determine how many elements (elements == next signals) it wants and if he can't keep up, those elements which are already emitted will be removed/buffered in some strategy (by default they will be buffered until the downstream will ask for more).

create metodu - FluxSink
Bu metoda geçilen lamda parametre olarak bir tane FluxSink nesnesi alır. Bu nesnenin next() metodu çağrılarak doldurma işlemi gerçekleştirilir.

Örnek
Şöyle yaparızz
Flux<String> articlesFlux = Flux.create((FluxSink<String> sink) -> {
  /* get all the latest article from a server and emit them one by one to downstream. */
  List<String> articals = getArticalsFromServer();
  articals.forEach(sink::next);
});
Örnek
Açıklaması şöyle
Flux.create produces a SerializedSink that is safe to use from multiple threads for next calls
Şöyle yaparız. Burada FluxSink nesnesine hemen bir şey yazılmıyor. Daha sonra ara ara yazılıyor
Map<String, FluxSink<Item>> sinks = new ConcurrentHashMap<>();

// Store a new sink for the given ID
public void start(String id) {
  Flux.create(sink -> sinks.put(id, sink));
}

// Called from different threads
public void publish(String id, Item item) {
  sinks.get(id).next(item); //<----------- Is this safe??
}
Örnek
Elimizde  şöyle bir kod olsun. Burada FluxSink nesnesi en baştan hemen dolduruluyor. Her abone veriyi en baştan tekrar alır
Flux<Integer> integerFlux = Flux.create((FluxSink<Integer> fluxSink) -> {
    IntStream.range(0, 5)
            .peek(i -> System.out.println("going to emit - " + i))
            .forEach(fluxSink::next);
});

//First observer. takes 1 ms to process each element
integerFlux.delayElements(Duration.ofMillis(1)).subscribe(
i -> System.out.println("First :: " + i));

//Second observer. takes 2 ms to process each element
integerFlux.delayElements(Duration.ofMillis(2)).subscribe(
i -> System.out.println("Second:: " + i));
Çıktı olarak şunu alırız
going to emit - 0
going to emit - 1
going to emit - 2
going to emit - 3
going to emit - 4
going to emit - 0
going to emit - 1
going to emit - 2
going to emit - 3
going to emit - 4
First :: 0
Second:: 0
First :: 1
First :: 2
Second:: 1
First :: 3
First :: 4
Second:: 2
Second:: 3
Second:: 4
Açıklaması şöyle
What we can understand from the above output is,
- Each observer gets its own FluxSink instance which is expected as we create a Cold publisher.
- Create method does not wait for the observer to process the elements. It emits the elements even before observers start processing the elements.
create metodu - FluxSink + OverFlowStrategy
Açıklaması şöyle
BUFFER : Buffer all signals if the downstream can't keep up
DROP : Drop the incoming signal if the downstream is not ready to receive it
ERROR : Signal an IllegalStateException when the downstream can't keep up
IGNORE : Completely ignore the downstream backpressure request
LATEST : Downstream will get only the lates signals from upstream
Örnek
Şöyle yaparız. Burada BUFFER, DROP, ERROR, IGNORE, LATEST seçenekleri kullanılabilir.
Flux<Integer> integerFlux = Flux.create((FluxSink<Integer> fluxSink) -> {
            IntStream.range(0, 5)
                    .peek(i -> System.out.println("going to emit - " + i))
                    .forEach(fluxSink::next);
        }, FluxSink.OverflowStrategy.DROP);

Hiç yorum yok:

Yorum Gönder