We include the following line:
import reactor.core.publisher.Flux;
Creates a cold publisher. It takes data that arrives asynchronously from external sources and makes it consumable by writing it to a FluxSink. Flux.create() supports emission from multiple threads. If there is only one external source, that is, the data is produced by a single thread, Flux.push() can be used instead. The explanation is as follows:
- Accepts a Consumer<FluxSink<T>>
- Consumer is invoked only once per subscriber
- Consumer can emit 0..N elements immediately
- Publisher is not aware of downstream state. So we need to provide Overflow strategy as an additional parameter
- We can get the reference of FluxSink using which we could keep on emitting elements as and when required using multiple threads.
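The single-producer case mentioned above can be sketched roughly as follows. This is only a minimal illustration, assuming reactor-core is on the classpath. The class name PushExample, the helper collect() and the emitted values are made up for the example.

```java
import java.util.List;
import reactor.core.publisher.Flux;

public class PushExample {
    // Hypothetical helper: all values are emitted from one thread (the
    // subscribing thread), which is the situation Flux.push is meant for.
    static List<String> collect() {
        Flux<String> flux = Flux.push(sink -> {
            sink.next("a");
            sink.next("b");
            sink.next("c");
            sink.complete(); // without this, collectList() would never finish
        });
        return flux.collectList().block();
    }

    public static void main(String[] args) {
        System.out.println(collect());
    }
}
```

If several threads may call next() on the same sink, Flux.create() is the safer choice, since Flux.push() assumes a single producing thread.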
The FluxSink created by this method does not care whether the consumer falls behind. The explanation is as follows:
The downstream determines how many elements (elements == next signals) it wants, and if it can't keep up, the elements that have already been emitted are removed or buffered according to some strategy (by default they are buffered until the downstream asks for more).
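The default buffering described above can be observed with a subscriber that controls its own demand. The sketch below is an illustration under assumptions: the class DefaultBufferExample and the method receivedCounts() are hypothetical, and everything runs synchronously on the calling thread.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.reactivestreams.Subscription;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;

public class DefaultBufferExample {
    // Returns how many elements the subscriber had received
    // (1) right after subscribing with an initial demand of 2, and
    // (2) after asking for 3 more.
    static List<Integer> receivedCounts() {
        List<Integer> received = new ArrayList<>();
        BaseSubscriber<Integer> slowSubscriber = new BaseSubscriber<Integer>() {
            @Override
            protected void hookOnSubscribe(Subscription subscription) {
                subscription.request(2); // initial demand: only two elements
            }

            @Override
            protected void hookOnNext(Integer value) {
                received.add(value); // deliberately no further request here
            }
        };

        Flux<Integer> flux = Flux.create(sink -> {
            for (int i = 0; i < 5; i++) {
                sink.next(i); // emitted regardless of downstream demand
            }
            sink.complete();
        });

        flux.subscribe(slowSubscriber);
        int afterSubscribe = received.size();

        slowSubscriber.request(3); // buffered elements are handed over now
        int afterSecondRequest = received.size();

        return Arrays.asList(afterSubscribe, afterSecondRequest);
    }

    public static void main(String[] args) {
        System.out.println(receivedCounts());
    }
}
```

The producer emits all five elements up front, but the subscriber only sees as many as it has requested. The rest wait in the buffer until more are requested.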
create method - FluxSink
The lambda passed to this method takes a single FluxSink object as its parameter. Elements are emitted by calling this object's next() method.
Example
We do it like this:
Flux<String> articlesFlux = Flux.create((FluxSink<String> sink) -> {
    /* Get the latest articles from a server and emit them one by one to downstream. */
    List<String> articals = getArticalsFromServer();
    articals.forEach(sink::next);
});
Example
The explanation is as follows:
Flux.create produces a SerializedSink that is safe to use from multiple threads for next calls
We do it like this. Here nothing is written to the FluxSink right away. Items are written to it later, from time to time.
Map<String, FluxSink<Item>> sinks = new ConcurrentHashMap<>();

// Store a new sink for the given ID. The lambda runs only once the
// returned Flux is subscribed to, so the caller must subscribe to it.
public Flux<Item> start(String id) {
    return Flux.<Item>create(sink -> sinks.put(id, sink));
}

// Called from different threads
public void publish(String id, Item item) {
    sinks.get(id).next(item); // the sink serializes concurrent next calls
}
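To make the thread-safety claim concrete, the following sketch calls next() on one sink from several threads and counts what arrives downstream. It is an illustration only. MultiThreadedEmitExample and emitFromThreads() are hypothetical names, and an AtomicReference stands in for the map used above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;

public class MultiThreadedEmitExample {
    // Emits perThread elements from each of threadCount threads and
    // returns the number of elements the subscriber received.
    static int emitFromThreads(int threadCount, int perThread) {
        AtomicReference<FluxSink<Integer>> sinkRef = new AtomicReference<>();
        AtomicInteger receivedCount = new AtomicInteger();

        // subscribe() invokes the create lambda, which hands over the sink.
        Flux.<Integer>create(sinkRef::set)
            .subscribe(value -> receivedCount.incrementAndGet());

        FluxSink<Integer> sink = sinkRef.get();
        List<Thread> threads = new ArrayList<>();
        for (int t = 0; t < threadCount; t++) {
            Thread thread = new Thread(() -> {
                for (int i = 0; i < perThread; i++) {
                    sink.next(i); // concurrent next calls on the same sink
                }
            });
            threads.add(thread);
            thread.start();
        }
        for (Thread thread : threads) {
            try {
                thread.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        sink.complete();
        return receivedCount.get();
    }

    public static void main(String[] args) {
        System.out.println(emitFromThreads(4, 1000));
    }
}
```

With an unbounded subscriber such as the lambda above, no element should be lost, so the count is expected to equal threadCount * perThread.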
Example
Suppose we have the following code. Here the FluxSink is filled immediately, right from the start. Each subscriber receives the data again from the beginning.
Flux<Integer> integerFlux = Flux.create((FluxSink<Integer> fluxSink) -> {
    IntStream.range(0, 5)
            .peek(i -> System.out.println("going to emit - " + i))
            .forEach(fluxSink::next);
});

// First observer. Takes 1 ms to process each element
integerFlux.delayElements(Duration.ofMillis(1)).subscribe(
        i -> System.out.println("First :: " + i));

// Second observer. Takes 2 ms to process each element
integerFlux.delayElements(Duration.ofMillis(2)).subscribe(
        i -> System.out.println("Second:: " + i));
We get the following output:
going to emit - 0
going to emit - 1
going to emit - 2
going to emit - 3
going to emit - 4
going to emit - 0
going to emit - 1
going to emit - 2
going to emit - 3
going to emit - 4
First :: 0
Second:: 0
First :: 1
First :: 2
Second:: 1
First :: 3
First :: 4
Second:: 2
Second:: 3
Second:: 4
The explanation is as follows:
What we can understand from the above output is,
- Each observer gets its own FluxSink instance which is expected as we create a Cold publisher.
- Create method does not wait for the observer to process the elements. It emits the elements even before observers start processing the elements.
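The per-subscriber behavior can also be checked by counting how often the lambda passed to create runs. The sketch below is a minimal illustration. The class ColdCreateExample and the method run() are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Flux;

public class ColdCreateExample {
    // Returns [lambda invocations, elements seen by first subscriber,
    // elements seen by second subscriber].
    static List<Integer> run() {
        AtomicInteger invocations = new AtomicInteger();
        List<Integer> first = new ArrayList<>();
        List<Integer> second = new ArrayList<>();

        Flux<Integer> flux = Flux.create(sink -> {
            invocations.incrementAndGet(); // runs once per subscriber
            sink.next(1);
            sink.next(2);
            sink.complete();
        });

        flux.subscribe(first::add);
        flux.subscribe(second::add);

        return Arrays.asList(invocations.get(), first.size(), second.size());
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Each subscription triggers a fresh run of the lambda with its own sink, so both subscribers see the full sequence.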
create method - FluxSink + OverflowStrategy
The explanation is as follows:
BUFFER : Buffer all signals if the downstream can't keep up
DROP : Drop the incoming signal if the downstream is not ready to receive it
ERROR : Signal an IllegalStateException when the downstream can't keep up
IGNORE : Completely ignore the downstream backpressure request
LATEST : Downstream will get only the latest signals from upstream
Example
We do it like this. Any of the BUFFER, DROP, ERROR, IGNORE, or LATEST options can be used here.
Flux<Integer> integerFlux = Flux.create((FluxSink<Integer> fluxSink) -> {
    IntStream.range(0, 5)
            .peek(i -> System.out.println("going to emit - " + i))
            .forEach(fluxSink::next);
}, FluxSink.OverflowStrategy.DROP);
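To see how the strategies differ, a subscriber with limited demand can be run against the same producer. The sketch below is an illustration under assumptions: OverflowStrategyExample and receivedWith() are hypothetical names, only BUFFER, DROP and LATEST are compared, and everything runs synchronously on the calling thread.

```java
import java.util.ArrayList;
import java.util.List;
import org.reactivestreams.Subscription;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;

public class OverflowStrategyExample {
    // The producer emits 0..4 while the subscriber has requested only two
    // elements. Afterwards the subscriber asks for ten more.
    static List<Integer> receivedWith(FluxSink.OverflowStrategy strategy) {
        List<Integer> received = new ArrayList<>();
        BaseSubscriber<Integer> subscriber = new BaseSubscriber<Integer>() {
            @Override
            protected void hookOnSubscribe(Subscription subscription) {
                subscription.request(2); // demand is exhausted after 0 and 1
            }

            @Override
            protected void hookOnNext(Integer value) {
                received.add(value);
            }
        };

        Flux.<Integer>create(sink -> {
            for (int i = 0; i < 5; i++) {
                sink.next(i);
            }
            sink.complete();
        }, strategy).subscribe(subscriber);

        subscriber.request(10); // late demand, after all emissions happened
        return received;
    }

    public static void main(String[] args) {
        System.out.println("BUFFER: " + receivedWith(FluxSink.OverflowStrategy.BUFFER));
        System.out.println("DROP  : " + receivedWith(FluxSink.OverflowStrategy.DROP));
        System.out.println("LATEST: " + receivedWith(FluxSink.OverflowStrategy.LATEST));
    }
}
```

In this setup BUFFER keeps the three excess elements until the late request, DROP discards them, and LATEST keeps only the most recent one.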