이전에 웹소켓, STOMP, Kafka를 활용하여 메시지 수신이 잘 되는 것은 확인했다. 하지만 JWT 유효성 검사라든지, 방 접속에 따른 메시지 읽음 처리 문제를 해결하기 위해 아래와 같이 configureClientInboundChannel을 만들어서 인터셉터(StompHandler)를 넣어주었다.
/**
 * STOMP-over-WebSocket configuration for the chat feature.
 *
 * <p>Registers the in-memory simple broker, the application destination prefix,
 * the handshake endpoints, and installs {@link StompHandler} as an interceptor on
 * the client inbound channel so every frame coming from a client passes through it.
 */
@RequiredArgsConstructor
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {

    /** Destination prefix served by the simple broker (subscriptions). */
    private static final String BROKER_PREFIX = "/chatting/topic";

    /** Destination prefix routed to application message-handling methods. */
    private static final String APPLICATION_PREFIX = "/chatting/pub";

    /** Handshake path shared by the plain WebSocket and the SockJS endpoint. */
    private static final String ENDPOINT_PATH = "/ws-chat";

    /** Origin setting applied to both endpoints; "*" accepts any origin. */
    private static final String ANY_ORIGIN = "*";

    private final StompHandler stompHandler;

    /**
     * Enables the simple broker and sets the application destination prefix.
     *
     * <p>Note: an earlier comment here mentioned {@code /chatting/queue/room/{room_id}};
     * the prefix actually registered with the broker is {@code /chatting/topic}.
     */
    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        registry.enableSimpleBroker(BROKER_PREFIX);
        registry.setApplicationDestinationPrefixes(APPLICATION_PREFIX);
    }

    /**
     * Registers the same path twice: first as a plain WebSocket endpoint, then
     * with SockJS enabled. Both registrations allow every origin.
     */
    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        // Plain WebSocket endpoint.
        registry.addEndpoint(ENDPOINT_PATH).setAllowedOrigins(ANY_ORIGIN);
        // Same path again, this time with SockJS enabled.
        registry
                .addEndpoint(ENDPOINT_PATH)
                .setAllowedOrigins(ANY_ORIGIN)
                .withSockJS();
    }

    /** Installs {@link StompHandler} on the channel carrying client-to-server frames. */
    @Override
    public void configureClientInboundChannel(ChannelRegistration registration) {
        registration.interceptors(stompHandler);
    }
}
/**
 * Inbound-channel interceptor that reacts to STOMP frames sent by clients.
 *
 * <ul>
 *   <li>CONNECT: validates the JWT from the {@code Authorization} native header, if present.</li>
 *   <li>SUBSCRIBE: adds the user's e-mail to the Redis set {@code chatroom:{roomId}} and
 *       triggers the read-count / entry-message updates in {@link ChatService}.</li>
 *   <li>UNSUBSCRIBE: removes the user's e-mail from the Redis set.</li>
 * </ul>
 *
 * <p>All other frames (and messages that carry no STOMP command) pass through untouched.
 *
 * <p>NOTE(review): there is no DISCONNECT handling, so a client that drops the connection
 * without sending UNSUBSCRIBE is not removed from the Redis set by this class.
 */
@Component
@RequiredArgsConstructor
@Slf4j
@Order(Ordered.HIGHEST_PRECEDENCE + 99)
public class StompHandler implements ChannelInterceptor {

    private static final String AUTHORIZATION_HEADER = "Authorization";
    private static final String ROOM_KEY_PREFIX = "chatroom:";
    private static final Logger logger = LoggerFactory.getLogger(StompHandler.class);

    private final JwtTokenProvider tokenProvider;
    private final RedisTemplate<String, String> chatRoomConnectTemplate;
    private final ChatService chatService;

    /**
     * Inspects the frame before it is sent on the inbound channel and performs the
     * per-command side effects described on the class.
     *
     * @param message the inbound message
     * @param channel the channel the message is about to be sent to
     * @return the same {@code message} instance, unmodified
     * @throws IllegalArgumentException if a SUBSCRIBE frame has no destination or an
     *         UNSUBSCRIBE frame has no subscription id
     * @throws NumberFormatException if the last path segment of a SUBSCRIBE destination
     *         is not a valid room id
     */
    @Override
    public Message<?> preSend(Message<?> message, MessageChannel channel) {
        final StompHeaderAccessor accessor = StompHeaderAccessor.wrap(message);
        // getCommand() is nullable (e.g. heartbeat frames carry no STOMP command);
        // switching on null would throw a NullPointerException.
        if (accessor.getCommand() == null) {
            return message;
        }
        final String jwtToken = accessor.getFirstNativeHeader(AUTHORIZATION_HEADER);
        switch (accessor.getCommand()) {
            case CONNECT:
                handleConnect(jwtToken);
                break;
            case SUBSCRIBE:
                handleSubscribe(accessor, jwtToken);
                break;
            case UNSUBSCRIBE:
                handleUnsubscribe(accessor, jwtToken);
                break;
            default:
                // Other commands need no extra processing here.
                break;
        }
        return message;
    }

