21 Temmuz 2020 Salı

SpringMVC SseEmitter Sınıfı - Server Sent Events İçindir

Giriş
Şu satırı dahil ederiz
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
Not : Server Sent Events Kullanımı (SSE) yazısına bakılabilir.

Kullanım
1. @GetMapping kullanan bir metod yaz ve bu metod yeni bir SseEmitter dönsün.
2. Broadcast için tüm SseEmitter nesnelerinin veya unicast için tek bir SseEmitter nesnesinin
send(foo, MediaType.APPLICATION_JSON);
metodunu çağır. 
3. Eğer SseEmitter için zaman aşımı (timeout) vermek istersek public SseEmitter(Long timeout) çağrısı kullanılır


1. SseEmitter Nedir?
Açıklaması şöyle. Sever Sent Events gönderme yöntemlerinden bir tanesidir
However, the full power of SSE comes with the SseEmitter class. It allows for asynchronous processing and publishing of the events from other threads. What is more, it is possible to store the reference to SseEmitter and retrieve it on subsequent client calls. This provides a huge potential for building powerful notification scenarios.
Açıklaması şöyleSseEmitter sınıfı ResponseBodyEmitter sınıfından kalıtır. Kalıtımın açıklaması şöyle
In Spring we have sent server-events with two emitter class types. Infact one is the parent class and the other is a specialized case of the first one. Following are the two types:

- ResponseBodyEmitter
- SseEmitter
Sunucuya bağlanan her bir istemci için bir SseEmitter nesnesi yaratılır. SseEmitter sadece text mesaj gönderebilir. Text mesaj JSON olabilir.
When a client sends a request to an SSE endpoint, the server returns an Emitter object (in terms of Java). This leaves an open HTTP session, during which the server can send messages to the client. Once the connection is no longer needed, the server can drop the object, and it will be acknowledged by the client.
Protocol 
Önce istemci şöyle bir istekte bulunur
GET /sse HTTP/1.1
Accept: text/event-stream
Cache-Control: no-cache
Connection: keep-alive
Sunucu karşılık verir. Sunucu text göndereceğini ve göndereceği verinin büyüklüğünü bilmediği için chunked olacağını söyler.
HTTP/1.1 200
Content-Type: text/event-stream;charset=UTF-8 Transfer-Encoding: chunked
Her event birbirinden 2 tane newline karakteri ile ayrılır. İki tane event şöyle gönderilir
data: The first event.
data: The second event.
Bir event çok satıra sahip olabilir. Bu durumda her satır arasında 1 tane newline karakteri bulunur. Çok satırlı bir event şöyle gönderilir.
data: The third data: event.
Her event'e bir ID verilebilir. ID içeren iki tane event göndermek için şöyle yaparız
id: 1 data: The first event. id: 2 data: The second event.
Her event'e bir TYPE verilebilir. TYPE içeren iki tane event göndermek için şöyle yaparız
event: type1 data: An event of type1. event: type2 data: An event of type2. data: An event without any type.
2. Client SseEmitter Gönderilerini Nasıl Alır?
Örnek
Şöyle yaparız
<!DOCTYPE html> <html lang="en"> <head> <meta charset="UTF-8"> <title>SSE example</title> </head> <body> <ul id="stockChanges"></ul> <script type="application/javascript"> function add(message) { const li = document.createElement("li"); li.innerHTML = message; document.getElementById("stockChanges").appendChild(li); } const eventSource = new EventSource("/stocks-stream"); //4.1 eventSource.onmessage = e => { const response = JSON.parse(e.data); add('Stock price was changed, new price: ' + response.price + ' $'); //4.2 } eventSource.onopen = e => add('Connection opened'); eventSource.onerror = e => add('Connection closed'); </script> </body> </html>

3. Kullanım
Örnek
Abone olmak için şöyle bir kod olsun
@Slf4j
@RestController
@RequestMapping("/events")
@RequiredArgsConstructor
public class EventController {
  public static final String MEMBER_ID_HEADER = "MemberId";
  private final EmitterService emitterService;
  private final NotificationService notificationService;

