이번 포스트에서는 Stomp와 Kafka를 이용한 1대1 채팅에서 메시지 전송, 메시지 저장, Kafka를 통한 실시간 메시지 전송 과정을 설명하겠습니다.
메시지를 전송하고 관리하기 위해서는 Message 엔티티가 필요합니다. 이 엔티티는 메시지 데이터를 관리하며, 사용자가 채팅을 주고받는 데 사용됩니다.
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class Message {
private String id; // MongoDB에서 사용하는 메시지 ID
private Integer chatNo; // 채팅방 번호
private String senderName; // 메시지를 보낸 사람
private String content; // 메시지 내용
private LocalDateTime sendDate;
private long readCount; // 읽지 않은 메시지 카운트
public Chatting toEntity() {
return Chatting.builder()
.chatRoomNo(chatNo)
.senderName(senderName)
.content(content)
.sendDate(sendDate)
.readCount(readCount)
.build();
}
}
메시지 전송 후에는 해당 메시지를 MongoDB에 저장하고, 사용자가 이전 메시지 내역을 조회할 수 있어야 합니다. 이를 위해 MongoChatRepository를 사용하여 데이터를 처리합니다.
public interface MongoChatRepository extends MongoRepository<Chatting, String> {
List<Chatting> findByChatRoomNo(Integer chatNo);
Page<Chatting> findByChatRoomNoOrderBySendDateDesc(Integer chatRoomNo, Pageable pageable);
}
ChatService에서는 사용자가 보낸 메시지를 처리하고, 메시지를 저장하거나 Kafka로 전송하는 역할을 담당합니다.
@Service
@Transactional(readOnly = true)
@RequiredArgsConstructor
public class ChatService {
private final MongoChatRepository mongoChatRepository;
private final MessageSender sender;
private final UserRepository userRepository;
private final AlarmRepository alarmRepository;
private final ChatRoomService chatRoomService;
public ChattingHistoryResponseDto getChattingList(Integer chatRoomNo, String userName) {
List<ChatResponseDto> chattingList = mongoChatRepository.findByChatRoomNo(chatRoomNo).stream()
.map(chat -> new ChatResponseDto(chat, userName))
.collect(Collectors.toList());
return ChattingHistoryResponseDto.builder()
.chatList(chattingList)
.userName(userName)
.build();
}
public void sendMessage(Message message, String accessToken) {
String token = JwtUtils.extractToken(accessToken);
// 메시지 전송 요청 헤더에 포함된 AccessToken에서 userName을 추출해 회원을 조회한다.
User user = userRepository.findByUserName(JwtUtils.getUserName(token, secretKey))
.orElseThrow(() -> new SnsAppException(USERNAME_NOT_FOUND, USERNAME_NOT_FOUND.getMessage()));
// 메시지를 전송하기 전에 상대방이 채팅방에 접속 중인지 확인
boolean isConnectedAll = chatRoomService.isAllConnected(message.getChatNo());
Integer readCount = isConnectedAll ? 0 : 1;
// 메시지 객체에 전송 시간과 보낸 사람을 셋팅
message.setSendTimeAndSender(LocalDateTime.now(), user.getUserName(), readCount);
// Kafka를 통해 메시지 전송
sender.send(KAFKA_TOPIC, message);
}
@Transactional
public Message sendAlarmAndSaveMessage(Message message, String userName) {
// 메시지 저장 및 알림 발송
User sender = userRepository.findByUserName(message.getSenderName())
.orElseThrow(() -> new SnsAppException(USERNAME_NOT_FOUND, USERNAME_NOT_FOUND.getMessage()));
if (message.getReadCount() == 1) {
Chat findChat = chatRoomRepository.findById(message.getChatNo()).orElseThrow();
User recipient = userRepository.findById(findChat.getJoinUser()).orElseThrow();
alarmRepository.save(Alarm.builder()
.user(recipient)
.alarmType(NEW_CHAT)
.text(NEW_CHAT.getAlarmText())
.targetId(recipient.getId())
.fromUserId(sender.getId())
.build());
}
if (message.getSenderName().equals(userName)) {
Chatting chatting = message.toEntity();
Chatting savedChat = mongoChatRepository.save(chatting);
message.setId(savedChat.getId());
}
return message;
}
}
컨트롤러는 메시지 전송과 메시지 조회 API를 제공합니다. 클라이언트는 이 API를 통해 메시지를 주고받을 수 있습니다.
@Slf4j
@RestController
@RequiredArgsConstructor
@RequestMapping("/api/v1")
public class ChatRestController {
private final ChatService chatService;
// 메시지 전송 및 알림 저장
@PostMapping("/chatroom/message-alarm-record")
public ResponseEntity<Response<Message>> sendNotification(@RequestBody Message message, Authentication authentication) {
String userName = authentication.getName();
Message savedMessage = chatService.sendAlarmAndSaveMessage(message, userName);
return ResponseEntity.ok(Response.success(savedMessage));
}
// 메시지 전송 (WebSocket)
@MessageMapping("/message")
public void sendMessage(Message message, @Header("Authorization") final String accessToken) {
log.info("보낸 메세지 : {}", message.toString());
chatService.sendMessage(message, accessToken);
}
}
마지막으로, Kafka를 통해 메시지를 실시간으로 전송하고 전송된 메시지를 구독하는 로직을 살펴보겠습니다.
@Slf4j
@Service
@RequiredArgsConstructor
public class MessageSender {
private final KafkaTemplate<String, Message> kafkaTemplate;
public void send(String topic, Message data) {
kafkaTemplate.send(topic, data);
}
}
@Slf4j
@Service
@RequiredArgsConstructor
public class MessageReceiver {
private final SimpMessagingTemplate messagingTemplate;
@KafkaListener(topics = KafkaConstants.KAFKA_TOPIC, containerFactory = "kafkaListenerContainerFactory")
public void receiveMessage(Message message) {
messagingTemplate.convertAndSend("/subscribe/" + message.getChatNo(), message);
}
}