실시간 메시지 구현

WAS·2026년 4월 21일

사이드프로젝트

목록 보기
2/4

✅ 웹소켓
일반 HTTP는 클라이언트가 요청해야 서버가 응답하지만
채팅처럼 서버가 먼저 클라이언트에게 메시지를 보내야 하는 경우에는 사용하는 것

웹소켓은 한번 연결되면 양방향으로 실시간 통신이 가능한 프로토콜

즉 쉽게말해 상대방이 메시지를 보내면 내가 요청하지 않아도 즉시 받아야 한다.

HTTP : 클라이언트 → 요청 → 서버 → 응답 → 끝 (단방향, 연결 끊김) ex) : 문자메시지
WebSocket : 클라이언트 ↔ 서버 (양방향, 연결 유지) ex) : 전화통화


✅ STOMP

WebSocket은 그냥 데이터를 주고받는 통로일 뿐, 어디로 보낼지 규칙이 없음.
STOMP는 그 위에서 동작하는 프로토콜로 목적지(destination) 개념을 추가해줌.
topic/xxx → 해당 경로를 구독한 모든 사람에게 브로드캐스트
queue/xxx → 특정 1명에게만 전송


✅ Kafka

메시지를 큐에 쌓아두고 순서대로 처리하는 메시지 브로커.

💡 그럼 웹소켓을 사용하는데 kafka를 사용하는 이유가 멀까?

단순하게 컨트롤러를 통해서 메시지를 보낸다 가정하자 Controller -> WebSocket 전송
만약 동시에 1000명 메시지가 전송되면 컨트롤러가 모두 처리를 해야하고
서버 과부하나 장애가 발생할 수 있다.

Controller -> Kafka -> Consumer -> 처리

메시지를 큐에 저장하여, 순차처리 하고 서버 부하를 분산시킬 수 있다.
추가로 DB 저장과 알림전송 등 여러 작업을 비동기로 분리가 가능하다.


✅ Redis Pub/Sub

💡 Redis Pub/Sub 이란? Redis 안에 있는 기능 중 하나로 메시지 전달 시스템 역할을 한다.
💡 Redis의 발행(Publish) / 구독(Subscribe) 기능.

동작구조는 [Publisher] -> (채널) -> [Subscriber] 형식으로 동작한다.

그럼 왜 사용할까?
서버가 1대면 문제가 없지만 만약 운영에서는 아래와 같은 구조를 가진다고 가정하자

사용자 A ──WebSocket──▶ 서버 1 (8080)
사용자 B ──WebSocket──▶ 서버 2 (8081)

A가 B에게 메시지 전송
→ 서버 1은 서버 2에 연결된 B에게 직접 전달 불가 ❌

이러한 문제 때문에 Redis Pub/Sub이 중간다리 역할을 한다.

사용자 A
    │
    ▼
서버 1 (8080)
    │  Publish
    ▼
Redis "chat-channel"
    │
    ├──── Subscribe ──▶ 서버 1 (8080) ─▶ (해당 없음)
    │
    └──── Subscribe ──▶ 서버 2 (8081) ─▶ 사용자 B ✅

Redis Pub/Sub 를 사용하여
모든 서버가 동일 메시지를 수신할 수 있으며
모든 사용자에게 전달이 가능해진다.


-- 운영 추가 작업 ---

GitHub Actions의 자동 배포 설정 파일인 deploy.yml 에서 인스턴스를 2개 기동시킨다.

# 인스턴스 1 (8080)
nohup java -jar app.war -Dserver.port=8080 >> app.log 2>&1 &
echo $! > app.pid

# 인스턴스 2 (8081)
nohup java -jar app.war -Dserver.port=8081 >> app2.log 2>&1 &
echo $! > app2.pid

# 기동 확인
ss -tlnp | grep -E '8080|8081'

-- 운영 추가 작업 ---
Nginx 로드밸런서를 설정한다.