  @GetMapping
  public SseEmitter subscribeToEvents(@RequestHeader(name = MEMBER_ID_HEADER) String
memberId) {
    log.debug("Subscribing member with id {}", memberId);
    return emitterService.createEmitter(memberId);
  }
 
  @PostMapping
  @ResponseStatus(HttpStatus.ACCEPTED)
  public void publishEvent(@RequestHeader(name = MEMBER_ID_HEADER) String memberId,
@RequestBody EventDto event) {
    log.debug("Publishing event {} for member with id {}", event, memberId);
    notificationService.sendNotification(memberId, event);
  }
}
Bu kodun açıklaması şöyle. subscribeToEvents() çağrısı ile event'lere abone olunur.
Multiple clients connect to my backend and their SseEmitters get stored (in memory), so that they can send each other messages via another endpoint, which the server then forwards to all connected emitters.
Gönderen servis kodu şöyledir. Repository nesnesinden SseEmitter okur ve onu kullanarak gönderir
@Service
@Primary
@AllArgsConstructor
@Slf4j
public class SseNotificationService implements NotificationService {
  private final EmitterRepository emitterRepository;
  private final EventMapper eventMapper;
  
  @Override
  public void sendNotification(String memberId, EventDto event) {
    if (event == null) {
      log.debug("No server event to send to device.");
      return;
    }
    doSendNotification(memberId, event);
  }
 
