kafka를 활용한 STOMP 채팅 서비스 구축 (4) - configureClientInboundChannel

seongcheollee·2024년 1월 18일
0
post-thumbnail

이전에 웹소켓 STOMP Kafka를 활용하여 메세지 수신이 잘 되는 것은 확인이 되었다. 하지만 jwt 유효성 검사라던지, 방 접속에 따른 메시지 읽음 문제를 해결하기 위해 아래에 configureClientInboundChannel를 만들어서 넣어주었다.


@RequiredArgsConstructor
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
    private final StompHandler stompHandler;

    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        // `/chatting/queue/room/{room_id}'
        registry.enableSimpleBroker("/chatting/topic");
        registry.setApplicationDestinationPrefixes("/chatting/pub");
    }

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry
                .addEndpoint("/ws-chat")
                .setAllowedOrigins("*");
        registry.addEndpoint("/ws-chat").setAllowedOrigins("*").withSockJS();
    }

    @Override
    public void configureClientInboundChannel(ChannelRegistration registration) {
        registration.interceptors(stompHandler);

    }
}
@Component
@RequiredArgsConstructor
@Slf4j
@Order(Ordered.HIGHEST_PRECEDENCE + 99)
public class StompHandler implements ChannelInterceptor {

    private final JwtTokenProvider tokenProvider;
    private final RedisTemplate<String, String> chatRoomConnectTemplate;
    private final ChatService chatService;


    private final Logger logger = LoggerFactory.getLogger(StompHandler.class);
    @Override
    public Message<?> preSend(Message<?> message, MessageChannel channel) {
        final StompHeaderAccessor accessor = StompHeaderAccessor.wrap(message);
        String jwtToken = accessor.getFirstNativeHeader("Authorization");

        switch (accessor.getCommand()){
            case CONNECT:
                // jwtToken이 null 또는 빈 문자열이 아닌 경우에만 validateToken 호출
                if (jwtToken != null && !jwtToken.isEmpty()) {
                    tokenProvider.validateToken(jwtToken);
                } else {
                    logger.warn("JWT Token is null or empty");
                }
                break;

            case SUBSCRIBE:

                String destination = accessor.getDestination();
                int lastIndex = destination.lastIndexOf('/');
                String roomId = destination.substring(lastIndex + 1);
                String email = tokenProvider.extractUserEmail(jwtToken);
                String key = "chatroom:" + roomId;

                logger.info("Room ID is {}", roomId);

                chatRoomConnectTemplate.opsForSet().add(key, email);
                Set<String> users = chatRoomConnectTemplate.opsForSet().members(key);
                logger.info("SUBSCRIBE size is {}", users.size());

               //  읽지 않은 채팅 읽음 처리
                chatService.decreaseReadCount(Long.valueOf(roomId), email);

                // 입장 메세지 전송
                chatService.updateMessage(Long.valueOf(roomId),email);
                break;

            case UNSUBSCRIBE:

                String subscriptionId = accessor.getSubscriptionId();
                lastIndex = subscriptionId.lastIndexOf('/');
                roomId = subscriptionId.substring(lastIndex + 1);
                key = "chatroom:" + roomId;
                email = tokenProvider.extractUserEmail(jwtToken);
                logger.info("Room ID is {}", roomId);


                // 채팅방 현재 인원 최신화.
                chatRoomConnectTemplate.opsForSet().remove(key, email);
                users = chatRoomConnectTemplate.opsForSet().members(key);
                logger.info("UNSUBSCRIBE. size is {}", users.size());
                break;

                }
        return message;
    }
}

Presend 는 메세지를 보내기 전에 실행하는 작업으로, accessor.getCommand()에 따라 특정 액션이 작동하게끔 설정했다.

Connect -> jwt의 유효성 검사
SUBSCRIBE -> redis에 채팅방 접속한 유저 삽입
UNSUBSCRIBE -> 채팅방 퇴장 유저 제거

    @MessageMapping("/message")
    public void sendSocketMessage(ChatDto chatDto,
                                  @Header("Authorization") String Authorization) {

        logger.info("sendSocketMessage called with Authorization: {}", Authorization);

        if(!chatRoomService.existsRoom(chatDto.getRoomId())){
            return;
        }
        String email = jwtTokenProvider.extractUserEmail(Authorization);
        String key = "chatroom:" + chatDto.getRoomId().toString();
        int size = chatRoomConnectTemplate.opsForSet().members(key).size();
        chatDto.setChatType(Chat.ChatType.MESSAGE);
        ChatDto savedMessage = chatService.saveChatMessage(chatDto,email);

        if (size > 1){
            savedMessage.setReadCount(0);
        }
        else {
            savedMessage.setReadCount(1);
        }
        producer.sendMessage(savedMessage);

    }

@Header("Authorization")를 넣어서 jwt 값을 받아왔고,

를 통해서 프로듀서에 메세지를 보내기 전에 readcount를 설정해주었다.

  • 현재 유저 A가 방에 존재하고 있는 상태에서, 유저 B가 들어왔을 때의 readcount 처리는 메세지의 구조에서 enum 클래스의 chatType을 넣어 ENTER 메세지를 수신받을 때 새로고침을 하게하거나, 아니면 서버에서 데이터를 재요청하는 방식을 고민하고 있다.

  • kafak의 경우 기존 데이터를 수정할 수 없기 때문에, 서버에서 데이터를 직접적으로 건드리기 보단(mysql에 쌓이는 데이터는 입장에 따라 최신화가 된다.) 트리거를 전송하여 클라이언트에서 변경하는 것이 가장 깔끔하지 않을까 싶다.

0개의 댓글