Stomp + Kafka를 이용한 1 대 1 채팅 구현 (2) - Stomp & Kafka & & Redis & MongoDB 설치 및 설정

오형상·2024년 9월 23일
0

Mutsa-SNS-Chat

목록 보기
2/4
post-thumbnail

저번 편에서는 1대1 채팅 구현에 필요한 기술 스택과 각 기술이 제공하는 장점에 대해 알아보았습니다. 이번 글에서는 선택한 기술 스택인 MongoDB, Redis, Kafka, Stomp를 실제로 설치하고 설정하는 과정을 단계별로 살펴보겠습니다.

각 기술은 채팅 시스템에서 서로 다른 역할을 담당하며, MongoDB는 채팅 내역 저장, Redis는 실시간 참여자 관리, Kafka는 메시지 큐 역할, 그리고 Stomp는 실시간 메시지 전달을 위한 WebSocket 기반 통신을 담당합니다.


🍃 MongoDB 설치 및 설정

MongoDB 설치Docker를 사용하여 이미지를 받아 실행하도록 하겠습니다.

MongoDB 이미지 받기

$ docker pull mongo

MongoDB 컨테이너 생성 및 실행

$ docker run --name mongodb -v ~/data:/data/db -d -p 27017:27017 mongo

의존성 설정

    // MongoDB 의존성 추가 (MongoDB와 통합하기 위한 Spring Data MongoDB 라이브러리)
    implementation 'org.springframework.boot:spring-boot-starter-data-mongodb'

application-mongo.yml 작성

MongoDB의 연결 설정을 위한 application-mongo.yml 파일을 작성합니다.

mongodb:
  client: mongodb://localhost:27017
  name : test

MongoDBConfig 설정

MongoDB와 애플리케이션을 연결하기 위해 MongoDBConfig 클래스를 설정합니다. 이 클래스는 MongoDB에 연결하여 데이터를 읽고 쓸 수 있는 MongoTemplate을 생성합니다.

package com.likelionsns.final_project.config;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.mongodb.core.MongoTemplate;
import com.mongodb.client.MongoClients;

@Configuration
public class MongoDBConfig {

    @Value("${mongodb.client}")
    private String mongoClientUri;

    @Value("${mongodb.name}")
    private String dbName;

    @Bean
    public MongoTemplate mongoTemplate() {
        return new MongoTemplate(MongoClients.create(mongoClientUri), dbName);
    }
}

엔티티 생성

MongoDB에 저장할 Chatting 엔티티는 채팅 메시지를 관리하는 데 사용됩니다. 이 엔티티는 MongoDB의 chatting 컬렉션에 저장됩니다.

@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
@Document(collection = "chatting")
// MongoDB Chatting 모델
public class Chatting {

    @Id
    private String id;
    private Integer chatRoomNo;
    private String senderName;
    private String content;
    private LocalDateTime sendDate;
    private long readCount;

}

🚀 Redis 설치하기

Redis 설치Docker를 사용하여 이미지를 받아 실행하도록 하겠습니다.

Redis 이미지 받기

$ docker pull redis;

Redis 컨테이너 생성 및 실행

$ docker rum --name redis -d -p 6679:6679 redis

의존성 설정

    // Redis 의존성 추가 (Redis와 통합하기 위한 Spring Data Redis 라이브러리)
    implementation 'org.springframework.boot:spring-boot-starter-data-redis'

Redis 설정

Redis는 실시간 데이터를 처리하는 데 매우 적합한 인메모리 데이터 저장소입니다. 여기서는 Redis를 사용하여 현재 채팅방에 참여 중인 인원을 관리합니다.

application-redis.yml 작성

Redis 연결 정보를 설정하는 application-redis.yml 파일입니다. 환경 변수를 이용해 호스트와 포트를 설정할 수 있으며, 기본값은 localhost와 6379로 설정되어 있습니다.

spring:
  redis:
    host: ${REDIS_HOST:localhost}  # 환경 변수 REDIS_HOST 사용, 없으면 기본값 localhost
    port: ${REDIS_PORT:6379}       # 환경 변수 REDIS_PORT 사용, 없으면 기본값 6379

RedisConfig

Redis와 애플리케이션을 연결하기 위한 설정 클래스입니다. 여기서는 LettuceConnectionFactory를 사용해 Redis 연결을 생성하고, RedisTemplate을 통해 데이터를 읽고 쓸 수 있게 합니다.

@Configuration
public class RedisConfig {

    @Value("${spring.redis.host}")
    private String redisHost;

    @Value("${spring.redis.port}")
    private int redisPort;


