SSE로 웹 푸시 알림 개발!

문진영·2022년 9월 10일
4

파이널 프로젝트

목록 보기
7/9

SSE란?

Server Sent Event의 약자로 양방향 통신인 WebSocket과 다르게 서버에서 클라이언트로 일방향 통신입니다.

왜 SSE인가?

알림 특성상 서버에서 Notification을 보내기 때문에 SSE를 사용하는 것이 효율적이라 생각합니다.


참고로 SSE를 처음 다뤄 구글에 있는 여러 자료들을 참고하였고 개발 경험도 적은 초보의 정보이므로 참고하여 개발하시면 좋을것 같습니다!

NotificationController.java

@RestController
@RequiredArgsConstructor
@RequestMapping("/test")
@Log4j2
public class NotificationController {

    private final NotificationService notificationService;

    @GetMapping(value = "/subscribe/{email}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public SseEmitter subscribe(@PathVariable String email, @RequestHeader(value = "Last-Event-ID", required = false, defaultValue = "") String lastEventId){
        return notificationService.subscribe(email, lastEventId);
    }
}

로그인한 유저를 알림 서비스에 등록하는 subscribe 과정입니다.

  • 파라미터에 있는 Last-Event-Id는 서버와의 연결 오류로 인해 유실된 데이터를 다시 전송하기 위하여 클라이언트에게 마지막으로 전송한 event id 값을 보내주는 것입니다.
  • required = false : Last-Event_Id는 항상 담겨있는 것이 아니기에 설정해준다.
  • defaultValue = "" : 값이 담겨있지 않은 경우 isEmpty로 구분하기위해 빈 데이터 설정
  • 유저 정보를 받아오는 방법은 크게 두 가지가 있습니다.
    1.url을 통해서 유저 정보 받기
    2.토큰을 통해서 유저 정보 추출하기

Notification.java

@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class Notification {

    private String receiver; //알림을 받는 유저의 정보
    
    private String notificationType; //알림 종류별 분류를 위한
    
    private String content; //알람의 내용
    
    private String url; //해당 알림 클릭시 이동할 mapping url
    
    private Boolean isRead; //알림 열람에 대한 여부
}

Domain입니다 주석을 참고 하시면 되겠습니다.

NotificationRepository.java

public interface EmitterRepository {

    SseEmitter save(String emitterId, SseEmitter sseEmitter); //Emitter 저장
    
    void saveEventCache(String eventCacheId, Object event); //이벤트 저장
    
    Map<String, SseEmitter> findAllEmitterStartWithByEmail(String email); //해당 회원과  관련된 모든 Emitter를 찾는다
    
    Map<String, SseEmitter> findAllEmitterStartWithByEmailInList(List emails); //List 에서 해당 회원과  관련된 모든 Emitter를 찾는다(미 개발)
    
    Map<String, Object> findAllEventCacheStartWithByEmail(String email); //해당 회원과관련된 모든 이벤트를 찾는다
    
    void deleteById(String id); //Emitter를 지운다
    
    void deleteAllEmitterStartWithId(String email); //해당 회원과 관련된 모든 Emitter를 지운다
    
    void deleteAllEventCacheStartWithId(String email); //해당 회원과 관련된 모든 이벤트를 지운다
}

DB에 저장하지않아 서버가 꺼지면 모든 데이터는 날라갑니다.
자세한 메서드 구성은 아래에 있습니다

NotificationRepositoryImpl.java

@Repository
@NoArgsConstructor
@Log4j2
public class EmitterRepositoryImpl implements EmitterRepository {

    private final Map<String, SseEmitter> emitters = new ConcurrentHashMap<>();
    private final Map<String, Object> eventCache = new ConcurrentHashMap<>();

    @Override
    public SseEmitter save(String emitterId, SseEmitter sseEmitter) {
        emitters.put(emitterId, sseEmitter);
        log.info(emitters);
        return sseEmitter;
    }

    @Override
    public void saveEventCache(String eventCacheId, Object event) {
        eventCache.put(eventCacheId, event);
    }

