안녕하세요 오늘은 Road To MQ 다섯번째 아티클인 WebSocket + STOMP + RabbitMQ로 메세징 - Exchange의 Topic입니다!
지난 아티클에서 WebSocket + stomp 조합에 외부 메세지 브로커인 RabbitMQ를 적용해서 업그레이드 해보았는데요
RabbitMQ에 대한 기본원리 + 사용 이유는 저번 아티클 https://velog.io/@joonoo3/Road-To-MQ-RabbitMQ-기본-개념
Direct 형식은 https://velog.io/@joonoo3/Road-To-MQ-RabbitMQ-기본-개념 에 정리해 두었습니다! (기본 뼈대 코드에 대한 설명은 이쪽에 있습니다)
회사에는 sales 부서와 develop 부서가 있고 각 부서는 director, staff 직급이 있습니다.
큐는 총 8개로 이루어 집니다.
@Configuration
@EnableRabbit
public class RabbitConfig {
//RabbitAdmin을 사용하면 RabbitMQ 서버에 Exchange, Queue, Binding을 등록할 수 있습니다.
//RabbitAdmin은 RabbitTemplate을 사용하여 RabbitMQ 서버에 접근합니다.
@Bean
public AmqpAdmin amqpAdmin() {
RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory());
rabbitAdmin.declareExchange(topicExchange());
//sales.staff, sales.director, develop.staff, develop.director
List<String> routingKeys = Arrays.asList("sales.staff", "sales.director", "develop.staff", "develop.director");
for (String routingKey : routingKeys) {
Queue queue = new Queue(routingKey);
rabbitAdmin.declareQueue(queue);
rabbitAdmin.declareBinding(createBinding(routingKey, queue));
}
//sales.*, develop.*, *.staff, *.director
//queue name : AllSales, AllDevelop, AllStaff, AllDirector
List<String> allRoutingKeys = Arrays.asList("sales.*", "develop.*", "*.staff", "*.director");
for (String routingKey : allRoutingKeys) {
Queue queue = getQueue(routingKey);
rabbitAdmin.declareQueue(queue);
rabbitAdmin.declareBinding(createBinding(routingKey, queue));
}
return rabbitAdmin;
}
@Bean
public TopicExchange topicExchange() {
return new TopicExchange("JunWooCompany");
}
private Queue getQueue(String routingKey) {
//.과 *을 제거
String queueName = routingKey.replace(".", "").replace("*", "");
return new Queue("All" + queueName, true );
}
private Binding createBinding(String routingKey, Queue queue) {
return BindingBuilder.bind(queue).to(topicExchange()).with(routingKey);
}
//RabbitTemplate을 사용하여 RabbitMQ 서버에 메시지를 전송할 수 있습니다.
//RabbitTemplate은 RabbitMQ 서버에 접근하기 위한 클래스입니다.
@Bean
public RabbitTemplate rabbitTemplate() {
RabbitTemplate template = new RabbitTemplate(connectionFactory());
return template;
}
//ConnectionFactory 등록
//ConnectionFactory는 RabbitMQ 서버에 접근하기 위한 클래스입니다.
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory factory = new CachingConnectionFactory();
factory.setHost("localhost");
매우 하드코딩,,,이지만 돌아가기만 하면 되니까요 하하
STOMP 코드는 지난 글과 동일합니다!
@Configuration
@EnableWebSocketMessageBroker
public class StompConfig implements WebSocketMessageBrokerConfigurer {
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
registry.addEndpoint("/ws/chat")
.setAllowedOrigins("*");
}
@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
registry.setPathMatcher(new AntPathMatcher("."));
registry.setApplicationDestinationPrefixes("/pub");
registry.enableStompBrokerRelay("/queue", "/topic", "/exchange", "/amq/queue");
}
}
잘 생성되었습니다!🔫🔫
@Controller
@RequiredArgsConstructor
@Slf4j
public class StompRabbitController {
private final RabbitTemplate template;
@MessageMapping("sales.director")
public void toSalesDirector(String message) {
log.info("안녕하세요. sale파트의 director님");
template.convertAndSend("JunWooCompany", "sales.director", message);
}
@MessageMapping("sales.staff")
public void toSalesStaff(String message) {
log.info("안녕하세요. sale파트의 staff님");
template.convertAndSend("JunWooCompany", "sales.staff", message);
}
@MessageMapping("develop.director")
public void toDevelopDirector(String message) {
log.info("안녕하세요. develop파트의 director님");
template.convertAndSend("JunWooCompany", "develop.director", message);
}
@MessageMapping("develop.staff")
public void toDevelopStaff(String message) {
log.info("안녕하세요. develop파트의 staff님");
template.convertAndSend("JunWooCompany", "develop.staff", message);
}
}
여기서 중간점검을 해볼까요?
- /pub/sales.staff로 발행을 한다면 해당 controller로 접근됩니다.
- Controller는 JunWooCompany라는 이름의 exchange를 통해 sales.staff라는 라우팅 키로 message를 보낼껍니다!
- 이때 패턴이 일치하는 큐는 sales.staff, *.staff, sales.* 의 바인딩 키를 가진 큐들이 되겠습니다!
allstaff는 구독하는 consumer가 없었기 때문에 queue에 쌓인 상태입니다!