Stomp + Kafka를 이용한 1 대 1 채팅 구현 (4) - 메시지 전송 및 처리

오형상·2024년 9월 26일
0

Mutsa-SNS-Chat

목록 보기
4/4
post-thumbnail

이번 포스트에서는 StompKafka를 이용한 1대1 채팅에서 메시지 전송, 메시지 저장, Kafka를 통한 실시간 메시지 전송 과정을 설명하겠습니다.

1. 메시지 전송 및 저장 (엔티티)

메시지를 전송하고 관리하기 위해서는 Message 엔티티가 필요합니다. 이 엔티티는 메시지 데이터를 관리하며, 사용자가 채팅을 주고받는 데 사용됩니다.

Message 엔티티

@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class Message {

    private String id;         // MongoDB에서 사용하는 메시지 ID
    private Integer chatNo;     // 채팅방 번호
    private String senderName;  // 메시지를 보낸 사람
    private String content;     // 메시지 내용
    private LocalDateTime sendDate;
    private long readCount;     // 읽지 않은 메시지 카운트

    public Chatting toEntity() {
        return Chatting.builder()
                .chatRoomNo(chatNo)
                .senderName(senderName)
                .content(content)
                .sendDate(sendDate)
                .readCount(readCount)
                .build();
    }
}

2. 메시지 저장 및 조회 (레포지토리)

메시지 전송 후에는 해당 메시지를 MongoDB에 저장하고, 사용자가 이전 메시지 내역을 조회할 수 있어야 합니다. 이를 위해 MongoChatRepository를 사용하여 데이터를 처리합니다.

MongoChatRepository

public interface MongoChatRepository extends MongoRepository<Chatting, String> {

    List<Chatting> findByChatRoomNo(Integer chatNo);

    Page<Chatting> findByChatRoomNoOrderBySendDateDesc(Integer chatRoomNo, Pageable pageable);
}

설명:

  • findByChatRoomNo: 특정 채팅방에 속한 모든 메시지를 조회합니다.
  • findByChatRoomNoOrderBySendDateDesc: 채팅방 내의 메시지를 전송 날짜 기준으로 내림차순 정렬하여 페이지 단위로 조회합니다.

3. 메시지 전송 처리 (서비스)

ChatService에서는 사용자가 보낸 메시지를 처리하고, 메시지를 저장하거나 Kafka로 전송하는 역할을 담당합니다.

ChatService

@Service
@Transactional(readOnly = true)
@RequiredArgsConstructor
public class ChatService {

    private final MongoChatRepository mongoChatRepository;
    private final MessageSender sender;
    private final UserRepository userRepository;
    private final AlarmRepository alarmRepository;
    private final ChatRoomService chatRoomService;

    public ChattingHistoryResponseDto getChattingList(Integer chatRoomNo, String userName) {
        List<ChatResponseDto> chattingList = mongoChatRepository.findByChatRoomNo(chatRoomNo).stream()
                .map(chat -> new ChatResponseDto(chat, userName))
                .collect(Collectors.toList());
        return ChattingHistoryResponseDto.builder()
                .chatList(chattingList)
                .userName(userName)
                .build();
    }

    public void sendMessage(Message message, String accessToken) {
        String token = JwtUtils.extractToken(accessToken);

        // 메시지 전송 요청 헤더에 포함된 AccessToken에서 userName을 추출해 회원을 조회한다.
        User user = userRepository.findByUserName(JwtUtils.getUserName(token, secretKey))
                .orElseThrow(() -> new SnsAppException(USERNAME_NOT_FOUND, USERNAME_NOT_FOUND.getMessage()));

        // 메시지를 전송하기 전에 상대방이 채팅방에 접속 중인지 확인
        boolean isConnectedAll = chatRoomService.isAllConnected(message.getChatNo());
        Integer readCount = isConnectedAll ? 0 : 1;

        // 메시지 객체에 전송 시간과 보낸 사람을 셋팅
        message.setSendTimeAndSender(LocalDateTime.now(), user.getUserName(), readCount);

        // Kafka를 통해 메시지 전송
        sender.send(KAFKA_TOPIC, message);
    }

