이번 포스트에서는 Stomp와 Kafka를 이용해 1대1 채팅에서 채팅방 생성, 사용자 접속 상태 관리 기능을 중심으로 설명하겠습니다.
채팅방을 생성하고 관리하기 위해서는 Chat 엔티티가 필요합니다.
@Entity
@Getter
@DynamicInsert
@AllArgsConstructor
@Builder
@NoArgsConstructor(access = AccessLevel.PROTECTED)
public class Chat {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
@Column(name = "chat_no")
private Integer chatNo;
@Column(name = "create_user")
private Integer createUser;
@Column(name = "join_user")
private Integer joinUser;
@Column(name = "reg_date")
private LocalDateTime regDate;
public MyChatRoomResponse toResponse(User user, Long cnt, String message) {
return MyChatRoomResponse.builder()
.chatRoomId(this.chatNo)
.joinUserName(user.getUserName())
.notReadMessageCnt(cnt)
.lastContent(message)
.build();
}
}
채팅방 생성 및 사용자 접속 상태를 관리하는 기능을 구현하기 위해서는 데이터베이스와 통신하는 레포지토리가 필요합니다.
public interface ChatRoomRepository extends JpaRepository<Chat, Integer> {
@Query("select c from Chat c where c.createUser = :userId or c.joinUser = :userId")
List<Chat> findChattingRoom(@Param("userId") Integer userId);
@Query("select c from Chat c where (c.createUser = :myId and c.joinUser = :otherId) or (c.createUser = :otherId and c.joinUser = :myId)")
Optional<Chat> findActiveChat(@Param("myId") Integer myId, @Param("otherId") Integer otherId);
}
public interface RedisChatRoomRepository extends CrudRepository<ChatRoom, String> {
List<ChatRoom> findByChatroomNo(Integer chatRoomNo);
Optional<ChatRoom> findByChatroomNoAndUserName(Integer chatRoomNo, String userName);
}
이제 서비스 레이어에서 레포지토리를 활용하여 채팅방을 생성하고, 사용자의 접속 상태를 관리하는 방법을 설명하겠습니다.
@Service
@Transactional(readOnly = true)
@RequiredArgsConstructor
public class ChatRoomService {
private final UserRepository userRepository;
private final ChatRoomRepository chatRoomRepository;
private final RedisChatRoomRepository redisChatRoomRepository;
@Transactional
public Chat makeChatRoom(String userName, ChatRequestDto requestDto) {
// 사용자 조회
User findUser = userRepository.findByUserName(userName)
.orElseThrow(() -> new SnsAppException(USERNAME_NOT_FOUND, USERNAME_NOT_FOUND.getMessage()));
User joinUser = userRepository.findByUserName(requestDto.getJoinUserName())
.orElseThrow(() -> new SnsAppException(USERNAME_NOT_FOUND, USERNAME_NOT_FOUND.getMessage()));
// 활성화된 채팅방이 있는지 확인
chatRoomRepository.findActiveChat(findUser.getId(), joinUser.getId())
.ifPresent(chat -> {
throw new SnsAppException(ALREADY_CHAT_ROOM, ALREADY_CHAT_ROOM.getMessage());
});
// 새로운 채팅방 생성 및 저장
Chat chat = Chat.builder()
.createUser(findUser.getId())
.joinUser(joinUser.getId())
.regDate(LocalDateTime.now())
.build();
return chatRoomRepository.save(chat);
}
@Transactional
public void connectChatRoom(Integer chatRoomNo, String userName) {
ChatRoom chatRoom = ChatRoom.builder()
.userName(userName)
.chatroomNo(chatRoomNo)
.build();
// Redis에 접속 정보 저장
redisChatRoomRepository.save(chatRoom);
}
@Transactional
public void disconnectChatRoom(Integer chatRoomNo, String userName) {
ChatRoom chatRoom = redisChatRoomRepository.findByChatroomNoAndUserName(chatRoomNo, userName)
.orElseThrow(IllegalStateException::new);
// Redis에서 접속 정보 삭제
redisChatRoomRepository.delete(chatRoom);
}
public boolean isConnected(Integer chatRoomNo) {
List<ChatRoom> connectedList = redisChatRoomRepository.findByChatroomNo(chatRoomNo);
return connectedList.size() == 1;
}
public boolean isAllConnected(Integer chatRoomNo) {
List<ChatRoom> connectedList = redisChatRoomRepository.findByChatroomNo(chatRoomNo);
return connectedList.size() == 2;
}
// 읽지 않은 메시지 카운트
public long countUnReadMessages(Integer chatRoomNo, String senderName) {
Query query = new Query(Criteria.where("chatRoomNo").is(chatRoomNo).and("readCount").is(1).and("senderName").ne(senderName));
return mongoTemplate.count(query, Chatting.class);
}
public String findLastMessage(Integer chatRoomNo) {
Query query = new Query(Criteria.where("chatRoomNo").is(chatRoomNo))
.with(Sort.by(Sort.Order.desc("sendDate")))
.limit(1);
try {
return mongoTemplate.findOne(query, Chatting.class).getContent();
} catch (Exception e) {
log.info(e.getMessage());
return "";
}
}
public void updateUnreadMessagesToRead(Integer chatRoomNo, String userName) {
User findUser = userRepository.findByUserName(userName).orElseThrow(() -> new SnsAppException(USERNAME_NOT_FOUND, USERNAME_NOT_FOUND.getMessage()));
Update update = new Update().set("readCount", 0);
Query query = new Query(Criteria.where("chatRoomNo").is(chatRoomNo).and("senderName").ne(findUser.getUserName()));
mongoTemplate.updateMulti(query, update, Chatting.class);
}
}
이제 컨트롤러에서 클라이언트 요청을 받아 채팅방을 생성하고, 사용자 목록을 조회하는 API를 살펴보겠습니다.
@Slf4j
@RestController
@RequiredArgsConstructor
@RequestMapping("/api/v1")
public class ChatRestController {
private final ChatRoomService chatRoomService;
@PostMapping("/chatroom")
public ResponseEntity<Response<Chat>> createChatRoom(@RequestBody ChatRequestDto requestDto, Authentication authentication) {
String myName = authentication.getName();
// 채팅방을 생성
Chat chat = chatRoomService.makeChatRoom(myName, requestDto);
return ResponseEntity.ok(Response.success(chat));
}
@GetMapping("/my-chatroom")
public ResponseEntity<Response<List<MyChatRoomResponse>>> chatRoomList(Authentication authentication) {
String myName = authentication.getName();
List<MyChatRoomResponse> chatRoomList = chatRoomService.getChatRoomList(myName);
return ResponseEntity.ok(Response.success(chatRoomList));
}
}
마지막으로 StompHandler를 통해 WebSocket 연결을 관리하고, JWT 토큰 검증을 거친 후, 실시간 접속 상태를 Redis에 저장하는 방식을 설명합니다.
@Slf4j
@Component
@RequiredArgsConstructor
@Order(Ordered.HIGHEST_PRECEDENCE + 99)
public class StompHandler implements ChannelInterceptor {
private final ChatRoomService chatRoomService;
private final ChatService chatService;
@Value("${jwt.secret}")
private String secretKey;
@Override
public Message<?> preSend(Message<?> message, MessageChannel channel) {
StompHeaderAccessor accessor = StompHeaderAccessor.wrap(message);
// DISCONNECT은 헤더에 값을 담을 수 없으므로 따로 처리
if (StompCommand.DISCONNECT.equals(accessor.getCommand())) {
log.info("DISCONNECT command received, skipping token verification.");
return message;
}
String userName = verifyAccessToken(getAccessToken(accessor));
handleMessage(accessor.getCommand(), accessor, userName);
return message;
}
private String getAccessToken(StompHeaderAccessor accessor) {
String authorizationHeader = accessor.getFirstNativeHeader("Authorization");
if (authorizationHeader == null) {
log.info("chat header가 없는 요청입니다.");
throw new MalformedJwtException("jwt");
}
return authorizationHeader.replace("Bearer ", "");
}
private String verifyAccessToken(String accessToken)
{
if (JwtUtils.isExpired(accessToken, secretKey)) {
throw new IllegalStateException("Expired token");
}
return JwtUtils.getUserName(accessToken, secretKey);
}
private void handleMessage(StompCommand stompCommand, StompHeaderAccessor accessor, String userName) {
if (StompCommand.CONNECT.equals(stompCommand)) {
connectToChatRoom(accessor, userName);
}
}
private void connectToChatRoom(StompHeaderAccessor accessor, String userName) {
Integer chatRoomNo = Integer.valueOf(accessor.getFirstNativeHeader("chatRoomNo"));
// 접속 중인 사용자 확인 및 접속 상태 관리
chatRoomService.connectChatRoom(chatRoomNo, userName);
boolean isConnected = chatRoomService.isConnected(chatRoomNo);
// 접속한 유저가 이미 있으면 메시지 처리
if (isConnected) {
chatService.updateMessage(userName, chatRoomNo);
}
}
}