저번 달에 1차 코드 컴플리트 2 스터디를 마친 기념으로, 1년 반 동안 미뤄뒀던 코드 리팩토링을 해보자.
프로젝트가 끝난 직후에도 많이 신경 쓰이긴 했던 부분으로, 특정 글을 북마크하거나 인용하면 해당 글의 저자에게 실시간 알림을 보내는 서비스다. SSE와 카프카를 사용해 구현했다.
SSE는 HTTP 기반, 서버에서 클라이언트로의 단방향 이벤트 스트리밍 기술이다. 다음의 이유로 SSE를 선택했다.
- 폴링은 부담스럽다.
- 웹소켓으로 지속적으로 양방향으로 데이터를 주고받을 필요가 없다.
- 간단히 페이지 변경 시 요청/응답을 주고 받는 식으로 구현할 수도 있지만, 맛이가 없다.
SSE를 사용할 때의 가장 큰 장점은 클라이언트에서 서버와의 연결이 끊어지면 자동 재연결 시도를 한다는 점이다.
SSE를 사용하는 경우, 브라우저는 일정 시간 이상 메시지가 오지 않으면 자동으로 재연결을 시도한다. 이때 서버가 살아있다고 답이 오면 연결을 유지하고, 서버가 죽었으면 지수적으로 간격을 늘리면서 재연결을 시도한다.
반대로 가장 큰 단점은 서버에서는 클라이언트의 상태를 모니터링하기 힘들다는 것이다. 주기적으로 하트비트를 보내주는 방식으로 가능하기는 하겠지만, 그렇게 하지 않으려고 SSE를 쓰는 거라 비용이 좀 크다.
또 하나 큰 단점이 있다면 SSE를 사용하면서 OSIV를 켜면 DB 커넥션 풀 고갈 문제가 발생할 수도 있다는 점이다.
SSE를 쓰면 OSIV는 꼭 끄도록 하자.
서버에서 클라이언트에게 메시지를 보내려면 클라이언트의 (SSE)연결 요청 후 해당 클라이언트에게 메시지를 보내기 위한 SseEmitter
객체를 만들어야 한다.
여러 서버가 있고, 어떤 사용자가 북마크/인용을 했다고 치자. 서버가 여러 대 있으므로, 북마크/인용 요청이 들어간 서버에는 해당 글 저자의 SseEmitter
객체가 있을 수도 있고, 없을 수도 있다.
따라서 북마크/인용 이벤트가 일어나면, 해당 이벤트가 일어났음을 다른 서버에도 알릴 필요가 있다. 사실 이 기능만을 위해서라면 Redis Pub/Sub으로도 충분하다. 하지만 문제는 앞서의 이벤트가 발생하면 다음의 두 작업을 마쳐야 하면서 발생한다.
이때 DB저장의 우선 순위는 알림 전송보다 낮다. 물론 DB 저장도 빠트리면 안되겠지만, 알림이 가능하다면 최대한 빠르게 실시간으로 처리하고, DB는 조금 늦어도 상관없다.
DB 저장과 SSE 알림 전송을 한 메서드로, 예를 들어 다음과 같이 묶는다고 해보자.
@대충_어떤_이벤트_리스너
void func(메시지 m) throws 뭐시기예외{
DB저장(m);//여기서 실패하면? DB에 넣을 수 있을 때까지 알림 발송 불가
SSE전송(m);//여기서 실패하면? DB에는 들어갔는데 알림은 불가능
}
DB에 문제가 발생하면, 문제가 해결할 때까지 알림 서비스도 멈춘다. 반대의 경우라면?
@대충_어떤_이벤트_리스너
void func2(메시지 m) throws 뭐시기예외{
SSE전송(m);//여기서 실패하면? 그냥 망함
DB저장(m);//여기서 실패하면? 알림은 가지만, DB에는 저장되지 않음
}
Redis Pub/Sub을 쓰면 메시지를 가져오는 순간 저장소에서 제거하므로 DB에는 저장할 수 없다. 물론 예외가 발생하면 같은 메시지를 다시 넣는 방법도 가능하겠지만, 그럼 중복 알림을 보내게 될 수도 있고, 무엇보다 브로드캐스트 방식이라 DB에 두 번 들어갈 수도 있다(중복 체크 로직이 추가돼야 함).
이때 이벤트 리스너를 다음과 같이 둘로 분리해주면 SSE 전송 실패와 DB 저장 실패가 서로 영향을 주지 않게 된다.
@대충_전송용_리스너
void func3(메시지 m) throws 전송실패예외{
SSE전송(m);
}
@대충_저장용_리스너
void func4(메시지 m) throws 저장실패예외{
DB저장(m);
}
카프카의 경우 한 토픽에 대한 컨슈머 그룹을 두 개 만들면, 각 컨슈머 그룹에서, 딱 한 명의 컨슈머가 이벤트를 가져갈 수 있다. 물론 자동으로 ack
를 보내면 각 컨슈머가 가져가는 순간 오프셋이 옮겨가므로 ack
는 성공하는 경우 수동으로 보내야 한다.
@대충_전송용_리스너
void func3(메시지 m) throws 전송실패예외{
SSE전송(m);
ACK();
}
@대충_저장용_리스너
void func4(메시지 m) throws 저장실패예외{
DB저장(m);
ACK();
}
그런데 서버1에는 원하는 SseEmitter
가 있고 서버2에는 없다고 하자. 메시지를 서버2가 가져가면 처리에 실패하고 ACK()
를 호출하지 않는다. 서버2에 에미터가 있기는 한데 연결이 끊어진 상태였다면? 처리해줘야 한다.
@대충_전송용_리스너
void func3(메시지 m) throws 전송실패예외{
if (에미터있음) {
if (타임아웃아님) {
try {
SSE전송(m);
ACK();
}
catch (전송실패예외) {
//아마도 일시적인 네트워크 오류일걸?
}
}
else {//연결 끊어진 상태
에미터삭제
ACK();
}
}
}
@대충_저장용_리스너
void func4(메시지 m) throws 저장실패예외{
DB저장(m);
ACK();
}
카프카에서는 기본적으로 파티션-컨슈머를 1:1 매핑하므로, 2개의 파티션, 2개의 컨슈머(서버1, 2)인 경우 각 파티션은 각 서버의 컨슈머에 매핑되고, 메시지 이관이 자동으로 일어나지는 않는다. 따라서 전송에 실패하고 메시지를 소화하지 못하는 경우, 영원히 소화 못하게 될 수도 있다...
SseEmitter
의 경우 타임아웃 여부를 확인할 수 있는 플래그 같은 게 따로 없기 때문에 별도의 래퍼 클래스를 만들어줘야 하긴 할 것 같다.
빠르고 좋기는 한데, 사실 과한 선택이다.
비교는 해봐야 하겠지만, 어느 정도는 durable하면서도 가벼운 MQ가 있다면 그걸 사용할 수도 있을 것 같다.
알림 서비스의 인터페이스는 다음과 같은데
public interface NotificationService {
NotificationListDto getNotificationList(Member member);
List<Notification> getNotifications(Member member);
void readNotification(Member member, String notificationId) throws InconsistentNotificationOwnerException, AlreadyRemovedNotificationException;
Notification getVerified(Member member, String notificationId) throws InconsistentNotificationOwnerException, AlreadyRemovedNotificationException;
void delete(Member member, String notificationId) throws AlreadyRemovedNotificationException, InconsistentNotificationOwnerException;
SseEmitter subscribe(Member member) throws Exception;
void sendMessage(Long id, Object event) throws Exception;
void consumeKafkaEvent(NotificationKafkaDto notificationKafkaDto, Acknowledgment ack) throws Exception;
void saveKafkaEventIntoRDB(NotificationKafkaDto notification, Acknowledgment ack) throws Exception;
}
사실 다음과 같이 세 종류로 나눌 수 있다
그럼 대충 NotificationRdbService
, SseManagerService
, NotificationKafkaConsumer
로 별도의 인터페이스를 나누고, NotificationService
를 이 세 인터페이스를 상속하는 인터페이스로 만드는 게 낫지 않을까?
public interface NotificationService {
/**
RDB에 저장된 알림 정보 관련
*/
NotificationListDto getNotificationList(Member member);
List<Notification> getNotifications(Member member);
void readNotification(Member member, String notificationId) throws InconsistentNotificationOwnerException, AlreadyRemovedNotificationException;
void delete(Member member, String notificationId) throws AlreadyRemovedNotificationException, InconsistentNotificationOwnerException;
Notification getVerified(Member member, String notificationId) throws InconsistentNotificationOwnerException, AlreadyRemovedNotificationException;
/**
SSE 연결/메시지 발송
*/
SseEmitter subscribe(Member member) throws Exception;
void sendMessage(Long id, Object event) throws Exception;
/**
카프카 이벤트 소비
*/
void consumeKafkaEvent(NotificationKafkaDto notificationKafkaDto, Acknowledgment ack) throws Exception;
void saveKafkaEventIntoRDB(NotificationKafkaDto notification, Acknowledgment ack) throws Exception;
}
그런데 이렇게 나누면 전부 다 세부적인 구현에, 구체적으로 어떤 기술을 사용할 것인지에 의존하게 되는 느낌이다. 만약 SSE가 아니라 다른 걸로 바꾼다면? 카프카가 아닌 다른 MQ로 바꾼다면?
구현보다, 각 인터페이스 어떤 기능을 하는지를 생각해보자.
그럼 인터페이스를 나누면 대충 이렇게 되지 않을까
public interface NotificationRdbService {
/**
RDB에 저장된 알림 정보 관련
*/
}
public interface NotificationPushService {
/**
클라이언트와의 연결/메시지 발송
*/
}
public interface NotificationMessageConsumer {
/**
메시지 소비
*/
}
public interface NotificationService extends NotificationRdbService, NotificationPushService, NotificationMessageConsumer {
}
구현도 물론 각각 해줘야 할 텐데,
@Service
public class NotificationRdbServiceImpl implements NotificationRdbService {
//...
}
@Service
public class SseService implements NotificationPushService{
//...
}
@Service
public class NotificationKafkaConsumer implements NotificationMessageConsumer {
//...
}
각각 사용할 기술과 구현에 따라 인터페이스를 구현하게 하고
@Service
@RequiredArgsConstructor
public class NotificationFacade implements NotificationService {
private final NotificationRdbService rdbService;
private final NotificationPushService pushService;
private final NotificationMessageConsumer messageConsumer;
@Override
public NotificationListDto getNotificationList(Member member) {
return rdbService.getNotificationList(member);
}
//...
}
세부 로직은 내부 서비스에 위임하게 하면 깔끔한 거 같다.
북마크 서비스 구현을 보면 아래와 같이 북마크 추가 시 알림을 보내게 했다.
public class BookmarkServiceImpl implements BookmarkService {
//...
@Override
public void addBookmark(Member member, AddBookmarkRequestDto requestDto) throws UserNotFoundException, NoteNotFoundException,BookmarkAlreadyExistsException{
Long noteId = requestDto.getNoteId();
Note note = noteRepository.findNoteById(noteId).orElseThrow(NoteNotFoundException::new);
Member memberPersist = memberRepository.findById(member.getId()).orElseThrow(UserNotFoundException::new);
List<Bookmark> bookmarkList = bookmarkRepository.findAllByMember(memberPersist);
Bookmark checkExists = bookmarkList.stream().filter(bookmark -> bookmark.getNote().getId().equals(noteId)).findAny().orElse(null);
if (checkExists != null) throw new BookmarkAlreadyExistsException();
Bookmark bookmark = Bookmark.builder()
.member(memberPersist)
.note(note)
.build();
bookmark = bookmarkRepository.save(bookmark);
memberPersist.getBookmarks().add(bookmark);
sendBookmarkNotification(memberPersist, note);
}
//...
private void sendBookmarkNotification(Member sender, Note note){
//대충 카프카 메시지 발송
}
//...
}
고쳐야 할 부분들이 너무 많다. 일단은
void addBookmark()
의 파라미터로 들어오는 Member member
는 스프링 시큐리티의 인증 객체에 들어가 있는 객체다. 액세스 토큰에서 추출한 이메일로 DB를 조회해 얻은 사용자 엔티티이며, detached
상태에 있다.em.merge()
로 충분히 영속성 컨텍스트에 넣을 수 있다. 내부적으로는 ID 비교를 하겠지만.같은 문제들이 보이지만 제쳐두자. 한 가지 문제점이 분명하다.
북마크 추가 메서드는 자기 책임 이상의 기능을 처리하고 있다.
분리가 필요하다. 어떻게 해야할까?
일단은 고민 거리로 남겨두고, 마지막으로 다음을 보자.
private void sendBookmarkNotification(Member sender, Note note){
if (note.getAuthor().getId().equals(sender.getId())) return;
String id = sender.getId() + "_" + note.getAuthor().getId() + "_" + UUID.randomUUID();
Notification notification = Notification.builder()
.id(id)
.type(NotificationType.BOOKMARK)
.receiver(note.getAuthor())
.sender(sender)
.note(note)
.title(note.getTitle())
.isRead(false)
.build();
kafkaTemplate.send("sse", NotificationKafkaDto.toDto(notification));
}
흠...
RDB에 저장할 알림 객체를 생성하고, 카프카로 보내기 위한 DTO로 변환해 프로듀싱하는 부분이다.
여기서 문제는
1. Notification
엔티티의 필드를 빌더 패턴으로 다 집어 넣고 있는데,
Member, Note
를 생성자 파라미터로 넣는 게 훨씬 깔끔해 보인다.id
필드를 굳이 만들어 줄 필요도 없었는데, 왜 저렇게 했을까? 알 수 없다.Notification notification = new Notification(NotificationType.BOOKMARK, sender, note);
kafkaTemplate.send()
로 이벤트를 보내는 방식 대신, 여기도 따로 인터페이스를 만들고 구현체를 따로 주입하는 편이 나아보인다.