    @Override
    public Map<String, SseEmitter> findAllEmitterStartWithByEmail(String email) {
        return emitters.entrySet().stream() //여러개의 Emitter가 존재할 수 있기떄문에 stream 사용
                .filter(entry -> entry.getKey().startsWith(email))
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
    }

    @Override
    public Map<String, SseEmitter> findAllEmitterStartWithByEmailInList(List emails) {
        return null;
    }

    @Override
    public Map<String, Object> findAllEventCacheStartWithByEmail(String email) {
        return emitters.entrySet().stream()
                .filter(entry -> entry.getKey().startsWith(email))
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
    }

    @Override
    public void deleteById(String id) {
        emitters.remove(id);
    }

    @Override
    public void deleteAllEmitterStartWithId(String email) {
        emitters.forEach((key, emitter) -> {
            if (key.startsWith(email)){
                emitters.remove(key);
            }
        });
    }

    @Override
    public void deleteAllEventCacheStartWithId(String email) {
        emitters.forEach((key, emitter) -> {
            if (key.startsWith(email)){
                emitters.remove(key);
            }
        });
    }
}

NotificationService.java

@Service
@RequiredArgsConstructor
@Log4j2
public class NotificationService {

    private final EmitterRepositoryImpl emitterRepository;

    public SseEmitter subscribe(String email, String lastEventId) {

        String emitterId = makeTimeIncludeId(email);

        SseEmitter emitter;

      //글쓴이가 버그 방지용으로 만든 코드입니다.
        if (emitterRepository.findAllEmitterStartWithByEmail(email) != null){
            emitterRepository.deleteAllEmitterStartWithId(email);
            emitter = emitterRepository.save(emitterId, new SseEmitter(Long.MAX_VALUE)); //id가 key, SseEmitter가 value
        }
        else {
            emitter = emitterRepository.save(emitterId, new SseEmitter(Long.MAX_VALUE)); //id가 key, SseEmitter가 value
        }

       //오류 종류별 구독 취소 처리
        emitter.onCompletion(() -> emitterRepository.deleteById(emitterId)); //네트워크 오류
        emitter.onTimeout(() -> emitterRepository.deleteById(emitterId)); //시간 초과
        emitter.onError((e) -> emitterRepository.deleteById(emitterId)); //오류

        // 503 에러를 방지하기 위한 더미 이벤트 전송
        String eventId = makeTimeIncludeId(email);
        sendNotification(emitter, eventId, emitterId, "EventStream Created. [userId=" + email + "]");

        // 클라이언트가 미수신한 Event 목록이 존재할 경우 전송하여 Event 유실을 예방
        if (hasLostData(lastEventId)) {
            sendLostData(lastEventId, email, emitterId, emitter);
        }

        return emitter;
    }
    
  //단순 알림 전송
    private void sendNotification(SseEmitter emitter, String eventId, String emitterId, Object data) {

        try {
            emitter.send(SseEmitter.event()
                    .id(eventId)
                    .name("sse")
                    .data(data, MediaType.APPLICATION_JSON));
        } catch (IOException exception) {
            emitterRepository.deleteById(emitterId);
            emitter.completeWithError(exception);
        }
    }

    private String makeTimeIncludeId(String email) { return email + "_" + System.currentTimeMillis(); }//Last-Event-ID의 값을 이용하여 유실된 데이터를 찾는데 필요한 시점을 파악하기 위한 형태
  
  //Last-Event-Id의 존재 여부 boolean 값
    private boolean hasLostData(String lastEventId) {
        return !lastEventId.isEmpty();
    }
    
  //유실된 데이터 다시 전송
    private void sendLostData(String lastEventId, String email, String emitterId, SseEmitter emitter) {

        Map<String, Object> eventCaches = emitterRepository.findAllEventCacheStartWithByEmail(String.valueOf(email));
        eventCaches.entrySet().stream()
                .filter(entry -> lastEventId.compareTo(entry.getKey()) < 0)
                .forEach(entry -> sendNotification(emitter, entry.getKey(), emitterId, entry.getValue()));
    }
    
//    sse연결 요청 응답
/*-----------------------------------------------------------------------------------------------------------------------------------*/
//    서버에서 클라이언트로 일방적인 데이터 보내기

