RabbitMQ

코끼릭·2022년 9월 20일
0

IT

목록 보기
15/24
post-thumbnail

메세지큐

메세지 큐는 응용 소프트웨어 간의 비동기 통신을 위한 메시지 지향 미들웨어를 구현한 시스템으로 producer로부터 들어오는 메세지(요청)를 큐에 담고 consumer는 큐에 담긴 메세지를 꺼내 처리하는 방식의 구조로 구현되어 있다.

기존 동기적인 방식의 메세지 교환을 미들웨어의 비용을 추가하여 비동기식으로 처리하게 될 때 얻게 되는 장점은 다음과 같다.

  • 모든 요청은 메세지큐에 저장되고 처리가 가능한 요청만을 메시지큐에서 꺼내와 처리하고 있어 서버의 부담을 줄일 수 있다.

  • 일부가 실패해도 전체에 영향을 주지 않는 탄력성을 제공하고 실패할 경우 재실행이 가능하다.

  • 메세지를 전달하는 과정에서 보관을 하고 있어 통신을 하는 클라이언트 간 네트워크 연결이 유지되어야 하는 TCP와 다르게 연결을 유지할 필요가 없어지는 만큼 어플리케이션 간 결합도를 느슨하게 할 수 있다.

  • 메세지 라우팅을 통해 하나의 메세지를 여러 수신자에게 배포가 가능하다.

  • 애플리케이션과 분리되어 있기 때문에 확장성이 용이하다.

이러한 이유로 서버와 클라이언트간의 메세지 교환이 많이 이뤄지는 대용량 데이터 처리를 위한 배치 작업과 채팅 서비스에서 사용된다.

RabbitMQ

  • ISO 응용 계층의 MOM 표준인 AMQP 프로토콜을 구현한 메시지큐로 다양한 기능과 대다수의 환경에서 지원을 하고 있어 kafka와 함께 자주 사용 되는 메세지큐 오픈 소스이다.

AMQP

AMQP는 메세지큐를 구현한 프로토콜로 3개의 컴포넌트 개념을 통해 라우팅을 한다.

  • Exchange
    publisher로 수신한 메세지는 먼저 exchange가 받게 되고 exchange type 바탕으로 적절한 큐로 라우팅한다.
  • Binding
    exchange에 전달된 메세지는 어떤 큐로 전달될 것인지를 정의하는 작업으로 메세지큐에서 생성된 하나의 큐는 exchange type과 binding이 되어 있어야 자신의 큐로 전달받을 수 있다.
  • Queue
    메모리나 디스크에 메세지를 저장하고 consumer에게 전달하는 역할을 한다.

Standard Exchange Type

exchange type의 라우팅 전략은 publisher에서 송신한 메세지 헤더에 포함되는 가상 주소인 Routing Key를 기준으로 어떤 큐로 라우팅 할 지 결정하는 전략이다.

  • Direct Exchange: 라우팅 키와 동일한 이름을 가진 메세지큐에 전달하는 방법 (unicast)
  • Topic Exchange: 와일드 카드를 이용한 전달 방법 (multicast)
  • Fanout Exchange: 메세지를 모든 큐에 라우팅하는 방법 (broadcast)
  • Headers Exchange: 키-값으로 정의된 헤더에 의해 라우팅 되는 방법으로 조건문에 따라 정해진 큐로 전달하는 방법 (multicast)

Docker 파일 설정

version: '3'
services:
  rabbitmq:
    image: rabbitmq:3-management-alpine #콘솔 사용이 가능한 이미지
    container_name: rabbitmq-stream
    volumes:
    	#자동으로 웹콘솔 플러그인이 설치되도록 엔트리 포인트 스크립트를 수정했다.
      - ./docker-entrypoint.sh:/docker-entrypoint.sh
      - ./rabbitmq/etc/:/etc/rabbitmq/
      - ./rabbitmq/data/:/var/lib/rabbitmq/
      - ./rabbitmq/logs/:/var/log/rabbitmq/
    ports:
      - "5672:5672" #AMQP 메세지 전송 포트
      - "15672:15672" #콘솔 웹 접속 포트
    environment:
      RABBITMQ_ERLANG_COOKIE: "RabbitMQ-My-Cookies"
      RABBITMQ_DEFAULT_USER: "root"
      RABBITMQ_DEFAULT_PASS: "1234"

