Stomp + Kafka를 이용한 1 대 1 채팅 구현 (3) - 채팅방 생성 및 접속 관리

오형상·2024년 9월 26일
0

Mutsa-SNS-Chat

목록 보기
3/4
post-thumbnail

이번 포스트에서는 StompKafka를 이용해 1대1 채팅에서 채팅방 생성, 사용자 접속 상태 관리 기능을 중심으로 설명하겠습니다.

1. 엔티티 생성

채팅방을 생성하고 관리하기 위해서는 Chat 엔티티가 필요합니다.

Chat 엔티티

@Entity
@Getter
@DynamicInsert
@AllArgsConstructor
@Builder
@NoArgsConstructor(access = AccessLevel.PROTECTED)
public class Chat {

    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    @Column(name = "chat_no")
    private Integer chatNo;

    @Column(name = "create_user")
    private Integer createUser;

    @Column(name = "join_user")
    private Integer joinUser;

    @Column(name = "reg_date")
    private LocalDateTime regDate;

    public MyChatRoomResponse toResponse(User user, Long cnt, String message) {
        return MyChatRoomResponse.builder()
                .chatRoomId(this.chatNo)
                .joinUserName(user.getUserName())
                .notReadMessageCnt(cnt)
                .lastContent(message)
                .build();
    }
}

2. 레포지토리 생성

채팅방 생성사용자 접속 상태를 관리하는 기능을 구현하기 위해서는 데이터베이스와 통신하는 레포지토리가 필요합니다.

ChatRoomRepository

public interface ChatRoomRepository extends JpaRepository<Chat, Integer> {

    @Query("select c from Chat c where c.createUser = :userId or c.joinUser = :userId")
    List<Chat> findChattingRoom(@Param("userId") Integer userId);

    @Query("select c from Chat c where (c.createUser = :myId and c.joinUser = :otherId) or (c.createUser = :otherId and c.joinUser = :myId)")
    Optional<Chat> findActiveChat(@Param("myId") Integer myId, @Param("otherId") Integer otherId);
}

설명:

  • findChattingRoom: 사용자가 참여 중인 채팅방을 모두 조회하는 메서드입니다.
  • findActiveChat: 두 사용자 간의 이미 활성화된 채팅방이 있는지 확인합니다.

RedisChatRoomRepository (실시간 접속 관리)

public interface RedisChatRoomRepository extends CrudRepository<ChatRoom, String> {

    List<ChatRoom> findByChatroomNo(Integer chatRoomNo);

    Optional<ChatRoom> findByChatroomNoAndUserName(Integer chatRoomNo, String userName);
}

설명:

  • findByChatroomNo: 특정 채팅방에 접속한 사용자들의 목록을 조회합니다.
  • findByChatroomNoAndUserName: 특정 채팅방에 특정 사용자가 접속했는지 확인합니다.

3. 서비스

이제 서비스 레이어에서 레포지토리를 활용하여 채팅방을 생성하고, 사용자의 접속 상태를 관리하는 방법을 설명하겠습니다.

ChatRoomService

@Service
@Transactional(readOnly = true)
@RequiredArgsConstructor
public class ChatRoomService {

    private final UserRepository userRepository;
    private final ChatRoomRepository chatRoomRepository;
    private final RedisChatRoomRepository redisChatRoomRepository;

    @Transactional
    public Chat makeChatRoom(String userName, ChatRequestDto requestDto) {

        // 사용자 조회
        User findUser = userRepository.findByUserName(userName)
                .orElseThrow(() -> new SnsAppException(USERNAME_NOT_FOUND, USERNAME_NOT_FOUND.getMessage()));
        User joinUser = userRepository.findByUserName(requestDto.getJoinUserName())
                .orElseThrow(() -> new SnsAppException(USERNAME_NOT_FOUND, USERNAME_NOT_FOUND.getMessage()));

        // 활성화된 채팅방이 있는지 확인
        chatRoomRepository.findActiveChat(findUser.getId(), joinUser.getId())
                .ifPresent(chat -> {
                    throw new SnsAppException(ALREADY_CHAT_ROOM, ALREADY_CHAT_ROOM.getMessage());
                });

        // 새로운 채팅방 생성 및 저장
        Chat chat = Chat.builder()
                        .createUser(findUser.getId())
                        .joinUser(joinUser.getId())
                        .regDate(LocalDateTime.now())
                        .build();

        return chatRoomRepository.save(chat);
    }