upstream springboot {
    ip_hash;                  # 같은 IP → 항상 같은 서버로 (세션 유지)
    server localhost:8080;
    server localhost:8081;
}

location / {
    proxy_pass http://springboot;
    proxy_set_header Host $host;
    proxy_set_header X-Real-IP $remote_addr;
    proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
    proxy_set_header X-Forwarded-Proto $scheme;
}

💡 ip_hash를 쓰는 이유
Spring Session을 Redis에 저장하면 세션 공유가 가능하지만,
Spring Security OAuth2 (카카오 로그인) 객체는
Jackson 직렬화가 까다로워 에러가 자주 발생
ip_hash 적용 시 → 같은 사용자 = 항상 같은 서버
→ 세션 공유 없이도 로그인 유지 가능

전체적인 아키텍처

[브라우저]
     ↓  WebSocket(STOMP)
  [ChatMessageController]
     ↓  Kafka 발행
  [Kafka 브로커]
     ↓  소비
  [ChatKafkaConsumer]
     ↓  Redis 발행
  [Redis Pub/Sub]
     ↓  구독
  [RedisSubscriber]
     ↓  WebSocket(STOMP)
  [브라우저들]

    1. 클라이언트 -> 서버
   stompClient.send('/app/chat/send/' + roomId, {}, JSON.stringify({
        roomId:     '룸 id값',
        senderNo:   '유저식별번호',	
        message:    '메시지내용',
        receiverNo: '받는사람식별번호'
    }));

💡 /app : 서버 컨트롤러에 보내라는 명령어

/app 은 Spring WebSocket 설정에서 지정한 도착 prefix

registry.setApplicationDestinationPrefixes("/app");

    1. Controller (서버에서 받기)
  @Autowired
  private KafkaTemplate<String, ChatMessage> kafkaTemplate; // 

  @MessageMapping("/chat/send/{roomId}")
  public void sendMessage(@DestinationVariable String roomId,
                          @Payload ChatMessage chatMessage, Principal principal) {
        chatMessage.setRoomId(roomId);
        
        kafkaTemplate.send(KafkaConfig.CHAT_TOPIC, chatMessage);
    }

위에서 나오는 stompClient.send('경로') 는 HTTP 요청이 아닌 STOMP 메시지이다
그래서 HTTP 서블릿 전용@RequestParam @RequestBody @PathVariable 은 사용할 수 없다

그래서 경로에서 변수를 추출하는 @PathVariable 대신 -> @DestinationVariable 을 사용
바디(메시지내용)을 추출하는 @RequestBody 대신 -> @Payload 사용

@Payload 는 들어오는 json 값을 ChatMessage 자바 객체로 자동 변환해줌

💡 @MessageMappping 이란?
HTTP에서 클라이언트가 서버에 요청할 때 @PostMapping , @GetMapping 쓰는 것처럼
WebSocket에서 클라이언트가 서버에 메시지 보낼 때 쓰는게 @MessageMapping 이다.
경로를 적을 때는 앞에 prefix 을 제외하고 적기

2번 작업에서는 결국 WebSocket으로 받은 메시지를 Kafka chat-messages 토픽 에 발행하고 끝이 난다.
(DB 저장, 알림 전송 등 처리 X) -> 전부 Kafka Consumer 에게 위임
Kafka를 중간에 끼우는 이유는 위에서 말한것처럼 메시지 전송과 처리를 분리하여 서버 부하를 줄이려고


    1. Kafka Consumer (발행된 카프카 토픽메시지 처리)

