Redis와 SSE를 이용한 실시간 알림 구현

Ssol·2024년 6월 25일
0
post-thumbnail

최근 개발한 요구사항 중 다음과 같은 기능을 구현해야 했다.

일반 유저만 글 작성이 가능한 특정 비밀 게시판에서 글 작성자가 본인 글에 댓글을 달 경우, 해당 유저와 관련된 모든 관리자 롤에게 댓글 등록 알림이 가야 한다. 반대로, 관리자가 댓글을 달 경우 글 작성자인 일반 유저에게 댓글 등록 알림이 가야 한다.
또한, 로그인하지 않은 동안 달린 댓글 알림을 확인할 수 있도록 현재 날짜로부터 3일 전까지의 댓글 목록을 조회하고, 읽음/안읽음 표시도 할 수 있어야 한다.

이 요구사항을 받자마자 어떻게 구현하면 좋을까 고민을 해보았다.

요구사항 분석

  1. 일반 유저가 댓글을 달 때: 글 작성자 본인이 댓글을 달 경우 관련된 모든 관리자 롤에게 알림을 보내야 한다.
  2. 관리자가 댓글을 달 때: 글 작성자인 일반 유저에게 알림을 보내야 한다.
  3. 로그인하지 않은 동안의 알림: 현재 날짜로부터 3일 전까지의 댓글 알림 목록을 조회하고, 읽음/안읽음 상태를 관리해야 한다.

기술 스택 선택

실시간 알림을 구현하기 위해 SSE(Server-Sent Events)를 사용하기로 했다. 이유는 다음과 같다:

•	실시간으로 서버에서 클라이언트로 데이터를 푸시할 수 있다.
•	WebSocket보다 구현이 간단하고 HTTP 프로토콜을 사용하기 때문에 방화벽 및 프록시와의 호환성이 좋다.

그리고 일반적으로 실시간 알림을 구현할 때는 카프카(Kafka)와 같은 메시지 큐를 사용하는 경우가 많다. 하지만 이번 프로젝트에서는 Redis Pub/Sub을 선택했다. 그 이유는 다음과 같다:

1.	간편한 설정과 사용:
•	Redis는 설정이 간편하고 사용하기 쉬워서 빠르게 개발을 시작할 수 있다.
•	Pub/Sub 기능을 활용하면 별도의 메시지 브로커를 설정하지 않아도 된다.
2.	성능:
•	Redis는 메모리 기반의 저장소로, 매우 빠른 성능을 제공한다.
•	Pub/Sub 메시징 패턴은 높은 처리량과 낮은 지연 시간을 필요로 하는 실시간 알림에 적합하다.
3.	단순성:
•	복잡한 메시지 처리 로직이 필요 없는 경우, Redis Pub/Sub은 간단한 구조로 메시지를 주고받을 수 있다.
•	메시지 큐를 별도로 관리하지 않고도 쉽게 실시간 알림 시스템을 구축할 수 있다.

하지만, Redis Pub/Sub을 선택함으로써 몇 가지 단점도 존재한다

1.	내구성 부족:
•	Redis Pub/Sub은 메시지를 메모리에 저장하기 때문에 서버가 재시작되거나 장애가 발생하면 메시지가 손실될 수 있다.
•	반면 카프카는 메시지를 디스크에 저장하여 내구성을 제공한다.
2.	스케일링 이슈:
•	Redis는 단일 스레드 모델이기 때문에 높은 부하 상황에서 성능이 저하될 수 있다.
•	카프카는 분산 시스템으로 설계되어 있어, 고가용성과 수평적 확장이 용이하다.
3.	복잡한 메시지 처리:
•	Redis Pub/Sub은 단순한 메시지 브로커로, 메시지 재처리, 순서 보장 등의 기능이 부족하다.
•	카프카는 복잡한 메시지 처리, 순서 보장, 메시지 재처리 등을 지원한다.

현재 프로젝트에서는 이러한 단점들이 큰 문제가 되지 않기 때문에 Redis Pub/Sub 선택에도 한몫했다. 메시지 손실이 발생하더라도 서비스에 치명적인 영향을 미치지 않으며, 시스템의 부하가 상대적으로 낮기 때문에 Redis의 단일 스레드 모델로도 충분히 감당할 수 있다. 또한, 복잡한 메시지 처리 로직이 필요하지 않아서 Redis의 간단한 구조가 오히려 개발과 유지보수에 유리했다.