  private void doSendNotification(String memberId, EventDto event) {
    emitterRepository.get(memberId).ifPresentOrElse(sseEmitter -> {
      try {
        log.debug("Sending event: {} for member: {}", event, memberId);
        sseEmitter.send(eventMapper.toSseEventBuilder(event));
      } catch (IOException | IllegalStateException e) {
        log.debug("...");
        emitterRepository.remove(memberId);
      }
    }, () -> log.debug("No emitter for member {}", memberId));
   }
}
Örnek - Broadcast
Şöyle yaparız
import org.springframework.context.event.EventListener; import org.springframework.http.MediaType; import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; @RestController public class StockController { private final Set<SseEmitter> clients = new CopyOnWriteArraySet<>(); //2.1 @GetMapping("/stocks-stream") //2.2 public SseEmitter stocksStream() { SseEmitter sseEmitter = new SseEmitter(); clients.add(sseEmitter); sseEmitter.onTimeout(() -> clients.remove(sseEmitter)); sseEmitter.onError(throwable -> clients.remove(sseEmitter)); return sseEmitter; } @Async @EventListener public void stockMessageHandler(Stock stock) { //2.3 List<SseEmitter> errorEmitters = new ArrayList<>(); clients.forEach(emitter -> { try { emitter.send(stock, MediaType.APPLICATION_JSON); //2.4 } catch (Exception e) { errorEmitters.add(emitter); } }); errorEmitters.forEach(clients::remove); //2.5 } }
2.5 kısmında eğer emitter ile mesaj gönderiminde hata varsa bu emitter siliniyor ama bence bu kısım gerekli değil çünkü sseEmitter.onError() ile bu zaten yapılmıştı.

Şöyle yaparız
@Component public class StockPriceChanger { private final ApplicationEventPublisher publisher; private void changePrice() { publisher.publishEvent(new Stock(...)); } }
Örnek - Broadcast
Açıklaması şöyle
SseEmitter with multiple clients
For every client connection, you need to create a dedicated new object of SseEmitter. So, the questions arise, when we have multiple clients, then how to send the events to all clients.

- The solution is very simple, keep a track of all client connections by having a list of all SseEmitter objects. 
- For every client, when a new connection is established. You need to return a new object of SseEmitter type. Before returning from the method, add its reference to a list for later use, Otherwise you will not be able to track this newly created SseEmitter object.
- When the server has an update to send as an event, then process the list and send the event to every emitter object for every client. 
- If any SseEmitter object fails to send an event, that means the client connection is closed. So remove that SseEmitter object from the list.
Şöyle yaparız
@Controller
public class MySSEController {
  private List<SseEmitter> sseEmitters = new ArrayList<>();

  private int counter = 0;

  @RequestMapping("/ssestream")
  public SseEmitter getRealTimeMessageAction() throws IOException {
    SseEmitter sseEmitter = new SseEmitter(100000l);
    sseEmitter.send("MessageCounter : " + counter);
    sseEmitters.add(sseEmitter);
    return sseEmitter;
  }

  @Scheduled(fixedDelay = 2 * 1000)
  public void scheduledMsgEmitter() {
    counter++;
    List<SseEmitter> toRemove = new ArrayList<>();

    for (SseEmitter sseEmitter : sseEmitters) {
      try {
        sseEmitter.send("MessageCounter : " + counter);
      } catch (IOException ignore) {
        toRemove.add(sseEmitter);
      }
    }

    for (SseEmitter remove : toRemove) {
      sseEmitters.remove(remove);
    }
  }
}
Örnek
Şöyle yaparız. Burada @GetMapping ile text stream gönderileceği açıktan belirtilmiş. Normalde gerek yok.
@RestController
public class SseWebMvcController

  private SseEmitter emitter;

  @GetMapping(path="/sse", produces=MediaType.TEXT_EVENT_STREAM_VALUE)
  SseEmitter createConnection() {
    emitter = new SseEmitter();
    return emitter;
  }

  // in another thread
  void sendEvents() {
    try {
      emitter.send("Alpha");
      emitter.send("Omega");

      emitter.complete();
    } catch(Exception e) {
      emitter.completeWithError(e);
    }
  }
}
4. Metodlar
constructor - Long timeout
Örnek
Şöyle yaparız. Burada SseEmitter 24 saat boyunca açık kalır
@PostMapping("...") public SseEmitter updateAndEmailUsers(...) { SseEmitter emitter = new SseEmitter(1000*60*60*24L); ... return emitter; }
send metodu - String
Örnek
Şöyle yaparız
@GetMapping("/emitter")
public SseEmitter eventEmitter(@RequestParam String userId) {
  SseEmitter emitter = new SseEmitter(12000); //the timeout and it is optional  

  //create a single thread for sending messages asynchronously
  ExecutorService executor = Executors.newSingleThreadExecutor();
  executor.execute(() -> {
    try {
      for (int i = 0; i < 4; i++) {
        emitter.send("message" + i);          
      }      
    } catch(Exception e) {
      emitter.completeWithError(e);      
    } finally {
      emitter.complete();      
    }  
  });

  executor.shutdown();
  return emitter;
}
send metodu - SseEmitter.SseEventBuilder
Örnek
Şöyle yaparız
@GetMapping("/sse-emitter")
public SseEmitter sseEmitter() {
  SseEmitter emitter = new SseEmitter();
  Executors.newSingleThreadExecutor().execute(() -> {
    try {
      for (int i = 0; true; i++) {
        SseEmitter.SseEventBuilder event = SseEmitter.event()
                       .id(String.valueOf(i))
                       .name("SSE_EMITTER_EVENT")
                       .data("SSE EMITTER - " + LocalTime.now().toString());
        emitter.send(event);
        Thread.sleep(1000);
      }
    } catch (Exception ex) {
      emitter.completeWithError(ex);
    }
  });
  return emitter;
}
Örnek
Şöyle yaparız
SseEmitter emitter = new SseEmitter();

SseEmitter.SseEventBuilder sseEventBuilder = SseEmitter.event()
  .id("0") // You can give nay string as id
  .name("customEventName")
  .data("message1")
  .reconnectTime(10000); //reconnect time in millis

emitter.send(sseEventBuilder);
Açıklaması şöyle
By default, the browser will wait for 3s before trying to establish the connection again and the browser will keep on retrying till it gets 200 status from the server. The server can change this default 3s wait time by sending this ‘retry’ flag. Here, in the above example server instructs the browser to wait for 10s (10000 milliseconds) before retrying connection in case of error by sending this “retry” flag. The server can send this flag value 0 as well, which will tell the browser to reconnect immediately if the connection is closed.

Hiç yorum yok:

Yorum Gönder