2번 작업에서 Controllerchat-message 토픽 에 메시지를 발행했다.
3번 작업에서는 @KafkaListener 가 해당 토픽을 감시하다가 새 메시지가 감지되면 자동으로 실행된다.

  @KafkaListener(topics = KafkaConfig.CHAT_TOPIC, groupId = "chat-group")
  	// groupId = "chat-group" → 같은 그룹끼리는 메시지를 나눠서 처리 (서버 여러 대일 때 중복처리 방지)

					// Kafka에서 꺼낸 JSON을 Jackson이 자동으로 객체로 변환
    public void consumeSendChat(ChatMessage chatMessage) { 
        try {
            // 1. DB 저장
            Map<String, Object> map = new HashMap<>();
            map.put("roomId",     chatMessage.getRoomId());
            map.put("senderNo",   chatMessage.getSenderNo());
            map.put("message",    chatMessage.getMessage());
            map.put("receiverNo", chatMessage.getReceiverNo());
            chatService.insertChatMessage(map);

            // 2. DB에서 발신자 이름 + 저장된 시간 조회
            Map<?, ?> info = chatService.getLatestMessageNameTimeInfo(chatMessage.getRoomId());
            chatMessage.setSenderName((String) info.get("NAME"));
            chatMessage.setSentAt((Timestamp) info.get("SENT_AT"));

            // 3. Redis Publish (모든 서버에 브로드캐스트)
            // Redis 는 자바 객체를 모르기 때문에 json 문자열로 바꿔주고 보내줘야함
            String json = objectMapper.writeValueAsString(chatMessage);
            chatRedisTemplate.convertAndSend(RedisPubSubConfig.CHAT_CHANNEL, json);

        } catch (JsonProcessingException e) {
            log.error("ChatKafkaConsumer JSON 직렬화 오류", e);
        } catch (Exception e) {
            log.error("ChatKafkaConsumer 처리 오류", e);
        }
    }

이 작업에서는 결국 DB 저장 후 지정한 Redis 채널에 문자열 발행
그러면 다음작업에서는 해당 채널을 구독 중인 RedisSubscriber.onChatMessage()가 자동으로 호출

이후 처리를 RedisSubscriber 에게 넘김

어떤 설정환경을 통해서 자동으로 호출되는걸까?

	public static final String CHAT_CHANNEL = "chat-channel";

    @Bean
    // 어떤 클래스의 어떤 메소드을 호출할지 등록 -> (클래스명, 메소드명)
    // RedisSubscriber 클래스의 onChatMessage 메서드를 리스너로 등록하겠다는 설정
    public MessageListenerAdapter messageListenerAdapter(RedisSubscriber redisSubscriber) { 
        return new MessageListenerAdapter(redisSubscriber, "onChatMessage");
    } 
    
    @Bean
    // 어떤 채널을 감시할지 등록
  	public RedisMessageListenerContainer redisMessageListenerContainer
    (MessageListenerAdapter listenerAdapter) {
      RedisMessageListenerContainer container = new RedisMessageListenerContainer();
      container.setConnectionFactory(factory);
      container.addMessageListener(
      	listenerAdapter,  // 위에서 등록한 리스너 (누가 처리)
        new ChannelTopic(CHAT_CHANNEL) // 감시할 채널명 (어디서 감지)
      );
      return container;
  }

    1. Redis Subscriber