Redis와 SSE를 이용한 알림 구현

Redis Config

Redis Pub/Sub을 이용한 실시간 알림을 구현하기 위해 다음과 같이 Redis 설정을 진행했다.

@ConditionalOnProperty(name = "spring.redis.enabled", havingValue = "true", matchIfMissing = true)
@Configuration
public class RedisConfig {

    @Bean
    public RedisMessageListenerContainer redisContainer(
            RedisConnectionFactory connectionFactory,
            RedisSubscriber redisSubscriber
    ) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.addMessageListener((message, pattern) ->
                        redisSubscriber.onMessage(new String(message.getChannel()), new String(message.getBody())),
                new PatternTopic("commentNotification")
        );
        return container;
    }

    @Bean
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory connectionFactory) {
        RedisTemplate<String, Object> template = new RedisTemplate<>();
        template.setConnectionFactory(connectionFactory);
        template.setKeySerializer(new StringRedisSerializer());
        template.setValueSerializer(new GenericJackson2JsonRedisSerializer());
        return template;
    }
}
  • RedisMessageListenerContainer: Redis 메시지 리스너 컨테이너를 설정하여 특정 채널의 메시지를 구독하고, 메시지가 수신되면 RedisSubscriber의 onMessage 메서드를 호출하도록 구성했다.
  • RedisTemplate: Redis와의 상호작용을 위한 템플릿을 설정했다. 키는 StringRedisSerializer를 사용하고, 값은 GenericJackson2JsonRedisSerializer를 사용하여 직렬화 및 역직렬화한다.

Redis publisher 서비스

실시간 알림 기능을 위해 메시지를 퍼블리시하고, 알림 데이터를 Redis에 저장하는 RedisPublisher 클래스를 구현했다.

@Slf4j
@Service
public class RedisPublisher {

    private final RedisTemplate<String, Object> redisTemplate;
    private final ObjectMapper objectMapper;

    public RedisPublisher(RedisTemplate<String, Object> redisTemplate, ObjectMapper objectMapper) {
        this.redisTemplate = redisTemplate;
        this.objectMapper = objectMapper;
    }

    public void publish(String channel, Object message) {
        log.info("Publishing message to channel: [{}] at time: {} with message: {}", channel, Instant.now(), message);
        redisTemplate.convertAndSend(channel, message);
        log.info("Published message to channel: [{}] at time: {} with message: {}", channel, Instant.now(), message);
    }

    public void saveNotificationWithTTL(String key, Notification notification, long ttl, TimeUnit timeUnit) {
        try {
            String notificationJson = objectMapper.writeValueAsString(notification);
            redisTemplate.opsForValue().set(key, notificationJson, ttl, timeUnit);
            log.debug("Saved notification with key: {} and TTL: {} {}", key, ttl, timeUnit);
        } catch (Exception e) {
            log.error("Error saving notification with key: {} and TTL: {} {}", key, ttl, timeUnit, e);
        }
    }
}
  • publish: 주어진 메시지를 특정 채널에 퍼블리시한다. Redis의 convertAndSend 메서드를 사용하여 메시지를 전송한다.
  • saveNotificationWithTTL: 알림 데이터를 JSON 형식으로 직렬화하여 Redis에 저장한다. 이 메서드는 TTL(Time-To-Live)을 설정하여 데이터가 일정 시간 후 자동으로 삭제되도록 한다.

Redis subscriber 서비스

이 클래스는 Redis로부터 메시지를 수신하고, 이를 연결된 클라이언트들에게 실시간으로 전달하는 역할을 한다.

@Slf4j
@Service
public class RedisSubscriber {

    private final RedisTemplate<String, Object> redisTemplate;
    private final ObjectMapper objectMapper;
    private final Map<String, List<SseEmitter>> emitters = new ConcurrentHashMap<>();
    private final ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(10);

    public RedisSubscriber(
            RedisTemplate<String, Object> redisTemplate,
            ObjectMapper objectMapper
    ) {
        this.redisTemplate = redisTemplate;
        this.objectMapper = objectMapper;
    }