    /** Validates the JWT on CONNECT when one was supplied; otherwise only logs a warning. */
    private void handleConnect(String jwtToken) {
        // Only call validateToken when the token is neither null nor empty.
        if (jwtToken != null && !jwtToken.isEmpty()) {
            // NOTE(review): the result of validateToken is not inspected; presumably it
            // throws on an invalid token - confirm against JwtTokenProvider.
            tokenProvider.validateToken(jwtToken);
        } else {
            // NOTE(review): a CONNECT without a token is still let through.
            logger.warn("JWT Token is null or empty");
        }
    }

    /** Registers the subscribing user as present in the room and updates chat read state. */
    private void handleSubscribe(StompHeaderAccessor accessor, String jwtToken) {
        final String destination = accessor.getDestination();
        if (destination == null) {
            throw new IllegalArgumentException("SUBSCRIBE frame has no destination header");
        }
        final String roomId = lastPathSegment(destination);
        // Parse before touching Redis so a non-numeric destination fails without
        // leaving a stray "chatroom:..." entry behind.
        final Long roomNumber = Long.valueOf(roomId);
        final String email = tokenProvider.extractUserEmail(jwtToken);
        final String key = ROOM_KEY_PREFIX + roomId;
        logger.info("Room ID is {}", roomId);
        chatRoomConnectTemplate.opsForSet().add(key, email);
        logger.info("SUBSCRIBE size is {}", memberCount(key));
        // Mark unread chats as read.
        chatService.decreaseReadCount(roomNumber, email);
        // Send the room-entry message.
        chatService.updateMessage(roomNumber, email);
    }

    /** Removes the unsubscribing user from the room's presence set. */
    private void handleUnsubscribe(StompHeaderAccessor accessor, String jwtToken) {
        // NOTE(review): the room id is derived from the subscription id, so this only
        // matches the SUBSCRIBE key when the client uses an id ending in "/{roomId}"
        // (or the bare room id) - confirm against the client code.
        final String subscriptionId = accessor.getSubscriptionId();
        if (subscriptionId == null) {
            throw new IllegalArgumentException("UNSUBSCRIBE frame has no subscription id header");
        }
        final String roomId = lastPathSegment(subscriptionId);
        final String key = ROOM_KEY_PREFIX + roomId;
        final String email = tokenProvider.extractUserEmail(jwtToken);
        logger.info("Room ID is {}", roomId);
        // Refresh the room's current member set.
        chatRoomConnectTemplate.opsForSet().remove(key, email);
        logger.info("UNSUBSCRIBE. size is {}", memberCount(key));
    }

    /** Returns the text after the last '/' in {@code value}, or all of it if there is none. */
    private static String lastPathSegment(String value) {
        return value.substring(value.lastIndexOf('/') + 1);
    }

    /** Returns the number of members stored under {@code key}, treating a null result as 0. */
    private int memberCount(String key) {
        final Set<String> users = chatRoomConnectTemplate.opsForSet().members(key);
        return users == null ? 0 : users.size();
    }
}
preSend는 메시지가 채널로 전송되기 전에 실행되는 메서드로, accessor.getCommand() 값에 따라 특정 액션이 작동하게끔 설정했다.
CONNECT -> JWT 유효성 검사
SUBSCRIBE -> Redis에 채팅방에 접속한 유저 삽입
UNSUBSCRIBE -> 채팅방에서 퇴장한 유저 제거
/**
 * Handles a chat message sent by a client to the {@code /message} mapping.
 *
 * <p>The message is dropped if its room does not exist. Otherwise it is saved through
 * {@code chatService}, its read count is set from the number of users currently
 * registered in the Redis set {@code chatroom:{roomId}}, and it is handed to the producer.
 *
 * @param chatDto the incoming chat payload; its room id selects the room
 * @param Authorization the JWT from the {@code Authorization} header, used to resolve
 *        the sender's e-mail
 */
@MessageMapping("/message")
public void sendSocketMessage(ChatDto chatDto,
        @Header("Authorization") String Authorization) {
    // Deliberately not logging the Authorization value: it is a bearer credential.
    logger.info("sendSocketMessage called for room {}", chatDto.getRoomId());
    if (!chatRoomService.existsRoom(chatDto.getRoomId())) {
        logger.warn("Dropping message for non-existent room {}", chatDto.getRoomId());
        return;
    }
    String email = jwtTokenProvider.extractUserEmail(Authorization);
    String key = "chatroom:" + chatDto.getRoomId().toString();
    // Ask Redis for the set's cardinality instead of fetching every member; the result
    // may be null (e.g. inside a pipeline/transaction), which is treated as 0.
    Long connected = chatRoomConnectTemplate.opsForSet().size(key);
    long size = connected == null ? 0L : connected;
    chatDto.setChatType(Chat.ChatType.MESSAGE);
    ChatDto savedMessage = chatService.saveChatMessage(chatDto, email);
    // More than one user in the room: nobody is left unread (0). Otherwise one
    // reader is still outstanding (1).
    savedMessage.setReadCount(size > 1 ? 0 : 1);
    producer.sendMessage(savedMessage);
}
@Header("Authorization")를 넣어서 JWT 값을 받아왔고,
Redis에서 조회한 채팅방 접속 인원 수를 통해서 프로듀서에 메시지를 보내기 전에 readCount를 설정해주었다(2명 이상 접속 중이면 0, 그렇지 않으면 1).