    @Transactional
    public void connectChatRoom(Integer chatRoomNo, String userName) {
        ChatRoom chatRoom = ChatRoom.builder()
                                    .userName(userName)
                                    .chatroomNo(chatRoomNo)
                                    .build();

        // Redis에 접속 정보 저장
        redisChatRoomRepository.save(chatRoom);
    }

    @Transactional
    public void disconnectChatRoom(Integer chatRoomNo, String userName) {
        ChatRoom chatRoom = redisChatRoomRepository.findByChatroomNoAndUserName(chatRoomNo, userName)
                .orElseThrow(IllegalStateException::new);

        // Redis에서 접속 정보 삭제
        redisChatRoomRepository.delete(chatRoom);
    }

    public boolean isConnected(Integer chatRoomNo) {
        List<ChatRoom> connectedList = redisChatRoomRepository.findByChatroomNo(chatRoomNo);
        return connectedList.size() == 1;
    }

    public boolean isAllConnected(Integer chatRoomNo) {
        List<ChatRoom> connectedList = redisChatRoomRepository.findByChatroomNo(chatRoomNo);
        return connectedList.size() == 2;
    }
    
        // 읽지 않은 메시지 카운트
    public long countUnReadMessages(Integer chatRoomNo, String senderName) {
        Query query = new Query(Criteria.where("chatRoomNo").is(chatRoomNo).and("readCount").is(1).and("senderName").ne(senderName));

        return mongoTemplate.count(query, Chatting.class);
    }

    public String findLastMessage(Integer chatRoomNo) {
        Query query = new Query(Criteria.where("chatRoomNo").is(chatRoomNo))
                .with(Sort.by(Sort.Order.desc("sendDate")))
                .limit(1);

        try {
            return mongoTemplate.findOne(query, Chatting.class).getContent();
        } catch (Exception e) {
            log.info(e.getMessage());
            return "";
        }

    }

    public void updateUnreadMessagesToRead(Integer chatRoomNo, String userName) {
        User findUser = userRepository.findByUserName(userName).orElseThrow(() -> new SnsAppException(USERNAME_NOT_FOUND, USERNAME_NOT_FOUND.getMessage()));

        Update update = new Update().set("readCount", 0);
        Query query = new Query(Criteria.where("chatRoomNo").is(chatRoomNo).and("senderName").ne(findUser.getUserName()));

        mongoTemplate.updateMulti(query, update, Chatting.class);
    }
}

설명:

  • makeChatRoom: 사용자가 요청한 새로운 채팅방을 생성합니다.
  • connectChatRoomdisconnectChatRoom: 사용자가 채팅방에 접속하거나 나갈 때 Redis에 접속 상태를 저장하거나 삭제합니다.
  • isConnectedisAllConnected는 현재 채팅방에 몇 명이 접속 중인지 확인합니다.
  • countUnReadMessages: 해당 채팅방에서 사용자가 읽지 않은 메시지의 개수를 반환합니다. 읽지 않은 메시지는 readCount가 1로 설정되어 있으며, 보낸 사람과 메시지 수신자를 기준으로 카운트합니다.
  • findLastMessage: 채팅방에서 가장 마지막으로 전송된 메시지를 조회하여 반환합니다. 이를 통해 채팅 목록에서 마지막 대화 내용을 표시할 수 있습니다.
  • updateUnreadMessagesToRead: 채팅방에 사용자가 접속했을 때 읽지 않은 메시지를 모두 읽음 처리합니다. readCount를 0으로 설정하여 더 이상 읽지 않은 메시지로 표시되지 않도록 업데이트합니다.

4. 채팅방 생성 API (컨트롤러)

이제 컨트롤러에서 클라이언트 요청을 받아 채팅방을 생성하고, 사용자 목록을 조회하는 API를 살펴보겠습니다.

