[Road To MQ] WebSocket + STOMP + RabbitMQ로 메세징 - Exchange의 Topic

CodeKong의 기술 블로그·2024년 1월 28일
2

Road To MQ

목록 보기
5/6
post-thumbnail

안녕하세요 오늘은 Road To MQ 다섯번째 아티클인 WebSocket + STOMP + RabbitMQ로 메세징 - Exchange의 Topic입니다!

지난 아티클에서 WebSocket + stomp 조합에 외부 메세지 브로커인 RabbitMQ를 적용해서 업그레이드 해보았는데요

Direct 형식에 이어서 핵심 이론이자 장점인 Topic 형식에 대해서 구현해보겠습니다!!

📌 배경

RabbitMQ에 대한 기본원리 + 사용 이유는 저번 아티클 https://velog.io/@joonoo3/Road-To-MQ-RabbitMQ-기본-개념
Direct 형식은 https://velog.io/@joonoo3/Road-To-MQ-RabbitMQ-기본-개념 에 정리해 두었습니다! (기본 뼈대 코드에 대한 설명은 이쪽에 있습니다)

📌 기본 구성

🛠️ 시나리오


회사에는 sales 부서와 develop 부서가 있고 각 부서는 director, staff 직급이 있습니다.

이제 메시지 속에서 sale / develop 부서 내 또는 director / staff 직급 내의 모든 메시지를 모아보고 싶습니다.



큐는 총 8개로 이루어 집니다.

기본적인 SalesStaff,SalesDirector,DevelopStaff,DevelopDirector의 메세지를 담는 큐들,

모든 Sales 부서, 모든 staff 직급, 모든 Director 직급, 모든 Develop 부서의 메세지를 담는 큐로 구성됩니다.


🛠️ Config

@Configuration
@EnableRabbit
public class RabbitConfig {

    //RabbitAdmin을 사용하면 RabbitMQ 서버에 Exchange, Queue, Binding을 등록할 수 있습니다.
    //RabbitAdmin은 RabbitTemplate을 사용하여 RabbitMQ 서버에 접근합니다.
    @Bean
    public AmqpAdmin amqpAdmin() {
        RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory());

        rabbitAdmin.declareExchange(topicExchange());

        //sales.staff, sales.director, develop.staff, develop.director
        List<String> routingKeys = Arrays.asList("sales.staff", "sales.director", "develop.staff", "develop.director");
        for (String routingKey : routingKeys) {
            Queue queue = new Queue(routingKey);
            rabbitAdmin.declareQueue(queue);
            rabbitAdmin.declareBinding(createBinding(routingKey, queue));
        }

        //sales.*, develop.*, *.staff, *.director
        //queue name : AllSales, AllDevelop, AllStaff, AllDirector
        List<String> allRoutingKeys = Arrays.asList("sales.*", "develop.*", "*.staff", "*.director");
        for (String routingKey : allRoutingKeys) {
            Queue queue = getQueue(routingKey);
            rabbitAdmin.declareQueue(queue);
            rabbitAdmin.declareBinding(createBinding(routingKey, queue));
        }

        return rabbitAdmin;
    }

    @Bean
    public TopicExchange topicExchange() {
        return new TopicExchange("JunWooCompany");
    }

    private Queue getQueue(String routingKey) {
        //.과 *을 제거
        String queueName = routingKey.replace(".", "").replace("*", "");
        return new Queue("All" + queueName, true );
    }

    private Binding createBinding(String routingKey, Queue queue) {
        return BindingBuilder.bind(queue).to(topicExchange()).with(routingKey);
    }

    //RabbitTemplate을 사용하여 RabbitMQ 서버에 메시지를 전송할 수 있습니다.
    //RabbitTemplate은 RabbitMQ 서버에 접근하기 위한 클래스입니다.
    @Bean
    public RabbitTemplate rabbitTemplate() {
        RabbitTemplate template = new RabbitTemplate(connectionFactory());
        return template;
    }

    //ConnectionFactory 등록
    //ConnectionFactory는 RabbitMQ 서버에 접근하기 위한 클래스입니다.
    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory factory = new CachingConnectionFactory();
        factory.setHost("localhost");
     

매우 하드코딩,,,이지만 돌아가기만 하면 되니까요 하하

STOMP 코드는 지난 글과 동일합니다!

@Configuration
@EnableWebSocketMessageBroker
public class StompConfig implements WebSocketMessageBrokerConfigurer {

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/ws/chat")
                .setAllowedOrigins("*");
    }

    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {

        registry.setPathMatcher(new AntPathMatcher("."));
        registry.setApplicationDestinationPrefixes("/pub");
        registry.enableStompBrokerRelay("/queue", "/topic", "/exchange", "/amq/queue");
    }
}

🚩 생성확인

exchange생성

queue생성

잘 생성되었습니다!🔫🔫


🛠️ Controller

    @Controller
@RequiredArgsConstructor
@Slf4j
public class StompRabbitController {

    private final RabbitTemplate template;

    @MessageMapping("sales.director")
    public void toSalesDirector(String message) {
        log.info("안녕하세요. sale파트의 director님");
        template.convertAndSend("JunWooCompany", "sales.director", message);
    }

    @MessageMapping("sales.staff")
    public void toSalesStaff(String message) {
        log.info("안녕하세요. sale파트의 staff님");
        template.convertAndSend("JunWooCompany", "sales.staff", message);
    }

    @MessageMapping("develop.director")
    public void toDevelopDirector(String message) {
        log.info("안녕하세요. develop파트의 director님");
        template.convertAndSend("JunWooCompany", "develop.director", message);
    }

    @MessageMapping("develop.staff")
    public void toDevelopStaff(String message) {
        log.info("안녕하세요. develop파트의 staff님");
        template.convertAndSend("JunWooCompany", "develop.staff", message);
    }


}

여기서 중간점검을 해볼까요?

  • /pub/sales.staff로 발행을 한다면 해당 controller로 접근됩니다.
  • Controller는 JunWooCompany라는 이름의 exchange를 통해 sales.staff라는 라우팅 키로 message를 보낼껍니다!
  • 이때 패턴이 일치하는 큐는 sales.staff, *.staff, sales.* 의 바인딩 키를 가진 큐들이 되겠습니다!

📌 테스트

📨 구독

sales.staff로 바인딩 된 큐를 구독해줍니다!

consumer가 생겼네요


sales.*로 바인딩 된 큐를 구독해줍니다!

consumer가 생겼네요


📨 발행

sales.staff에 message를 발행합니다!

해당 패턴에 맞는 큐에 message가 들어오는 것을 볼 수 있습니다!

웹 메니지먼트에서도 Message rates를 통해 확인할 수 있습니다!

allstaff는 구독하는 consumer가 없었기 때문에 queue에 쌓인 상태입니다!

완료~!🔫🔫

0개의 댓글