    public void onMessage(String channel, String message) {
        log.info("Received message from channel: [{}] at time: {} with message: {}", channel, Instant.now(), message);

        String cleanedMessage = message.replace("\"", "");
        log.debug("Cleaned message: {}", cleanedMessage);

        processMessage(cleanedMessage, 5); // 최대 5번 재시도
    }

    private void processMessage(String key, int retriesLeft) {
        scheduledExecutorService.submit(() -> {
            try {
                String notificationJson = null;
                for (int attempt = 0; attempt < retriesLeft; attempt++) {
                    notificationJson = (String) redisTemplate.opsForValue().get(key);
                    if (notificationJson != null) {
                        break;
                    }
                    log.debug("Retrying to get key: {}. Attempt: {}", key, attempt + 1);
                    try {
                        Thread.sleep(200); // 200ms 대기
                    } catch (InterruptedException e) {
                        log.error("InterruptedException during sleep", e);
                        Thread.currentThread().interrupt();
                        return;
                    }
                }

                if (notificationJson != null) {
                    Notification notification = objectMapper.readValue(notificationJson, Notification.class);
                    String userId = notification.getReceiverId();
                    log.debug("Parsed notification: {} for user: {}", notification, userId);

                    sendNotificationToEmitters(userId, notification);
                } else {
                    log.warn("No notification found in Redis for key: {} after maximum retries", key);
                }
            } catch (Exception e) {
                log.error("Exception during message processing for key: {}", key, e);
            }
        });
    }

    private void sendNotificationToEmitters(String userId, Notification notification) {
        List<SseEmitter> userEmitters = emitters.get(userId);
        if (userEmitters != null && !userEmitters.isEmpty()) {
            List<SseEmitter> deadEmitters = new ArrayList<>();
            for (SseEmitter emitter : userEmitters) {
                try {
                    emitter.send(SseEmitter.event()
                            .name("newComment")
                            .data(notification)
                    );
                    log.info("Sent SSE to user: {} with notification: {} at time: {}", userId, notification, Instant.now());
                } catch (IOException e) {
                    log.error("Error sending SSE to user: {} with message: {}", userId, e.getMessage());
                    deadEmitters.add(emitter);
                }
            }
            userEmitters.removeAll(deadEmitters); // dead emitters 제거
        } else {
            log.warn("No emitters found for user: {}", userId);
        }
    }

    public void addEmitter(String userId, SseEmitter emitter) {
        emitters.computeIfAbsent(userId, k -> new ArrayList<>()).add(emitter);
        log.info("Emitter added for user: {}", userId);

        emitter.onCompletion(() -> {
            removeEmitter(userId, emitter);
            log.info("Emitter completed for user: {}", userId);
        });

        emitter.onTimeout(() -> {
            removeEmitter(userId, emitter);
            log.info("Emitter timed out for user: {}", userId);
        });

        emitter.onError((Throwable t) -> {
            removeEmitter(userId, emitter);
            log.error("Emitter error for user: {} with message: {}", userId, t.getMessage());
        });
    }

    public void removeEmitter(String userId, SseEmitter emitter) {
        List<SseEmitter> userEmitters = emitters.get(userId);
        if (userEmitters != null) {
            userEmitters.remove(emitter);
            if (userEmitters.isEmpty()) {
                emitters.remove(userId);
            }
        }
        log.info("Emitter removed for user: {}", userId);
    }
}
  1. Redis 메시지 수신 (onMessage 메서드):
    • Redis 채널로부터 메시지를 수신하면 onMessage 메서드가 호출된다. 수신된 메시지는 로그에 기록되며, processMessage 메서드를 통해 처리된다.
  2. 메시지 처리 (processMessage 메서드):
    • 메시지 키를 사용하여 Redis에서 메시지 내용을 최대 5번 재시도하면서 가져온다.
    • 메시지가 정상적으로 가져와지면 이를 JSON으로 파싱하여 Notification 객체로 변환한다.
    • 수신자 ID를 기반으로 해당 유저의 모든 SseEmitter에 메시지를 전송한다.
    • 전송 중 오류가 발생한 SseEmitter는 제거된다.
  3. SSE Emitter 추가 (addEmitter 메서드):
    • 유저가 알림을 구독할 때 새로운 SseEmitter를 생성하고, 해당 유저의 Emitter 리스트에 추가한다.
    • Emitter가 완료되거나 타임아웃, 오류가 발생할 경우 자동으로 Emitter를 제거하는 콜백을 등록한다.
  4. SSE Emitter 제거 (removeEmitter 메서드):
    • 특정 유저의 Emitter 리스트에서 지정된 Emitter를 제거한다.
    • Emitter 리스트가 비어있으면 유저의 Emitter 리스트 자체를 제거한다.