ChatRestController

@Slf4j
@RestController
@RequiredArgsConstructor
@RequestMapping("/api/v1")
public class ChatRestController {

    private final ChatRoomService chatRoomService;

    @PostMapping("/chatroom")
    public ResponseEntity<Response<Chat>> createChatRoom(@RequestBody ChatRequestDto requestDto, Authentication authentication) {
        String myName = authentication.getName();

        // 채팅방을 생성
        Chat chat = chatRoomService.makeChatRoom(myName, requestDto);

        return ResponseEntity.ok(Response.success(chat));
    }

    @GetMapping("/my-chatroom")
    public ResponseEntity<Response<List<MyChatRoomResponse>>> chatRoomList(Authentication authentication) {
        String myName = authentication.getName();

        List<MyChatRoomResponse> chatRoomList = chatRoomService.getChatRoomList(myName);

        return ResponseEntity.ok(Response.success(chatRoomList));
    }
}

설명:

  • createChatRoom: 클라이언트 요청에 따라 새로운 채팅방을 생성하는 API입니다.
  • chatRoomList: 현재 사용자가 참여 중인 채팅방 목록을 조회하는 API입니다.

5. WebSocket 연결 및 실시간 관리 (핸들러)

마지막으로 StompHandler를 통해 WebSocket 연결을 관리하고, JWT 토큰 검증을 거친 후, 실시간 접속 상태를 Redis에 저장하는 방식을 설명합니다.

StompHandler

@Slf4j
@Component
@RequiredArgsConstructor
@Order(Ordered.HIGHEST_PRECEDENCE + 99)
public class StompHandler implements ChannelInterceptor {

    private final ChatRoomService chatRoomService;
    private final ChatService chatService;

    @Value("${jwt.secret}")
    private String secretKey;

    @Override
    public Message<?> preSend(Message<?> message, MessageChannel channel) {
        StompHeaderAccessor accessor = StompHeaderAccessor.wrap(message);

		// DISCONNECT은 헤더에 값을 담을 수 없으므로 따로 처리
        if (StompCommand.DISCONNECT.equals(accessor.getCommand())) {
            log.info("DISCONNECT command received, skipping token verification.");
            return message;
        }

        String userName = verifyAccessToken(getAccessToken(accessor));
        handleMessage(accessor.getCommand(), accessor, userName);

        return message;
    }

    private String getAccessToken(StompHeaderAccessor accessor) {
        String authorizationHeader = accessor.getFirstNativeHeader("Authorization");
        if (authorizationHeader == null) {
            log.info("chat header가 없는 요청입니다.");
            throw new MalformedJwtException("jwt");
        }

        return authorizationHeader.replace("Bearer ", "");
    }

    private String verifyAccessToken(String accessToken)

 {
        if (JwtUtils.isExpired(accessToken, secretKey)) {
            throw new IllegalStateException("Expired token");
        }

        return JwtUtils.getUserName(accessToken, secretKey);
    }

    private void handleMessage(StompCommand stompCommand, StompHeaderAccessor accessor, String userName) {
        if (StompCommand.CONNECT.equals(stompCommand)) {
            connectToChatRoom(accessor, userName);
        }
    }

    private void connectToChatRoom(StompHeaderAccessor accessor, String userName) {
        Integer chatRoomNo = Integer.valueOf(accessor.getFirstNativeHeader("chatRoomNo"));

        // 접속 중인 사용자 확인 및 접속 상태 관리
        chatRoomService.connectChatRoom(chatRoomNo, userName);
        boolean isConnected = chatRoomService.isConnected(chatRoomNo);

        // 접속한 유저가 이미 있으면 메시지 처리
        if (isConnected) {
            chatService.updateMessage(userName, chatRoomNo);
        }
    }
}

설명:

  • preSend: WebSocket 연결 시 JWT 토큰을 검증하고, 사용자가 채팅방에 접속했을 때 접속 상태를 관리합니다.
  • connectToChatRoom: 사용자가 채팅방에 접속하면 Redis에 접속 정보를 저장하고, 접속 상태를 확인하여 메시지 처리를 진행합니다.

0개의 댓글