위에서 감시할 채널명을 등록해놨기 때문에 Redis "chat-channel" 에 메시지가 들어오면
등록해놓은 리스너를 호출한다 -> RedisSubscriber.onChatMessage() 호출

   public void onChatMessage(String message, String channel) {
        try {
        
        	// redis에서 받은 데이터들은 json 문자열로 다시 자바 객체로 변환해야함 (역직렬화)
            ChatMessage chatMessage = objectMapper.readValue(message, ChatMessage.class);
            String roomId   = chatMessage.getRoomId();
            int senderNo    = chatMessage.getSenderNo();
            int receiverNo  = chatMessage.getReceiverNo();

            // 1. 수신자 / 발신자 채팅창에 메시지 실시간 전송
            messagingTemplate.convertAndSendToUser(
                    String.valueOf(receiverNo), "/queue/chat/", chatMessage);
            messagingTemplate.convertAndSendToUser(
                    String.valueOf(senderNo),   "/queue/chat/", chatMessage);

            // 2. 채팅 목록 실시간 업데이트 (양쪽 모두)
            Map<String, Object> senderMap = new HashMap<>();
            senderMap.put("roomId", roomId);
            senderMap.put("userNo", senderNo);

            Map<String, Object> receiverMap = new HashMap<>();
            receiverMap.put("roomId", roomId);
            receiverMap.put("userNo", receiverNo);

            List<ChatRoom> senderRooms   = chatService.getSingleChatRoomInfo(senderMap);
            List<ChatRoom> receiverRooms = chatService.getSingleChatRoomInfo(receiverMap);

            if (!senderRooms.isEmpty()) {
                messagingTemplate.convertAndSend("/topic/chat-list/" + senderNo,   senderRooms.get(0));
            }
            if (!receiverRooms.isEmpty()) {
                messagingTemplate.convertAndSend("/topic/chat-list/" + receiverNo, receiverRooms.get(0));
            }

            // 3. 수신자 실시간 알림 (DB 저장 없이 WebSocket만 전송)
            // 채팅 메시지는 CHAT_MESSAGE 테이블에 이미 저장됨 → 별도 NOTIFICATION 불필요
            String toastMsg = chatMessage.getSenderName() + ": " + chatMessage.getMessage();

            Map<String, Object> notifyPayload = new HashMap<>();
            notifyPayload.put("notiMessage", toastMsg);
            notifyPayload.put("type", "MESSAGE");

            messagingTemplate.convertAndSendToUser(
                    String.valueOf(receiverNo), "/queue/notify", notifyPayload);

        } catch (Exception e) {
            log.error("RedisSubscriber onMessage 오류", e);
        }
    }

    1. 채팅 실시간 목록과, 메시지 실시간 전송, 실시간 알림에 대한 구독

💡 subscribe() 는 해당 경로를 구독하는 것.
서버에서 그 경로로 메시지가 오면 콜백 함수 가 자동 실행됨.

// ① 채팅창에 메시지 실시간 표시 (수신자, 발신자 모두)
messagingTemplate.convertAndSendToUser(receiverNo, "/queue/chat/", chatMessage);
messagingTemplate.convertAndSendToUser(senderNo,   "/queue/chat/", chatMessage);
    // 2) 메시지 수신
    stompClient.subscribe('/user/queue/chat/', function(message) {
        const msg = JSON.parse(message.body);
        appendMessage(msg);

        if (msg.senderNo != userId) {
            sendReadReceipt(); // 바로 읽음 처리 
        }
    });

// ② 채팅 목록 최신화 (마지막 메시지, 안읽은 수 등)
messagingTemplate.convertAndSend("/topic/chat-list/" + senderNo,   senderRoom);
messagingTemplate.convertAndSend("/topic/chat-list/" + receiverNo, receiverRoom);
    stompClient.connect({}, function() {
        stompClient.subscribe('/topic/chat-list/' + userNo, function(message) {
            updateChatListItem(JSON.parse(message.body));
        });
    });

messagingTemplate.convertAndSendToUser(receiverNo, "/queue/notify", notifyPayload);
stompClient.subscribe("/user/queue/notify", function (message) {
           const data = JSON.parse(message.body);
           if (data.type === 'MESSAGE') {
               // 채팅 메시지 → 채팅 아이콘 뱃지 증가
               updateChatBadge((parseInt(document.getElementById('chat-badge').textContent) || 0) + 1);
           } else {
               // 찜 알림 → 알림 벨 뱃지
               updateNotiBadge(data.noReadCnt);
               createToast(data.notiMessage);
           }
       });

💡queue 로 구독할 경우 설정파일에서 특정 사용자에게 메시지 전송시 사용할 Prefix 주소 설정 가능

registry.setUserDestinationPrefix("/user"); // 굳이 user로 안해도됨
profile
우측 상단 햇님모양 클릭하셔서 무조건 야간모드로 봐주세요!!

0개의 댓글