Redis의 pub/sub을 활용해서 실시간 채팅을 만들어 보았다.
우선 build.gradle을 추가해준다.
// redis
implementation("org.springframework.boot:spring-boot-starter-data-redis")
// websocket
implementation 'org.springframework.boot:spring-boot-starter-websocket'
implementation 'org.webjars:stomp-websocket:2.3.3-1'
implementation 'org.webjars:sockjs-client:1.1.2'
나는 heroku에서 redis를 사용했다.
application.yml 파일에 추가해준다
spring:
redis:
host: {REDIS_URL}
이제 코드를 살펴보자.
RedisConfig.java
@Configuration
public class RedisConfig {
@Value("${spring.redis.host}") // heroku redis의 경우 REDIS_URL에 포트번호와 host가 모두 들어있기 때문에 이렇게 연결해줘도 된다.
private String host;
@Bean //ConnectionFactory 빈으로 등록
public RedisConnectionFactory redisConnectionFactory() {
//RedisProperties 에서 application.properties에서 설정한 URL을 가져온다
RedisURI redisURI = RedisURI.create(host);
// URI를 가지고 Configuration을 만들어 준 다음에
// Connection을 만들어주는 factory를 만들어 factory를 반환한다
org.springframework.data.redis.connection.RedisConfiguration configuration = LettuceConnectionFactory.createRedisConfiguration(redisURI);
//Lettuce는 전통적인 제디스보다 성능이 더 좋다
LettuceConnectionFactory factory = new LettuceConnectionFactory(configuration);
// Initializing 된다
factory.afterPropertiesSet();
return factory;
}
@Bean
public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory connectionFactory) {
RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
redisTemplate.setConnectionFactory(connectionFactory);
redisTemplate.setKeySerializer(new StringRedisSerializer());
redisTemplate.setValueSerializer(new Jackson2JsonRedisSerializer<>(String.class));
return redisTemplate;
}
@Bean
public RedisMessageListenerContainer redisMessageListener(RedisConnectionFactory connectionFactory,
MessageListenerAdapter listenerAdapter,
ChannelTopic channelTopic) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.addMessageListener(listenerAdapter, channelTopic);
return container;
}
@Bean
public MessageListenerAdapter listenerAdapter(RedisSubscriber subscriber) { // (2) RedisMessageListenerContainer로부터 메시지를 dispatch 받고, 실제 메시지를 처리하는 비즈니스 로직이 담긴 Subscriber Bean을 추가해준다.
return new MessageListenerAdapter(subscriber, "sendMessage");
}
@Bean //
public ChannelTopic channelTopic() { // (4) Topic 공유를 위해 Channel Topic을 빈으로 등록해 단일화 시켜준다.
return new ChannelTopic("chatroom");
}
}
WebStompConfig.java
@Slf4j
@Configuration
@EnableWebSocketMessageBroker
@RequiredArgsConstructor
public class WebStompConfig implements WebSocketMessageBrokerConfigurer {
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
registry.addEndpoint("/ws-stomp").setAllowedOriginPatterns("*").withSockJS(); // endpoint
}
@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
registry.enableSimpleBroker("/sub"); // sub
registry.setApplicationDestinationPrefixes("/pub"); // pub
}
}
ChatController.java
@Controller
@RequiredArgsConstructor
public class ChatController {
private final RedisTemplate redisTemplate;
private final RedisMessageListenerContainer redisMessageListenerContainer;
private final RedisSubscriber redisSubscriber;
private final RedisPublisher redisPublisher;
@ResponseBody
@MessageMapping("/chat/room")
public void message(ChatMessage chatMessage) {
// Websocket에 발행된 메시지를 redis로 발행(publish)
ChannelTopic topic = new ChannelTopic(chatMessage.getRoomId());
redisMessageListenerContainer.addMessageListener(redisSubscriber, topic);
redisPublisher.publish(topic, chatMessage);
}
}
RedisPublisher.java
@Slf4j
@RequiredArgsConstructor
@Service
public class RedisPublisher {
private final RedisTemplate<String, Object> redisTemplate;
private final ChannelTopic channelTopic;
public void publish(ChannelTopic topic, ChatMessage message) {
redisTemplate.convertAndSend(channelTopic.getTopic(), message);
}
}
redisTemplate의 convertAndSend 함수를 통해 channelTopic의 저장해둔 채널로 메세지를 발행해 준다.
@Slf4j
@RequiredArgsConstructor
@Service
public class RedisSubscriber implements MessageListener {
private final ObjectMapper objectMapper;
private final RedisTemplate redisTemplate;
private final SimpMessagingTemplate messagingTemplate;
/**
* Redis에서 메시지가 발행(publish)되면 대기하고 있던 Redis Subscriber가 해당 메시지를 받아 처리한다.
*/
@Override
public void onMessage(Message message, byte[] pattern) {
try {
// redis에서 발행된 데이터를 받아 deserialize
String publishMessage = (String) redisTemplate.getStringSerializer().deserialize(message.getBody());
// ChatMessage 객채로 맵핑
ChatMessage roomMessage = objectMapper.readValue(publishMessage, ChatMessage.class);
// Websocket 구독자에게 채팅 메시지 Send
messagingTemplate.convertAndSend("/sub/chat/room/" + roomMessage.getRoomId(), roomMessage);
} catch (Exception e) {
log.error(e.getMessage());
}
}
}
하지만 위에 코드처럼 작성하게 되면 Channel을 계속 addMessageListener를 사용해서 추가하게 되는 단점이 있다. 이런 문제가 맘에 안들경우 아래 코드로 바꾸면 된다
RedisPublisher.java 삭제
ChatController.java 수정
ChatController.java
@Controller
@RequiredArgsConstructor
public class ChatController {
private final ChatMessageService chatMessageService;
private final RedisTemplate<String, Object> redisTemplate;
private final ChannelTopic channelTopic;
@ResponseBody
@MessageMapping("/chat/room")
public void message(ChatMessage chatMessage) {
redisTemplate.convertAndSend(channelTopic.getTopic(), chatMessage);
}
}
channel을 계속 추가하는게 아닌 redisConfig에서 내가 만들었던 channel을 사용해서 하는 방법이다.