    @Bean
    public LettuceConnectionFactory redisConnectionFactory() {
        return new LettuceConnectionFactory(redisHost, redisPort);
    }

    @Bean
    public RedisTemplate<?, ?> redisTemplate() {

        // redisTemplate 를 받아와서 set, get, delete 를 사용
        RedisTemplate<byte[], byte[]> redisTemplate = new RedisTemplate<>();
        /*
         * setKeySerializer, setValueSerializer 설정
         * redis-cli 을 통해 직접 데이터를 조회 시 알아볼 수 없는 형태로 출력되는 것을 방지
         */
        redisTemplate.setKeySerializer(new StringRedisSerializer());
        redisTemplate.setValueSerializer(new StringRedisSerializer());
        redisTemplate.setConnectionFactory(redisConnectionFactory());

        return redisTemplate;
    }
}

🎉 Kafka & Zookeeper 설치 및 설정

Kafka와 Zookeeper를 Docker Compose를 사용하여 간편하게 설치하고 실행해 보겠습니다.

Docker Compose 파일 작성

아래는 docker-compose.yml 파일의 설정입니다. 이 설정을 통해 Kafka와 Zookeeper를 컨테이너로 실행할 수 있습니다.

# Compose 파일 버전
version: '3'
services:
  # Zookeeper 서비스
  zookeeper:
    image: wurstmeister/zookeeper    # 사용할 이미지
    container_name: zookeeper        # 컨테이너명 설정
    ports:
      - "2181:2181"                  # 포트 설정 (외부:내부)

  # Kafka 서비스
  kafka:
    image: wurstmeister/kafka        # 사용할 이미지
    container_name: kafka            # 컨테이너명 설정
    ports:
      - "9092:9092"                  # 포트 설정 (외부:내부)
    environment:
      KAFKA_ADVERTISED_HOST_NAME: localhost     
      KAFKA_CREATE_TOPICS: "Topic:1:1"   # Topic 설정
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181  # Zookeeper와 연결
    volumes:
      - /var/run/docker.sock           # Docker 볼륨 설정
    depends_on:
      - zookeeper                      # Kafka는 Zookeeper에 의존

위 코드를 docker-compose.yml 파일에 붙여넣고, 해당 파일이 있는 경로에서 아래 명령어로 컨테이너를 실행합니다.

$ docker-compose up -d

Zookeeper 설정하기

Zookeeper 설정은 Bash 셸을 통해 진행합니다.

$ docker exec -i -t zookeeper bash  # Zookeeper Bash로 이동
$ bash bin/zkServer.sh start /opt/zookeeper-3.4.13/conf/zoo.cfg -demon  # Zookeeper 서버 실행
$ netstat -l | grep 2181 # 현재 시스템에서 Listen 상태인 포트 중 2181 포트를 사용하는 것을 확인

위 명령어를 실행한 후, Zookeeper의 2181 포트Listen 상태로 변경된 것을 확인할 수 있습니다.


Kafka 설정하기

Kafka 역시 Bash 셸을 통해 설정을 진행합니다.

$ docker exec -i -t kafka bash   # Kafka Bash로 이동
$ kafka-server-start.sh -daemon  # Kafka 서버 실행

Kafka 서버가 성공적으로 실행되었습니다. 이제 Kafka에서 메시지를 주고받을 Topic을 생성해 보겠습니다.

$ kafka-topics.sh --create --zookeeper zookeeper:2181 --topic {Topic 이름} --partitions 1 --replication-factor 1

명령어를 통해 새로운 Topic을 생성한 후, Topic 목록을 확인할 수 있습니다.

$ kafka-topics.sh --list --zookeeper zookeeper

이로써 Kafka에서 사용할 Topic을 생성하고, Zookeeper와 Kafka의 셋팅을 마쳤습니다.


의존성 설정

    // Kafka 의존성 추가 (Kafka와 통합하기 위한 Spring Kafka 라이브러리)
    implementation "org.springframework.kafka:spring-kafka"
    // WebSocket 의존성 추가 (WebSocket 통신을 지원하기 위한 Spring Boot WebSocket 라이브러리)
    implementation 'org.springframework.boot:spring-boot-starter-websocket'

도메인 생성

Kafka에서 메시지 전달에 사용할 도메인 모델을 작성합니다.
이 클래스는 Kafka 메시징과 MongoDB에 저장될 Chatting 엔티티로 변환하는 역할을 수행합니다.

@Getter 
@ToString 
@Builder
@NoArgsConstructor 
@AllArgsConstructor생성
public class Message implements Serializable { // 메시지 데이터를 주고받기 위한 DTO 클래스, 직렬화를 지원하기 위해 Serializable 인터페이스 구현