  //1ㄷ1로 특정 유저에게 알림 전송
    public void send(String receiver, String content, String type, String urlValue) {

        Notification notification = createNotification(receiver, content, type, urlValue);

        // 로그인 한 유저의 SseEmitter 모두 가져오기
        Map<String, SseEmitter> sseEmitters = emitterRepository.findAllEmitterStartWithByEmail(receiver);

        sseEmitters.forEach(
                (key, emitter) -> {
                    // 데이터 캐시 저장(유실된 데이터 처리하기 위함)
                    emitterRepository.saveEventCache(key, notification);
                    // 데이터 전송
                    sendToClient(emitter, key, notification);
                }
        );
    }
  //1ㄷ1로 List에 존재하는 특정 유저에게 알림 전송
    public void sendList(List receiverList, String content, String type, String urlValue) {

        List<Notification> notifications = new ArrayList<>();

        Map<String, SseEmitter> sseEmitters;

        for (int i = 0; i < receiverList.size(); i++) {

            int finalI = i;

            sseEmitters = new HashMap<>();

            notifications.add(createNotification(receiverList.get(i).toString(), content, type, urlValue));

            sseEmitters.putAll(emitterRepository.findAllEmitterStartWithByEmail(receiverList.get(i).toString()));

            sseEmitters.forEach(
                    (key, emitter) -> {
                        // 데이터 캐시 저장(유실된 데이터 처리하기 위함)
                        emitterRepository.saveEventCache(key, notifications.get(finalI));
                        // 데이터 전송
                        sendToClient(emitter, key, notifications.get(finalI));
                    }
            );
        }
    }

  //타입별 알림 생성
    private Notification createNotification(String receiver, String content, String type, String urlValue) {

        if (type.equals("chat")){
            return Notification.builder()
                    .receiver(receiver)
                    .content(content)
                    .url("/chat/sender/room/" + urlValue)
                    .notificationType(type)
                    .isRead(false)
                    .build();
        }

        else if (type.equals("survey")) {
            return Notification.builder()
                    .content(content)
                    .url("/quotation/" + urlValue)
                    .notificationType(type)
                    .isRead(false)
                    .build();
        }

        else if (type.equals("quotation")) {
            return Notification.builder()
                    .receiver(receiver)
                    .content(content)
                    .url("/matchedgosulist/" + urlValue)
                    .notificationType(type)
                    .isRead(false)
                    .build();
        }

        else {
            return null;
        }
    }

  //알림 전송
    private void sendToClient(SseEmitter emitter, String id, Object data) {

        try {
            emitter.send(SseEmitter.event()
                    .id(id)
                    .name("sse")
                    .data(data, MediaType.APPLICATION_JSON)
                    .reconnectTime(0));

            emitter.complete();

            emitterRepository.deleteById(id);

        } catch (Exception exception) {
            emitterRepository.deleteById(id);
            emitter.completeWithError(exception);
        }
    }
}

-오류와 수정-

emitter.send()시 브라우저로 spring server에 연결 시 작동하지만
react와 연동 했을 때 데이터를 받지 못하는 오류가 발견되었습니다...

  1. 해결 방법을 알아보는 중에 emitter.complete()를 하면 react로 데이터가 전송되었습니다.
    하지만 complete()는 모든 이벤트가 전송되었는지 확인한 다음 연결을 종료하기 때문에
    다시 한번 subscribe mapping을 보내줘야 하게 됩니다.
  1. 그렇게 되면 굉장히 번거롭기 때문에 방법을 찾던 도중 event().reconnectTime()을 발견하게 되고 거기에 값을 0으로 적용했더니 react에서 연결 끊김과 동시에 바로 재연결이 되었고 구동은 할 수 있게 되었습니다.

아직 SSE는 처음 접하고 실력이 부족해서 방금과 같은 오류를 완전히 해결하지 못하였네요...
혹시 더 괜찮은 방법이 있으시면 피드백 부탁드리겠습니다.

profile
개발 하는 게 좋은 사람입니다.

0개의 댓글