Spring에서의 사용 방법

  1. RabbitMQ의 설정을 바탕으로 connectionFactory를 빈으로 등록한다.
  2. 메세지큐에 등록할 큐를 빈으로 등록한다.
  3. 큐에 사용할 Exchange Type을 빈으로 등록한다.
  4. Binding을 통해 큐와 Exchange Type을 등록한다.
@Configuration
public class MessageBrokerConfiguration {
    @Bean
    public ConnectionFactory getConnectionFactory(){
        ConnectionFactory connectionFactory = new CachingConnectionFactory(RABBITMQ_HOST, RABBITMQ_PORT);
        ((CachingConnectionFactory) connectionFactory).setUsername(USERNAME);
        ((CachingConnectionFactory) connectionFactory).setPassword(PASSWORD);
        return connectionFactory;
    }
 
    @Bean
    public TopicExchange getExchange(){
        return new TopicExchange(EXCHANGE_NAME);
    }
 
    @Bean
    public Queue queue(){
        return new Queue(QUEUE_NAME, true);
    }
 
    @Bean
    public Binding getBinding(Queue queue, TopicExchange exchange){
        return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY);
    }
}

@RabbitListener

메세지큐에서 저장된 메세지를 가져오는 방식은 rabbitTemplate을 통해 가져오는 방식과 아노테이션을 사용하는 방식 2가지가 존재하는데 아노테이션의 경우 전달된 메세지를 사용할 메소드 위에 @RabbitListener을 달면 된다.
특정 큐에 담긴 메세지를 수신하고 싶은 경우 아노테이션의 queue 파라미터에 연결된 메세지 브로커의 큐 이름을 지정하거나 listenerContainerFactory에 원하는 큐와 관련된 connectionFactory과 수신되는 메세지를 변환하기 위해 필요한 MessageConverter를 등록해주는 설정을 한 후 아노테이션의 containerFactory에 금방 설정한 빈의 이름을 입력하는 것을 통해 원하는 메세지큐에서만 수신이 가능하도록 커스터마이징이 가능하다.

@Configuration
public class MessageBrokerConfiguration {
	
    ...기존 rabbitmq 설정...
    
    @Bean("SampleContainerFactory")
    SimpleRabbitListenerContainerFactory getSampleContainerFactory(ConnectionFactory connectionFactory, Jackson2JsonMessageConverter converter) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setMessageConverter(converter);
        return factory;
    }
 
    @Bean
    public Jackson2JsonMessageConverter getMessageConverter() {
        return new Jackson2JsonMessageConverter();
    }
}

@Component
public class Receiver{
 
    private static final Logger logger = LoggerFactory.getLogger(Receiver.class);
 
    @RabbitListener(containerFactory = "SampleContainerFactory", queues="test-queue-1")
    public void onReceiveMessage(SampleMessage message){
        logger.info("Receiver received message: {}", message);
    }
}

Factory Pattern

객체 생성을 대신해주는 디자인 패턴으로 클라이언트는 생성하는 객체에 대한 종속 코드가 없어 서로 간의 종속성을 낮추고 결합도를 느슨하게 할 수 있다.
위의 configuration 코드에서도 RabbitMQ와의 커넥션을 위한 객체 역시 개발자가 직접 생성하는 것이 아닌 생성에 필요한 설정 객체만 넘겨주고 팩토리 빈에 생성 작업을 위임하고 있는 것을 확인할 수 있다.


메시지 지향 미들웨어
AMQP

profile
ㅇㅅㅇ

0개의 댓글