    private String id; // 메시지 고유 ID
    private Integer chatNo; // 채팅방 번호
    private String content; // 메시지 내용
    private String senderName; // 메시지를 보낸 사용자 이름
    private long sendTime; // 메시지가 전송된 시간 (Epoch 밀리초로 저장)
    private Integer readCount; // 메시지를 읽은 사용자 수

    // 메시지 전송 시간과 보낸 사람, 읽은 사람 수를 동시에 설정하는 메서드
    public void setSendTimeAndSender(LocalDateTime sendTime, String senderName, Integer readCount) {
        this.senderName = senderName;
        // LocalDateTime을 Asia/Seoul 타임존 기준으로 Epoch 밀리초로 변환하여 저장
        this.sendTime = sendTime.atZone(ZoneId.of("Asia/Seoul")).toInstant().toEpochMilli();
        this.readCount = readCount;
    }

    // 메시지 ID를 설정하는 메서드
    public void setId(String id) {
        this.id = id;
    }

    // Message DTO를 Chatting 엔티티로 변환하는 메서드
    public Chatting toEntity() {
        return Chatting.builder()
                .senderName(senderName) // 메시지 전송자 이름 설정
                .chatRoomNo(chatNo) // 채팅방 번호 설정
                .content(content) // 메시지 내용 설정
                // Epoch 밀리초로 저장된 sendTime을 LocalDateTime으로 변환하여 저장
                .sendDate(Instant.ofEpochMilli(sendTime).atZone(ZoneId.of("Asia/Seoul")).toLocalDateTime())
                .readCount(readCount) // 메시지를 읽은 사용자 수 설정
                .build();
    }
}

Producer 설정

Kafka 프로듀서는 메시지를 Kafka 브로커로 전송하는 역할을 합니다. 아래와 같이 Kafka Producer 설정 클래스를 작성합니다.

@EnableKafka
@Configuration
public class KafkaProducerConfig {

    // Kafka ProducerFactory를 생성하는 Bean 메서드
    @Bean
    public ProducerFactory<String, Message> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigurations());
    }

    // Kafka Producer 구성을 위한 설정값들을 포함한 맵을 반환하는 메서드
    @Bean
    public Map<String, Object> producerConfigurations() {
        return ImmutableMap.<String, Object>builder()
                .put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
                .put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class)
                .put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class)
                .build();
    }

    // KafkaTemplate을 생성하는 Bean 메서드
    @Bean
    public KafkaTemplate<String, Message> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

Consumer 설정

Kafka 컨슈머는 브로커에서 메시지를 읽어오는 역할을 합니다. 아래와 같이 Kafka Consumer 설정 클래스를 작성합니다.
이외에도 다양한 설정들이 있으니 상황에 맞게 설정하시면 됩니다.

@EnableKafka
@Configuration
public class KafkaConsumerConfig {

    // KafkaListener 컨테이너 팩토리를 생성하는 Bean 메서드
    @Bean
    ConcurrentKafkaListenerContainerFactory<String, Message> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Message> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }

    // Kafka ConsumerFactory를 생성하는 Bean 메서드
    @Bean
    public ConsumerFactory<String, Message> consumerFactory() {

        // Kafka Consumer 구성을 위한 설정값들을 설정 -> 변하지 않는 값이므로 ImmutableMap을 이용하여 설정
        Map<String, Object> consumerConfigurations =
                ImmutableMap.<String, Object>builder()
                        .put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
                        .put(ConsumerConfig.GROUP_ID_CONFIG, "Mutsa-Sns")
                        .put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class)
                        .put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class)
                        .build();

        // 들어오는 Message 를 객체로 받기 위한 deserializer
        JsonDeserializer<Message> deserializer = new JsonDeserializer<>();

        deserializer.addTrustedPackages("com.likelionsns.final_project.domain.dto");

        return new DefaultKafkaConsumerFactory<>(consumerConfigurations, new StringDeserializer(), deserializer);
    }

}

STOMP Client 설정

WebSocket과 STOMP를 활용하여 메시지를 전달하고 구독하는 설정을 추가합니다.

@Configuration
@RequiredArgsConstructor
@EnableWebSocketMessageBroker // WebSocket을 활성화하고 메시지 브로커 사용가능
public class WebSocketConfiguration implements WebSocketMessageBrokerConfigurer {

    private final StompHandler stompHandler;

