저번 편에서는 1대1 채팅 구현에 필요한 기술 스택과 각 기술이 제공하는 장점에 대해 알아보았습니다. 이번 글에서는 선택한 기술 스택인 MongoDB, Redis, Kafka, Stomp를 실제로 설치하고 설정하는 과정을 단계별로 살펴보겠습니다.
각 기술은 채팅 시스템에서 서로 다른 역할을 담당하며, MongoDB는 채팅 내역 저장, Redis는 실시간 참여자 관리, Kafka는 메시지 큐 역할, 그리고 Stomp는 실시간 메시지 전달을 위한 WebSocket 기반 통신을 담당합니다.
MongoDB 설치
는 Docker
를 사용하여 이미지를 받아 실행하도록 하겠습니다.
$ docker pull mongo
$ docker run --name mongodb -v ~/data:/data/db -d -p 27017:27017 mongo
// MongoDB 의존성 추가 (MongoDB와 통합하기 위한 Spring Data MongoDB 라이브러리)
implementation 'org.springframework.boot:spring-boot-starter-data-mongodb'
MongoDB의 연결 설정을 위한 application-mongo.yml
파일을 작성합니다.
mongodb:
client: mongodb://localhost:27017
name : test
MongoDB와 애플리케이션을 연결하기 위해 MongoDBConfig
클래스를 설정합니다. 이 클래스는 MongoDB에 연결하여 데이터를 읽고 쓸 수 있는 MongoTemplate
을 생성합니다.
package com.likelionsns.final_project.config;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.mongodb.core.MongoTemplate;
import com.mongodb.client.MongoClients;
@Configuration
public class MongoDBConfig {
@Value("${mongodb.client}")
private String mongoClientUri;
@Value("${mongodb.name}")
private String dbName;
@Bean
public MongoTemplate mongoTemplate() {
return new MongoTemplate(MongoClients.create(mongoClientUri), dbName);
}
}
MongoDB에 저장할 Chatting 엔티티는 채팅 메시지를 관리하는 데 사용됩니다. 이 엔티티는 MongoDB의 chatting 컬렉션에 저장됩니다.
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
@Document(collection = "chatting")
// MongoDB Chatting 모델
public class Chatting {
@Id
private String id;
private Integer chatRoomNo;
private String senderName;
private String content;
private LocalDateTime sendDate;
private long readCount;
}
Redis 설치
는 Docker
를 사용하여 이미지를 받아 실행하도록 하겠습니다.
$ docker pull redis;
$ docker rum --name redis -d -p 6679:6679 redis
// Redis 의존성 추가 (Redis와 통합하기 위한 Spring Data Redis 라이브러리)
implementation 'org.springframework.boot:spring-boot-starter-data-redis'
Redis는 실시간 데이터를 처리하는 데 매우 적합한 인메모리 데이터 저장소입니다. 여기서는 Redis를 사용하여 현재 채팅방에 참여 중인 인원을 관리합니다.
Redis 연결 정보를 설정하는 application-redis.yml
파일입니다. 환경 변수를 이용해 호스트와 포트를 설정할 수 있으며, 기본값은 localhost와 6379로 설정되어 있습니다.
spring:
redis:
host: ${REDIS_HOST:localhost} # 환경 변수 REDIS_HOST 사용, 없으면 기본값 localhost
port: ${REDIS_PORT:6379} # 환경 변수 REDIS_PORT 사용, 없으면 기본값 6379
Redis와 애플리케이션을 연결하기 위한 설정 클래스입니다. 여기서는 LettuceConnectionFactory
를 사용해 Redis 연결을 생성하고, RedisTemplate
을 통해 데이터를 읽고 쓸 수 있게 합니다.
@Configuration
public class RedisConfig {
@Value("${spring.redis.host}")
private String redisHost;
@Value("${spring.redis.port}")
private int redisPort;
@Bean
public LettuceConnectionFactory redisConnectionFactory() {
return new LettuceConnectionFactory(redisHost, redisPort);
}
@Bean
public RedisTemplate<?, ?> redisTemplate() {
// redisTemplate 를 받아와서 set, get, delete 를 사용
RedisTemplate<byte[], byte[]> redisTemplate = new RedisTemplate<>();
/*
* setKeySerializer, setValueSerializer 설정
* redis-cli 을 통해 직접 데이터를 조회 시 알아볼 수 없는 형태로 출력되는 것을 방지
*/
redisTemplate.setKeySerializer(new StringRedisSerializer());
redisTemplate.setValueSerializer(new StringRedisSerializer());
redisTemplate.setConnectionFactory(redisConnectionFactory());
return redisTemplate;
}
}
Kafka와 Zookeeper를 Docker Compose
를 사용하여 간편하게 설치하고 실행해 보겠습니다.
아래는 docker-compose.yml
파일의 설정입니다. 이 설정을 통해 Kafka와 Zookeeper를 컨테이너로 실행할 수 있습니다.
# Compose 파일 버전
version: '3'
services:
# Zookeeper 서비스
zookeeper:
image: wurstmeister/zookeeper # 사용할 이미지
container_name: zookeeper # 컨테이너명 설정
ports:
- "2181:2181" # 포트 설정 (외부:내부)
# Kafka 서비스
kafka:
image: wurstmeister/kafka # 사용할 이미지
container_name: kafka # 컨테이너명 설정
ports:
- "9092:9092" # 포트 설정 (외부:내부)
environment:
KAFKA_ADVERTISED_HOST_NAME: localhost
KAFKA_CREATE_TOPICS: "Topic:1:1" # Topic 설정
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 # Zookeeper와 연결
volumes:
- /var/run/docker.sock # Docker 볼륨 설정
depends_on:
- zookeeper # Kafka는 Zookeeper에 의존
위 코드를 docker-compose.yml
파일에 붙여넣고, 해당 파일이 있는 경로에서 아래 명령어로 컨테이너를 실행합니다.
$ docker-compose up -d
Zookeeper 설정은 Bash 셸을 통해 진행합니다.
$ docker exec -i -t zookeeper bash # Zookeeper Bash로 이동
$ bash bin/zkServer.sh start /opt/zookeeper-3.4.13/conf/zoo.cfg -demon # Zookeeper 서버 실행
$ netstat -l | grep 2181 # 현재 시스템에서 Listen 상태인 포트 중 2181 포트를 사용하는 것을 확인
위 명령어를 실행한 후, Zookeeper의 2181 포트
가 Listen
상태로 변경된 것을 확인할 수 있습니다.
Kafka 역시 Bash 셸을 통해 설정을 진행합니다.
$ docker exec -i -t kafka bash # Kafka Bash로 이동
$ kafka-server-start.sh -daemon # Kafka 서버 실행
Kafka 서버가 성공적으로 실행되었습니다. 이제 Kafka에서 메시지를 주고받을 Topic
을 생성해 보겠습니다.
$ kafka-topics.sh --create --zookeeper zookeeper:2181 --topic {Topic 이름} --partitions 1 --replication-factor 1
명령어를 통해 새로운 Topic을 생성한 후, Topic 목록을 확인할 수 있습니다.
$ kafka-topics.sh --list --zookeeper zookeeper
이로써 Kafka에서 사용할 Topic을 생성하고, Zookeeper와 Kafka의 셋팅을 마쳤습니다.
// Kafka 의존성 추가 (Kafka와 통합하기 위한 Spring Kafka 라이브러리)
implementation "org.springframework.kafka:spring-kafka"
// WebSocket 의존성 추가 (WebSocket 통신을 지원하기 위한 Spring Boot WebSocket 라이브러리)
implementation 'org.springframework.boot:spring-boot-starter-websocket'
Kafka에서 메시지 전달에 사용할 도메인 모델을 작성합니다.
이 클래스는 Kafka 메시징과 MongoDB에 저장될 Chatting 엔티티로 변환하는 역할을 수행합니다.
@Getter
@ToString
@Builder
@NoArgsConstructor
@AllArgsConstructor생성
public class Message implements Serializable { // 메시지 데이터를 주고받기 위한 DTO 클래스, 직렬화를 지원하기 위해 Serializable 인터페이스 구현
private String id; // 메시지 고유 ID
private Integer chatNo; // 채팅방 번호
private String content; // 메시지 내용
private String senderName; // 메시지를 보낸 사용자 이름
private long sendTime; // 메시지가 전송된 시간 (Epoch 밀리초로 저장)
private Integer readCount; // 메시지를 읽은 사용자 수
// 메시지 전송 시간과 보낸 사람, 읽은 사람 수를 동시에 설정하는 메서드
public void setSendTimeAndSender(LocalDateTime sendTime, String senderName, Integer readCount) {
this.senderName = senderName;
// LocalDateTime을 Asia/Seoul 타임존 기준으로 Epoch 밀리초로 변환하여 저장
this.sendTime = sendTime.atZone(ZoneId.of("Asia/Seoul")).toInstant().toEpochMilli();
this.readCount = readCount;
}
// 메시지 ID를 설정하는 메서드
public void setId(String id) {
this.id = id;
}
// Message DTO를 Chatting 엔티티로 변환하는 메서드
public Chatting toEntity() {
return Chatting.builder()
.senderName(senderName) // 메시지 전송자 이름 설정
.chatRoomNo(chatNo) // 채팅방 번호 설정
.content(content) // 메시지 내용 설정
// Epoch 밀리초로 저장된 sendTime을 LocalDateTime으로 변환하여 저장
.sendDate(Instant.ofEpochMilli(sendTime).atZone(ZoneId.of("Asia/Seoul")).toLocalDateTime())
.readCount(readCount) // 메시지를 읽은 사용자 수 설정
.build();
}
}
Kafka 프로듀서는 메시지를 Kafka 브로커로 전송하는 역할을 합니다. 아래와 같이 Kafka Producer 설정 클래스를 작성합니다.
@EnableKafka
@Configuration
public class KafkaProducerConfig {
// Kafka ProducerFactory를 생성하는 Bean 메서드
@Bean
public ProducerFactory<String, Message> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigurations());
}
// Kafka Producer 구성을 위한 설정값들을 포함한 맵을 반환하는 메서드
@Bean
public Map<String, Object> producerConfigurations() {
return ImmutableMap.<String, Object>builder()
.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class)
.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class)
.build();
}
// KafkaTemplate을 생성하는 Bean 메서드
@Bean
public KafkaTemplate<String, Message> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
Kafka 컨슈머는 브로커에서 메시지를 읽어오는 역할을 합니다. 아래와 같이 Kafka Consumer 설정 클래스를 작성합니다.
이외에도 다양한 설정들이 있으니 상황에 맞게 설정하시면 됩니다.
@EnableKafka
@Configuration
public class KafkaConsumerConfig {
// KafkaListener 컨테이너 팩토리를 생성하는 Bean 메서드
@Bean
ConcurrentKafkaListenerContainerFactory<String, Message> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Message> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
// Kafka ConsumerFactory를 생성하는 Bean 메서드
@Bean
public ConsumerFactory<String, Message> consumerFactory() {
// Kafka Consumer 구성을 위한 설정값들을 설정 -> 변하지 않는 값이므로 ImmutableMap을 이용하여 설정
Map<String, Object> consumerConfigurations =
ImmutableMap.<String, Object>builder()
.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
.put(ConsumerConfig.GROUP_ID_CONFIG, "Mutsa-Sns")
.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class)
.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class)
.build();
// 들어오는 Message 를 객체로 받기 위한 deserializer
JsonDeserializer<Message> deserializer = new JsonDeserializer<>();
deserializer.addTrustedPackages("com.likelionsns.final_project.domain.dto");
return new DefaultKafkaConsumerFactory<>(consumerConfigurations, new StringDeserializer(), deserializer);
}
}
WebSocket과 STOMP를 활용하여 메시지를 전달하고 구독하는 설정을 추가합니다.
@Configuration
@RequiredArgsConstructor
@EnableWebSocketMessageBroker // WebSocket을 활성화하고 메시지 브로커 사용가능
public class WebSocketConfiguration implements WebSocketMessageBrokerConfigurer {
private final StompHandler stompHandler;
// STOMP 엔드포인트를 등록하는 메서드
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
registry.addEndpoint("/chat") // STOMP 엔드포인트 설정
.setAllowedOriginPatterns("*") // 모든 Origin 허용 -> 배포시에는 보안을 위해 Origin을 정확히 지정
.withSockJS(); // SockJS 사용가능*/ 설정
}
// 메시지 브로커를 구성하는 메서드
@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
registry.enableSimpleBroker("/subscribe"); // /subscribe/{chatNo}로 주제 구독 가능
registry.setApplicationDestinationPrefixes("/publish"); // /publish/message로 메시지 전송 컨트롤러 라우팅 가능
}
// 클라이언트 인바운드 채널을 구성하는 메서드
@Override
public void configureClientInboundChannel(ChannelRegistration registration) {
// stompHandler를 인터셉터로 등록하여 STOMP 메시지 핸들링을 수행
registration.interceptors(stompHandler);
}
}
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
registry.addEndpoint("/chat")
.setAllowedOriginPatterns("*")
.withSockJS();
}
registerStompEndpoints 메서드는 STOMP 프로토콜의 엔드포인트를 정의하는 부분입니다. 엔드포인트는 클라이언트가 서버에 WebSocket을 통해 연결할 수 있는 URL입니다.
registry.addEndpoint("/chat"): 클라이언트가 WebSocket 연결을 맺기 위한 엔드포인트로 /chat을 사용합니다. 이 URL을 통해 클라이언트는 WebSocket 연결을 맺고, 그 후 STOMP 메시지를 주고받을 수 있습니다.
setAllowedOriginPatterns("*"): 모든 Origin에서의 요청을 허용하는 설정입니다. 이는 CORS(Cross-Origin Resource Sharing) 문제를 해결하기 위한 설정인데, 보안적인 이유로 실제 배포 환경에서는 특정 Origin만 허용하는 방식으로 변경하는 것이 좋습니다.
withSockJS(): SockJS는 WebSocket을 사용할 수 없는 환경(구형 브라우저 등)에서도 폴백 메커니즘을 제공하여 WebSocket-like 기능을 사용할 수 있도록 돕는 라이브러리입니다. 이 설정을 통해 WebSocket이 지원되지 않는 브라우저에서 SockJS로 대체됩니다.
@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
registry.enableSimpleBroker("/subscribe");
registry.setApplicationDestinationPrefixes("/publish");
}
configureMessageBroker 메서드는 메시지 브로커의 동작 방식을 설정하는 부분입니다. 이 메서드를 통해 클라이언트가 메시지를 주고받는 경로를 정의합니다.
registry.enableSimpleBroker("/subscribe"): /subscribe 경로를 통해 구독을 받을 수 있도록 설정합니다. 즉, 클라이언트는 /subscribe/{chatNo} 형식으로 특정 주제(채팅방 등)를 구독할 수 있으며, 서버는 구독된 클라이언트에게 메시지를 발송할 수 있습니다. SimpleBroker는 브로커 역할을 하여 구독된 클라이언트에게 메시지를 전달합니다.
registry.setApplicationDestinationPrefixes("/publish"): 클라이언트가 메시지를 서버에 전송할 때 사용되는 경로를 정의합니다. 예를 들어, 클라이언트가 메시지를 보내는 경우 /publish/message로 요청을 보낼 수 있으며, 서버는 이를 처리합니다. 이 경로는 서버의 컨트롤러에서 처리됩니다.
@Override
public void configureClientInboundChannel(ChannelRegistration registration) {
registration.interceptors(stompHandler);
}
configureClientInboundChannel 메서드는 클라이언트로부터 들어오는 메시지를 처리하는 채널을 설정하는 부분입니다.
registration.interceptors(stompHandler): 이 설정을 통해, STOMP 메시지가 클라이언트로부터 들어올 때 StompHandler를 인터셉터로 등록하여 메시지를 가로채고 추가적인 로직을 실행할 수 있게 됩니다. 주로 권한 검사나 연결 상태 확인 같은 작업을 여기서 수행합니다.
StompHandler는 STOMP 메시지를 처리하는 커스텀 핸들러로, 다음 편에서 자세히 설명하겠습니다.
이렇게 Kafka, Zookeeper, Redis, MongoDB 설정을 완료하고 STOMP와 Kafka를 활용한 실시간 메시징 시스템의 기본 설정을 마쳤습니다. 다음 글에서는 이 설정을 기반으로 실제 애플리케이션 개발을 진행하겠습니다.
Reference