    @Transactional
    public Message sendAlarmAndSaveMessage(Message message, String userName) {
        // 메시지 저장 및 알림 발송
        User sender = userRepository.findByUserName(message.getSenderName())
                .orElseThrow(() -> new SnsAppException(USERNAME_NOT_FOUND, USERNAME_NOT_FOUND.getMessage()));

        if (message.getReadCount() == 1) {
            Chat findChat = chatRoomRepository.findById(message.getChatNo()).orElseThrow();
            User recipient = userRepository.findById(findChat.getJoinUser()).orElseThrow();

            alarmRepository.save(Alarm.builder()
                    .user(recipient)
                    .alarmType(NEW_CHAT)
                    .text(NEW_CHAT.getAlarmText())
                    .targetId(recipient.getId())
                    .fromUserId(sender.getId())
                    .build());
        }

        if (message.getSenderName().equals(userName)) {
            Chatting chatting = message.toEntity();
            Chatting savedChat = mongoChatRepository.save(chatting);
            message.setId(savedChat.getId());
        }

        return message;
    }
}

설명:

  • sendMessage: 사용자가 전송한 메시지를 Kafka로 전송하고, JWT 인증을 통해 사용자 정보를 추출한 후 메시지를 처리합니다.
  • sendAlarmAndSaveMessage: 메시지를 MongoDB에 저장하고, 상대방이 메시지를 읽지 않았을 때 알림을 전송합니다.

4. 메시지 전송 API (컨트롤러)

컨트롤러는 메시지 전송메시지 조회 API를 제공합니다. 클라이언트는 이 API를 통해 메시지를 주고받을 수 있습니다.

ChatRestController

@Slf4j
@RestController
@RequiredArgsConstructor
@RequestMapping("/api/v1")
public class ChatRestController {

    private final ChatService chatService;

    // 메시지 전송 및 알림 저장
    @PostMapping("/chatroom/message-alarm-record")
    public ResponseEntity<Response<Message>> sendNotification(@RequestBody Message message, Authentication authentication) {
        String userName = authentication.getName();
        Message savedMessage = chatService.sendAlarmAndSaveMessage(message, userName);
        return ResponseEntity.ok(Response.success(savedMessage));
    }

    // 메시지 전송 (WebSocket)
    @MessageMapping("/message")
    public void sendMessage(Message message, @Header("Authorization") final String accessToken) {
        log.info("보낸 메세지 : {}", message.toString());
        chatService.sendMessage(message, accessToken);
    }
}

5. 메시지 전송 실시간 처리

마지막으로, Kafka를 통해 메시지를 실시간으로 전송하고 전송된 메시지를 구독하는 로직을 살펴보겠습니다.

MessageSender

@Slf4j
@Service
@RequiredArgsConstructor
public class MessageSender {

    private final KafkaTemplate<String, Message> kafkaTemplate;

    public void send(String topic, Message data) {
        kafkaTemplate.send(topic, data);
    }
}

설명:

  • MessageSenderKafkaTemplate을 사용하여 메시지를 지정된 Kafka 토픽으로 전송합니다.

MessageReceiver

@Slf4j
@Service
@RequiredArgsConstructor
public class MessageReceiver {

    private final SimpMessagingTemplate messagingTemplate;

    @KafkaListener(topics = KafkaConstants.KAFKA_TOPIC, containerFactory = "kafkaListenerContainerFactory")
    public void receiveMessage(Message message) {
        messagingTemplate.convertAndSend("/subscribe/" + message.getChatNo(), message);
    }
}

설명:

  • MessageReceiver는 Kafka에서 메시지를 수신한 후, SimpMessagingTemplate을 통해 실시간으로 클라이언트에게 메시지를 전송합니다.

0개의 댓글