    // STOMP 엔드포인트를 등록하는 메서드
    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/chat") // STOMP 엔드포인트 설정
                .setAllowedOriginPatterns("*") // 모든 Origin 허용 -> 배포시에는 보안을 위해 Origin을 정확히 지정
                .withSockJS(); // SockJS 사용가능*/ 설정
    }

    // 메시지 브로커를 구성하는 메서드
    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        registry.enableSimpleBroker("/subscribe"); // /subscribe/{chatNo}로 주제 구독 가능
        registry.setApplicationDestinationPrefixes("/publish"); // /publish/message로 메시지 전송 컨트롤러 라우팅 가능
    }

    // 클라이언트 인바운드 채널을 구성하는 메서드
    @Override
    public void configureClientInboundChannel(ChannelRegistration registration) {
        // stompHandler를 인터셉터로 등록하여 STOMP 메시지 핸들링을 수행
        registration.interceptors(stompHandler);
    }

}

상세 코드

registerStompEndpoints

@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
    registry.addEndpoint("/chat")
            .setAllowedOriginPatterns("*")
            .withSockJS();
}
  • registerStompEndpoints 메서드는 STOMP 프로토콜의 엔드포인트를 정의하는 부분입니다. 엔드포인트는 클라이언트가 서버에 WebSocket을 통해 연결할 수 있는 URL입니다.

  • registry.addEndpoint("/chat"): 클라이언트가 WebSocket 연결을 맺기 위한 엔드포인트로 /chat을 사용합니다. 이 URL을 통해 클라이언트는 WebSocket 연결을 맺고, 그 후 STOMP 메시지를 주고받을 수 있습니다.

  • setAllowedOriginPatterns("*"): 모든 Origin에서의 요청을 허용하는 설정입니다. 이는 CORS(Cross-Origin Resource Sharing) 문제를 해결하기 위한 설정인데, 보안적인 이유로 실제 배포 환경에서는 특정 Origin만 허용하는 방식으로 변경하는 것이 좋습니다.

  • withSockJS(): SockJS는 WebSocket을 사용할 수 없는 환경(구형 브라우저 등)에서도 폴백 메커니즘을 제공하여 WebSocket-like 기능을 사용할 수 있도록 돕는 라이브러리입니다. 이 설정을 통해 WebSocket이 지원되지 않는 브라우저에서 SockJS로 대체됩니다.

configureMessageBroker

@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
    registry.enableSimpleBroker("/subscribe");
    registry.setApplicationDestinationPrefixes("/publish");
}
  • configureMessageBroker 메서드는 메시지 브로커의 동작 방식을 설정하는 부분입니다. 이 메서드를 통해 클라이언트가 메시지를 주고받는 경로를 정의합니다.

  • registry.enableSimpleBroker("/subscribe"): /subscribe 경로를 통해 구독을 받을 수 있도록 설정합니다. 즉, 클라이언트는 /subscribe/{chatNo} 형식으로 특정 주제(채팅방 등)를 구독할 수 있으며, 서버는 구독된 클라이언트에게 메시지를 발송할 수 있습니다. SimpleBroker는 브로커 역할을 하여 구독된 클라이언트에게 메시지를 전달합니다.

  • registry.setApplicationDestinationPrefixes("/publish"): 클라이언트가 메시지를 서버에 전송할 때 사용되는 경로를 정의합니다. 예를 들어, 클라이언트가 메시지를 보내는 경우 /publish/message로 요청을 보낼 수 있으며, 서버는 이를 처리합니다. 이 경로는 서버의 컨트롤러에서 처리됩니다.

configureClientInboundChannel

@Override
public void configureClientInboundChannel(ChannelRegistration registration) {
    registration.interceptors(stompHandler);
}
  • configureClientInboundChannel 메서드는 클라이언트로부터 들어오는 메시지를 처리하는 채널을 설정하는 부분입니다.

  • registration.interceptors(stompHandler): 이 설정을 통해, STOMP 메시지가 클라이언트로부터 들어올 때 StompHandler를 인터셉터로 등록하여 메시지를 가로채고 추가적인 로직을 실행할 수 있게 됩니다. 주로 권한 검사나 연결 상태 확인 같은 작업을 여기서 수행합니다.

  • StompHandler는 STOMP 메시지를 처리하는 커스텀 핸들러로, 다음 편에서 자세히 설명하겠습니다.


이렇게 Kafka, Zookeeper, Redis, MongoDB 설정을 완료하고 STOMP와 Kafka를 활용한 실시간 메시징 시스템의 기본 설정을 마쳤습니다. 다음 글에서는 이 설정을 기반으로 실제 애플리케이션 개발을 진행하겠습니다.

Reference

0개의 댓글