Şu satırı dahil ederiz
import reactor.core.publisher.Sink;
Sanırım bir Sink asenkron programlama ile Reactor arasındaki köprüyü sağlıyor. Açıklaması şöyle
Sink allows to programmatically push reactive streams signals.
Örnek
Elimizde şöyle bir kod olsun
public Mono<Void> index(T doc) {
  IndexRequest req = indexRequest(doc);
  return Mono.create(sink -> 
    client.indexAsync(req,RequestOptions.DEFAULT,new IndexActionListener<>(sink)));
}
Burada Elasticsearch HighLevelRestClient kullanılıyor. Bu kütüphane hep listener'lar ile çalışır. Şöyle yaparız
@RequiredArgsConstructor
public class IndexActionListener<T> implements ActionListener<IndexResponse> {
  private final MonoSink<T> sink;
  @Override
  public void onResponse(IndexResponse res) {
    if (res.status().getStatus() < 400) {
      sink.success();
      return;
    }
    sink.error(new RuntimeException(res.toString()));
  }
  @Override
  public void onFailure(Exception e) {
    sink.error(e);
  }
}
 
Hiç yorum yok:
Yorum Gönder