Şu satırı dahil ederiz
import reactor.core.publisher.Sink;
Sanırım bir Sink asenkron programlama ile Reactor arasındaki köprüyü sağlıyor. Açıklaması şöyle
Sink allows to programmatically push reactive streams signals.
Örnek
Elimizde şöyle bir kod olsun
public Mono<Void> index(T doc) {
IndexRequest req = indexRequest(doc);
return Mono.create(sink ->
client.indexAsync(req,RequestOptions.DEFAULT,new IndexActionListener<>(sink)));
}
Burada Elasticsearch HighLevelRestClient kullanılıyor. Bu kütüphane hep listener'lar ile çalışır. Şöyle yaparız
@RequiredArgsConstructor
public class IndexActionListener<T> implements ActionListener<IndexResponse> {
private final MonoSink<T> sink;
@Override
public void onResponse(IndexResponse res) {
if (res.status().getStatus() < 400) {
sink.success();
return;
}
sink.error(new RuntimeException(res.toString()));
}
@Override
public void onFailure(Exception e) {
sink.error(e);
}
}
Hiç yorum yok:
Yorum Gönder