여기서 각 유저가 여러 클라이언트에서 동시에 SSE 연결을 유지할 수 있도록 하기 위해, ConcurrentHashMap을 사용해 유저별로 List를 관리하도록 했다.

TransactionSynchronization 인터페이스를 구현한 추상 클래스

알림 발송을 댓글 저장 완료 후에 하더라도, API 응답이 가기 전에 알림이 발송될 수 있다. 이를 방지하고 API 응답이 클라이언트에 전송된 후에 알림이 발송되도록 하려면, 알림 발송을 비동기로 처리하되, API 응답이 완료된 후에 실행되도록 해야 한다. 이를 위해 Spring의 TransactionSynchronizationManager를 사용하여 트랜잭션이 완료된 후에 알림을 발송하도록 설정할 수 있다.

TransactionSynchronization 인터페이스를 구현하여 트랜잭션 완료 후 작업을 수행할 수 있다.

이 때 TransactionSynchronizationManager의 registerSynchronization 메서드를 사용하는 방법을 적용할 수 있는데 TransactionSynchronization 인터페이스의 모든 필수 오버라이드 메서드를 강제로 구현해야 한다.

하지만 불필요한 메서드를 구현하는 것을 피하는 방법이 있는데 바로 추상 클래스를 사용하는 방법이다.

커스텀 추상 클래스를 만들어 TransactionSynchronization를 상속받아 필수 메서드를 구현하면, 커스텀 추상 클래스를 사용하는 곳에서는 불필요한 메서드를 매번 오버라이드하지 않고도 필요한 메서드만 구현할 수 있다. 이 방법을 사용하면 코드의 가독성이 높아지고 유지 보수가 쉬워진다.

public abstract class CustomTransactionSynchronization implements TransactionSynchronization {

    @Override
    public void suspend() {
        // No implementation needed
    }

    @Override
    public void resume() {
        // No implementation needed
    }

    @Override
    public void flush() {
        // No implementation needed
    }

    @Override
    public void beforeCommit(boolean readOnly) {
        // No implementation needed
    }

    @Override
    public void beforeCompletion() {
        // No implementation needed
    }

    @Override
    public void afterCompletion(int status) {
        // No implementation needed
    }
}
  • 이 클래스는 트랜잭션의 특정 시점에 실행될 로직을 정의할 수 있도록 한다. afterCommit 메서드를 오버라이드하여 트랜잭션이 커밋된 후 실행할 작업을 정의할 수 있다.

댓글 서비스

댓글을 저장하고, 트랜잭션 완료 후 알림을 발송하는 PostCommentServiceImpl 클래스를 구현했다.

@RequiredArgsConstructor
@Transactional(readOnly = true)
@Service
public class PostCommentServiceImpl implements PostCommentService {

    private final PostRepository postRepository;
    private final PostCommentRepository postCommentRepository;
    private final NotificationService notificationService;

    @Transactional
    @Override
    public void saveComment(User user, CommentDto commentDto) {
        boolean isAdmin = Utils.isAdminRole(user.getType());
        postRepository.findPostBy(isAdmin, user.getUserId(), commentDto.getPostId())
                .orElseThrow(() -> new UserDeniedException("not have permission to view posts."));
        Long savedCommentId = postCommentRepository.savePostComment(CommentDto.toEntity(commentDto));

        // 트랜잭션 완료 후 알림 발송
        TransactionSynchronizationManager.registerSynchronization(new CustomTransactionSynchronization() {
            @Override
            public void afterCommit() {
                notificationService.publishNotification(commentDto.getPostId(), savedCommentId, user);
            }
        });
    }
    
