Şu satırı dahil ederiz
import reactor.core.publisher.Sink;
Sanırım bir Sink asenkron programlama ile Reactor arasındaki köprüyü sağlıyor. Açıklaması şöyle
Sink allows to programmatically push reactive streams signals.
Örnek
Elimizde şöyle bir kod olsun
public Mono<Void> index(T doc) { IndexRequest req = indexRequest(doc); return Mono.create(sink -> client.indexAsync(req,RequestOptions.DEFAULT,new IndexActionListener<>(sink))); }
Burada Elasticsearch HighLevelRestClient kullanılıyor. Bu kütüphane hep listener'lar ile çalışır. Şöyle yaparız
@RequiredArgsConstructor public class IndexActionListener<T> implements ActionListener<IndexResponse> { private final MonoSink<T> sink; @Override public void onResponse(IndexResponse res) { if (res.status().getStatus() < 400) { sink.success(); return; } sink.error(new RuntimeException(res.toString())); } @Override public void onFailure(Exception e) { sink.error(e); } }
Hiç yorum yok:
Yorum Gönder