    // 그 외 댓글 서비스 로직...
}
  • saveComment: 댓글을 저장하고, 트랜잭션이 완료된 후 알림을 발송한다. 트랜잭션이 완료되면 afterCommit 메서드를 호출하여 publishNotification 메서드를 통해 알림을 발송한다.
  • 트랜잭션이 실패하거나 롤백된 경우, 알림이 발송되지 않도록 보장할 수 있다.
  • 트랜잭션이 성공적으로 완료된 후에 알림을 비동기로 처리함으로써, API 응답 시간이 길어지는 것을 방지한다.

실시간 알림 서비스

읽지 않은 댓글을 표현하기 위해, '{수신 유저id}:{댓글id}'를 키로 하는 데이터를 Redis에 저장하였다. 유저가 댓글을 읽으면 해당 키를 삭제하여 읽음 상태로 표시한다. 3일이 지나면 Redis에서 자동으로 값이 삭제되도록 TTL을 설정했다.

@Service
public class NotificationService {

  	private final RedisPublisher redisPublisher;
    private final RedisSubscriber redisSubscriber;
    private final RedisTemplate<String, Object> redisTemplate;
    private final PostRepository postRepository;
    private final PostCommentRepository postCommentRepository;
    private final UserRepository userRepository;
    
    public InquiryNotificationService(
            RedisPublisher redisPublisher,
            RedisSubscriber redisSubscriber,
            RedisTemplate<String, Object> redisTemplate,
  			PostRepository postRepository,
  			PostCommentRepository postCommentRepository,
  			UserRepository userRepository
    ) {
        this.redisPublisher = redisPublisher;
        this.redisSubscriber = redisSubscriber;
        this.redisTemplate = redisTemplate;
		this.postRepository = postRepository;
        this.postCommentRepository = postCommentRepository;
  		this.userRepository = userRepository
    }
  
    public void publishNotification(Long postId, Long savedCommentId, Users user) {
        String senderId = user.getUserId();
        if (Utils.isAdminRole(user.getType())) {
            Users recipient = postJooqRepository.findAuthorByPostId(postId);
            publishEventToRedis(postId, savedCommentId, senderId, recipient.getUserId());
        } else {
            List<User> recipients = userRepository.findManagerByGroupId(user.getGroupId());
            for (Users recipient : recipients) {
                publishEventToRedis(postId, savedCommentId, senderId, recipient.getUserId());
            }
        }
    }

    public void publishEventToRedis(Long postId, Long savedCommentId, String senderId, String recipientId) {
        String notificationId = UUID.randomUUID().toString();
        Instant timestamp = Instant.now();

        Notification notification = new Notification(
                notificationId,
                postId,
                savedCommentId,
                senderId,
                recipientId,
                "New comment added",
                timestamp
        );
        String notificationKey = recipientId + ":" + savedCommentId;

        redisPublisher.saveNotificationWithTTL(notificationKey, notification, 3, TimeUnit.DAYS);
        redisPublisher.publish("commentNotification", notificationKey);
    }

    public SseEmitter createEmitter(String userId) {
        SseEmitter emitter = new SseEmitter(Long.MAX_VALUE);
        redisSubscriber.addEmitter(userId, emitter);

        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        executor.scheduleAtFixedRate(() -> sendHeartbeat(userId, emitter, executor), 0, 15, TimeUnit.SECONDS);

        setEmitterCallbacks(userId, emitter, executor);
        return emitter;
    }

    private void sendHeartbeat(String userId, SseEmitter emitter, ScheduledExecutorService executor) {
        try {
            emitter.send(SseEmitter.event()
                    .name("heartbeat")
                    .data("heartbeat"));
        } catch (IOException e) {
            log.warn("Error sending heartbeat, connection might be closed. Removing emitter and shutting down executor.", e);
            redisSubscriber.removeEmitter(userId, emitter);
            emitter.completeWithError(e);
            executor.shutdown();
        }
    }

    private void setEmitterCallbacks(String userId, SseEmitter emitter, ScheduledExecutorService executor) {
        // 클라이언트 연결 종료 시 emitter 제거
        emitter.onCompletion(() -> {
            redisSubscriber.removeEmitter(userId, emitter);
            log.info("Emitter completed for user: {}", userId);
            executor.shutdown();
        });
        emitter.onTimeout(() -> {
            redisSubscriber.removeEmitter(userId, emitter);
            log.info("Emitter timed out for user: {}", userId);
            executor.shutdown();
        });
        emitter.onError((Throwable t) -> {
            redisSubscriber.removeEmitter(userId, emitter);
            log.error("Emitter error for user: {}", userId, t);
            executor.shutdown();
        });
    }

    public boolean isNotificationRead(String userId, Long postId) {
        List<PostComment> postComments = postCommentRepository.findPostCommentByPostId(postId);
        for (PostReply postComment : postComments) {
            // 게시글에 달린 댓글 id로 redis에 저장된 key가 있는지 확인
            String key = userId + ":" + postComment.getCommentId();
            Boolean existKey = redisTemplate.hasKey(key);
            if (Boolean.TRUE.equals(existKey)) {
                return false;  // 반복 중 하나라도 true이면 false 반환
            }
        }
        return true;
    }

    public void markAsCommentReadByPostId(String userId, Long postId) {
        List<PostComment> postComments = postCommentRepository.findPostCommentByPostId(postId);
        for (PostComment postComment : postComments) {
            String key = userId + ":" + postComment.getCommentId();
            redisTemplate.delete(key);
        }
    }
}
  • publishNotification: 댓글 작성자가 관리자 롤인지 일반 유저인지에 따라 알림을 발송한다. 관리자 롤인 경우 글 작성자에게만, 일반 유저인 경우 관련된 모든 관리자에게 알림을 발송한다.
  • publishEventToRedis: 알림 데이터를 생성하여 Redis에 저장하고, 특정 채널에 퍼블리시한다.
  • createEmitter: SSE Emitter를 생성하고, 주기적으로 heartbeat 메시지를 전송하여 연결을 유지한다.
  • sendHeartbeat: heartbeat 메시지를 전송하여 SSE 연결을 유지한다.
  • setEmitterCallbacks: Emitter의 콜백을 설정하여 완료, 타임아웃, 오류 시 Emitter를 제거한다.
  • isNotificationRead: 특정 유저가 특정 게시글에 달린 댓글을 읽었는지 여부를 확인한다.
  • markAsCommentReadByPostId: 특정 유저가 특정 게시글에 달린 모든 댓글을 읽음 상태로 표시한다.

엔트포인트

SSE 연결을 위한 엔드포인트

SSE(Server-Sent Events)를 사용하여 클라이언트와 서버 간의 실시간 연결을 유지하기 위한 엔드포인트

@GetMapping("/notifications")
public SseEmitter notificationSubscribe(
        @AuthenticationPrincipal LoginUser loginUser
) {
    return notificationService.createEmitter(loginUser.getUser().getUserId());
}
  • 클라이언트가 /notifications 엔드포인트에 접속하면 서버는 새로운 SseEmitter를 생성하고, 이를 redisSubscriber에 추가하여 관리한다.
  • 생성된 SseEmitter는 15초마다 heartbeat 메시지를 전송하여 연결을 유지한다.
  • 클라이언트와의 연결이 완료되거나 타임아웃, 오류가 발생하면 해당 SseEmitter는 redisSubscriber에서 제거되며, 이를 통해 불필요한 자원이 사용되지 않도록 관리한다.

댓글 등록 엔드포인트

댓글 작성 시 실시간 알림을 전송하기 위한 엔드포인트

@PostMapping("/{postId}/comments")
public ResponseEntity<Void> saveInquiryComment(
        @PathVariable Long postId,
        @AuthenticationPrincipal LoginUser loginUser,
        @RequestBody @Valid ReplyRequest replyRequest,
        BindingResult bindingResult
) {
    Long savedCommentId = postCommentService.saveComment(
            loginUser.getUser(),
            ReplyDto.toDto(loginUser.getUser(), postId, replyRequest)
    );

    URI location = URI.create("/users/posts/" + postId);
    return ResponseEntity.created(location).build();
}

이제 성능 개선과 리팩토링을 진행해봐야지.🏃

profile
Junior Back-end Developer 🫠

0개의 댓글