Java 로 다뤄보는 RabbitMQ

식빵·2024년 5월 28일
0

rabbit-mq

목록 보기
1/2
post-thumbnail

🎯 기본 지식 쌓기

아무래도 아무 지식 없이 뛰어드는 것보다는 가볍게라도
RabbitMQ 가 뭔지, 왜 필요한지 정도는 알고 가야겠죠?


1. Message Broker

RabbitMQ 을 표현할 때 많은 사람들이 Message Broker 라고 말합니다.
그리고 실제 RabbitMQ 공식 사이트 소개글에도 그렇게 나옵니다.


그렇다면 Message Broker 는 정확히 뭘까요?

A message broker is software that enables applications, systems and services to communicate with each other and exchange information.

The message broker does this by translating messages between formal messaging protocols. This allows interdependent services to “talk” with one another directly, even if they were written in different languages or implemented on different platforms.

출처: https://www.ibm.com/topics/message-brokers


메시지 브로커를 설명하는 위 영문을 해석하자면 아래와 같습니다.

message broker 는 애플리케이션, 시스템 또는 서비스 간에 소통을 위한 소프트웨입니다.
이를 위해서 기반이 되는 프로토콜(ex: AMQP)을 사용하며,
결과적으로 서로 성격이 완전히 다른(기반이 되는 프로그래밍 언어, OS 환경 등) 애플리케이션
간에 소통(=메시징)이 가능토록 하는 것입니다.

더 쉽게 얘기하자면, 그냥 서로 다른 애플리케이션 간에 메세지를 전달하는 중간 역할자,
즉 브로커(broker) 라는 거죠.



... 이게 왜 필요해?

아마 여기까지 읽고, 이게 왜 필요하지? 라고 생각하실 수도 있습니다.
그냥 app 에서 다른 app 으로 Rest API 요청을 날려서 메세지를 전달하면 되지 않을까요?
맞습니다!

실제로도 단순한 메시지 전달이면 그래도 되겠죠.
하지만 메세지 브로커라는 중간 매개체가 있기 때문에 우리는
아주 중요한 2가지 이점을 얻을 수 있습니다.

  1. 비동기 통신
  2. 서로 다른 App 간의 소스 코드(또는 로직)상의 디커플링

이점 1: 비동기 통신

예를 들어서 Application A 가 제법 시간이 걸리는 작업을
하는 Application B 의 REST API 를 직접 호출하면 어떻게 될까요?
AB 의 처리 지연으로 인해 계속해서 응답을 기다리게 됩니다.
심지어 그 응답은 Body 에 아무 내용이 없는데도 말이죠.

사용자가 많지 않은 서비스면 상관없지만 사용자가 많아지면...?
결과적으로 요청이 수백 수천 건으로 뻥튀기되면 점점 병목 현상이 일어나게 됩니다.

이러한 현상이 일어나는 가장 근본적인 이유는 Application A
Application B 에 대하여 직접적으로 API 를 호출하기 때문입니다.
즉 너무 밀접한 관계가 형성(커플링)되어 있기 때문입니다.


이번에는 메시지 브로커를 쓴다면 어떨지 생각해보죠.

이번에는 Application A 가 Message Broker 에 요청, 즉 메시지를 넣어서
요청을 전송시킵니다. 이를 다르게 말해서 Publish 라고 표현합니다.

이러한 Publish 는 비동기적으로 이루어짐으로 Application A 는 어떠한 대기도
일어나지 않습니다. 그러니까 애초에 병목이 일어날 수가 없는거죠.

Message Broker 는 내부적으로 이러한 메시지들을 Queue 에 넣어서 관리하는데,
앞서 전달된 메시지도 이 Queue 에 넣게 됩니다.

이후 Message Broker 는 자신에게 Queue 에 어떤 정보가
들어오면 자기한테 알려달라고 미리 약속했던 Application B 에게 해당 메시지를
전달하게 됩니다.

Application BMessage Broker 가 전달해준 데이터를 받게 되면
어떤 작업을 할지 미리 등록한 Listener 에 의해 메세지 처리가 일어납니다.
이를 다르게 말해서 Consume 이라고 표현합니다.


고작 중간에 Broker 하나 넣었을 뿐인데,
비동기적으로 서로간에 요청을 가능케 합니다.
원천적으로 병목현상이 일어나는 일을 막아 주고 있는 것이죠.



이점 2: 서로 다른 App 간의 소스 코드상의 디커플링

아마 이점1 을 읽으신 여러분들 중에서 이런 생각을 하는 분들이 있지 않을까 싶습니다.

그냥 Rest API 구현을 비동기적으로 구현하면 되잖아?

그렇긴 하죠~
하지만 Rest API 를 직접 호출하는 것은 하나의 Application 이 다른 App
어떤 Rest API 를 호출해야될지 모두 알아야 된다는 관점에서
코드에 서로 의존성이 생기게 됩니다.


하지만 Message Broker 를 사용한다면 어떨가요?

만약 서로 다른 App B, App CApp A 가 Message Broker 에 넣은 메시지를
각자 전달받고 각자 알아서 처리하게 되면 어떨까요?
그러면 App A 입장에서는 자기가 어떤 API 를 호출해야되는지 알 필요가 없습니다.
즉 코드 상(또는 로직상)에서도 Decoupling 이 된 것이죠.

이러한 이유로 많은 마이크로서비스가 메시지 브로커 역할을 해주는 Kafka 를 사용해서
서비스간 통신을 하는 것입니다. 각각의 마이크로서비스는 자신이 처리할 수 없는 일을
Kafka 에게 메시지(Event)로 전달하고, 이 메시지에 관심이 있는 다른 마이크로서비스들은
각자 받은 Event 에 대한 처리를 개별적으로 하는 형태가 됩니다.




2. RabbitMQ

자~ 이제 Message Broker 의 필요성은 어느정도 알았습니다.
그렇다면 Message Broker 의 구현체 중 하나인 (오늘의 주인공) RabbitMQ 를 알아봅시다.

참고: AMQP
앞서서 Message Broker 는 메시지 전달을 위한 프로토콜을 사용한다고 말씀드렸죠?
이러한 프로토콜 중에서 AMQP(Advanced Message Queue Principal) 이 있습니다.
RabbitMQ 는 바로 이 AMQP 프로토콜을 준수하는 Message Broker 입니다.


구성요소

RabbitMQ 사용할 때 알아야할 구성요소가 몇가지 있습니다.
어떤 구성요소들이 있는지 빠르게 알아보고 갑시다.
(아래 그림을 참고하면서 글을 읽어주세요)

출처: https://www.rabbitmq.com/

일단 쉽게 이해하고 넘어가고 싶다면 간단한 설명만 읽고 넘어가세요!
Deep 한 설명은 제가 실습코드도 돌리고, 혼자서 더 공부해보고 난 이후에 나온 결론입니다.


👉 간단한 설명

  • Publisher : 메세지 쏘는 애

  • Exchange : Publisher 한테 메세지 받고, 이걸 어떤 규칙에 따라 특정 Queue 들한테 메세지 전달하는 애

  • Queue : 메세지 보관소. Consumer 한테 전달.

  • Consumer: 메세지 받는 애

  • Channel : Publisher 가 RabbitMQ 에게 메세지 전달 시 쓰는 데이터 전송 매체

  • Binding : Exchange 가 어떤 Queue 에게 메세지를 전달할지 결정할 때 사용하는 규칙


👉 Deep 한 설명

이 설명은 이제 막 RabbitMQ 를 접하신 분들에게는 어려울 수 있습니다!
최대한 가볍게 읽고만 넘어가시거나, 실습코드 돌려보고나서 다시 돌아와서 읽으시기 바랍니다.

  • Publisher (또는 Producer)
    • 메시지를 생성하여 큐에 넣는 RabbitMQ 를 사용하는 외부 애플리케이션(또는 시스템)입니다.

  • Consumer
    • Queue 에 있는 메시지를 수신하는 애플리케이션(또는 시스템)입니다.

  • Exchange
    • Publisher 의 메시지를 받아내는 RabbitMQ 내부 구성요소입니다.
    • 받은 Message 에 대해서 여러가지 규칙에 의해서 어떤 0~n 개의 Queue 에 메시지를 전달합니다.
      이런 동작을 Route 라고도 표현합니다.
    • 여기서 말하는 RuleExchangeType 에 따라 방식이 조금씩 다릅니다. (이건 코딩할 때 보는 걸로)

  • Queue
    • Exchange 로부터 받은 메시지를 [보관/전송/삭제] 합니다.
    • 양쪽이 열려있는 List 형태의 자료구조. 앞/뒤로 메시지를 넣고 뺄 수 있습니다.
    • 자신과 연결된 Consumer 가 있고, 메시지가 보관되어 있으면 즉시 메시지를 Consumer 에게 전달합니다.
    • 만약 Consumer 가 메시지를 제대로 못받았으면, 해당 메시지를 다시 Queue 보관하게 됩니다.
    • QueueConsumer 에 메세지를 전달 후, Consumer 로부터 ack 응답을 받아야
      정상적으로 메세지를 전달한 것으로 판단합니다. 그 이후에 Queue 내부에 메시지 실제로 삭제를 합니다.

추가적으로 그림에는 안 나오지만 알아두면 좋은 Channel, Binding 이라는 것도 있습니다.

  • Channel
    • RabbitMQ 와 Connection 을 맺는 외부의 App 들이 실제 RabbitMQ 와 통신할 때 사용하는 것입니다.
    • 하나의 ConnectionTCP 기반의 애플리케이션 레이어 프로토콜을 사용합니다.
    • 그리고 이러한 하나의 Connection 은 여러개의 Channel 을 다발로 갖고 있습니다.
    • 일종의 Multiplexer 라고 생각하면 편합니다.
    • 여러 Channel 로 동시에 Message 를 전달해도 결국은 하나의 Connection 으로 빠져 나가고,
      RabbitMQ 는 이러한 하나의 Connection 을 통해서 App 의 메시지를 수용합니다.

  • Binding
    • 바인딩(Binding)은 Exchange 가 어떤 Queue 에 메세지를 전달할지 결정(route)할 때 사용하는 규칙입니다.
    • 또는 Exchange 와 Queue 와의 관계를 의미하기도 합니다.
      • 이러한 이유로 Binding 은 Exchange, Queue 둘 중 하나에 설정하면 동시에 양쪽 모두 세팅됩니다.
    • 바인딩이 있어야만 Exchange 가 Queue 에게 메세지를 전달할 수 있습니다.
    • Binding 에는 routing key 속성을 지정할 수 있으며, 이는 Exchange 에서 route 작업 때 사용할 수도 있습니다.


여기까지해서 이론적인 부분은 끝났습니다.
나머지 이해가 좀 애매했던 부분들은 실습 코드를 통해서 이해해보죠.






🎯 코드로 이해하기

이제 코딩 좀 해봅시다.

참고: 저의 집구석 개발환경

  • OS : Window 10 Home
  • IDE : IntelliJ IDEA 2024.1.2 (Ultimate Edition)
  • JDK : azul jdk 21.0.3
  • Build Tool : Apache Maven 3.9.6 (IDE 내장)
  • Docker : Docker Engine v.26.1.1

1. Docker - RabbitMQ 설치

https://www.rabbitmq.com/docs/download 에서 제시하는 방법 그대로 했다.
Docker 를 실행하고 아래와 같이 명령어를 입력했다.

docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3.13-management
  • 참고(1) : 5672 포트는 RabbitMQ 와 애플리케이션이 소통하는 포트입니다.
  • 참고(2) :
    • 15672 포트는 RabbitMQ 관리자 UI 제공 포트입니다.
    • 브라우저 키고 http://localhost:15672 로 접속하면 됩니다.
    • 접속 후 로그인을 위한 아이디 및 비밀번호는 아래와 같습니다.
      • id : guest
      • pw : guest

참고 이미지(1) : RabbitMQ 관리자 UI 접속 (로그인 전)

  • guest / guest 로 로그인!

참고 이미지(2) : RabbitMQ 관리자 UI 접속 (로그인 후)



2. Java 환경 - 기본실습

일단 Java 를 통해서 RabbitMQ 를 조작하는 방법을 알아봅시다.
그리고 RabbitMQ 에서 가장 중요한 Exchange Type 에 대해서 알아보겠습니다.

이번 기본 실습에서는 Exchange Type 중에서 Direct 를 사용한다는
점만 기억하시고 실습을 진행해주시기 바랍니다!


프로젝트 생성

maven 프로젝트를 생성하고, pom.xml 을 아래와 같이 작성하여 의존성을 추가합니다.

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" 
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    
    <!-- 버전 작성하기 귀찮아서 spring boot starter parent 사용 -->
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>3.3.0</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <groupId>coding.toast</groupId>
    <artifactId>rabbit-mq-java</artifactId>
    <version>0.0.1-SNAPSHOT</version>

    <properties>
        <java.version>21</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
            <!-- 참고: 사용되는 버전은 1.5.6 입니다. -->
        </dependency>
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <!-- 참고: 사용되는 버전은 5.21.0 입니다. -->
        </dependency>
    </dependencies>
</project>



Connection, Channel 생성

TCP 기반의 네트워크 연결인 Connection 을 생성하고,
하나의 Connection 에서 Channel 을 생성하는 법은 다음과 같습니다.

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class CreateConAndChannel {
    public static void main(String[] args) {
        ConnectionFactory factory = new ConnectionFactory();

        // default host, port config
        factory.setHost("localhost");
        factory.setPort(5672);

        // default connect timeout - (default : 60000 = 1 minute)
        factory.setConnectionTimeout(5000);

        // default max channel - (default : 2047)
        factory.setRequestedChannelMax(32);

        // create connection
        try (Connection connection = factory.newConnection()) {
            Channel channel = connection.createChannel();

            // do something with channel
        } catch (IOException | TimeoutException e) {
            throw new RuntimeException(e);
        }
    }
}

참고로 factory 에서 default 로 호스트/포트를 지정했는데
지정하지 않으면 자동으로 host: localhost / port: 5672 를 사용합니다.

그리고 현재 try-catch-resource 구문을 통해서 Connection 을 자동으로 close 하는데,
이러면 Connection 과 연관된 모든 Channel 은 자동으로 close 된다는 점도 알아두시기 바랍니다.

애플리케이션이 살아있는 동안에 계속 사용하고 싶다면?
Connectionclose 하지 않으면 됩니다.



이 아래에서 부터 나오는 Exchange, Queue, Binding 설정 및 생성은
모두 Admin UI 에서 가능한 일들입니다. 하지만 이번 글에서는 해당 내용을
Java 코드로만 작업할 예정이고, Admin UI 와 관련해서는 완전히 따로 글을 작성해서
올리 예정입니다. 답답해도 참고 봐주세요 😂


Exchange, Queue 생성

위에서 본 Connection, Channel 생성 코드는 생략하고,
try-catch-resource block 내부의 코드만 작성했습니다.

try (Connection connection = factory.newConnection()) {
	Channel channel = connection.createChannel();
	
    
    // 1. Exchange 생성
	// my.exchange 라는 이름의 Exchange 가 없으면 생성하고 있으면 무시한다.
    // 단! 아래 설정값과 기존 Exchange 의 설정이 다르면 예외를 터뜨린다.
	channel.exchangeDeclare(
		"my.exchange", 				// exchange 명칭
		BuiltinExchangeType.DIRECT, // exchange 타입 (이건 다른 목차에서 다루겠습니다.
		true, 						// durable 여부
		false, 						// auto-delete 여부
		null 						// 그외 추가적인 설정 파라미터 (Map 인스턴스)
	);
	
    
    // 2. Queue 생성
	// my.queue 라는 이름의 Queue 가 없으면 생성하고 있으면 무시한다.
    // 단! 아래 설정값과 기존 Queue 의 설정이 다르면 예외를 터뜨림
	channel.queueDeclare(
		"my.queue",  // 큐의 명칭
		true,        // durable 여부
		false,       // exclusive 여부
		false,       // auto-delete 여부
		null         // 그외 추가적인 설정 파라미터 (Map 인스턴스)
	);
	
    
	
} catch (IOException | TimeoutException e) {
	throw new RuntimeException(e);
}

Exchange 생성 시에 BuiltinExchangeType.DIRECT 라는 것을 사용했습니다.
이는 Exchange Type 인데, 이와 관련된 내용은 워낙에 방대함으로
추후에 다른 목차에서 설명 및 실습하도록 하겠습니다.

생각보다 생성 시에 사용되는 설정값들이 많죠?
해당 설정값들에 대한 정의는 다음과 같습니다.


- Exchange 속성

  • Name : 명칭

  • Durability :
    broker 재시작할 때 지워지지 않고 계속 유지

  • Auto-delete :
    최소 하나의 queue 가 한번 bind 되고, 이후 모든 queue 가 unbound 될 때 삭제

  • Arguments (선택사항) :
    플러그인 또는 broker 자체에서 제공되는 특수 기능을 위한 설정


- Queue 속성

  • Name : 명칭

  • Durable broker 재시작할 때 지워지지 않고 계속 유지

  • Exclusive :
    오로지 하나의 connection 만 맺고, 해당 connection 이 close 되면 Queue 삭제

  • Auto-delete :
    최소 하나의 consumer 가 subscribe 하고, 이후 마지막 consumer 가 unsubscribe 할 때 삭제

  • Arguments (선택사항):
    플러그인 또는 broker 자체에서 제공되는 특수 기능을 위한 설정.
    예들 들어서 TTL, QUEUE 길이 제한 등이 있음.


아무튼 위처럼 생성하고 나서 RabbitMQ Admin UI 에 접속해서 제대로 생성되었는지
눈으로 직접 확인해봅시다.



Queue 와 Exchange 의 연결고리 - Binding 생성

try (Connection connection = factory.newConnection()) {
	Channel channel = connection.createChannel();
	
    // 설명 생략
	channel.exchangeDeclare("my.exchange", 
    						BuiltinExchangeType.DIRECT, 
    						true, false, null);
	// 설명 생략
    channel.queueDeclare("my.queue", true, false, false, null);

	// Binding 을 생성합니다!
	channel.queueBind(
    	"my.queue", 		// 연결할 queue 지정
        "my.exchange", 		// 연결할 exchange 지정
        "my.routing.key"	// 추후에 라우팅 시에 사용할 routing_key 지정
	);
    
    // 참고: 이 예시에서는 Binding 을 하나만 생성하지만,
    // 하나의 Queue 는 여러 개의 Binding 을 갖을 수 있습니다.
	
} catch (IOException | TimeoutException e) {
	throw new RuntimeException(e);
}

한번 실행하고 나서 Admin UI 접속하여 Binding 생성 여부를 눈으로 확인해봅시다~

  • Queues and Streams 탭 클릭합니다.
  • Queue 의 이름 클릭합니다.

  • Bindings 라는 문구를 클릭합니다.
  • 그러면 현재 QueueBinding 상태를 조회할 수 있습니다.
  • 아까 저희가 생성한 binding 이 보이면 성공입니다.

참고: (Default exchange binding)
(Default exchage binding) 이라는 특별한 Binding 이 있는데,
이건 Queue 를 생성하자마자 Message Broker 가 자동으로 할당하는 Binding 입니다.
해당 Binding 으로 연결된 ExchangeBroker 가 제공하는 Default Exchange 입니다.
이와 관련된 내용은 이후 Exchange Type 에서 더 자세히 알아보겠습니다.



Publish 하기

Binding 으로 ExchangeQueue 를 연결했으니,
이제 PublisherExchange 에 메세지를 전송(publish) 하고
이걸 Queue 까지 전송되도록 하겠습니다.

그런데 여기서 죄송하지만 한가지 추가로 알아둘 점이 있습니다.
제가 앞서 Exchange 를 BuiltinExchangeType.DIRECT 타입으로 생성을 했는데,
Exchange 는 타입에 따라서 조금씩 다른 방식으로 Queue 에 메세지를 routing 합니다.

DIRECT 타입의 Exchange 는 아래 2개의 값이 완전히 동일해야만
BindingQueueroute 를 수행합니다.

  • publisher 에서 전송한 메세지의 routing_key
  • Bindingrouting_key

그림으로 보면 아래와 같은 동작입니다.

나머지 Exchange Type 들은 다른 목차에서 다루겠습니다!

자 이제 어떻게 동작하는지 알겠죠?
Publisher 클래스를 하나 생성하여 Exchange 에 메세지를 전송해봅시다!

package coding.toast.rabbitmq;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.util.concurrent.TimeoutException;

public class MessagePublisher {
	public static void main(String[] args) {
		ConnectionFactory factory = new ConnectionFactory();
		factory.setHost("localhost");
		factory.setPort(5672);
		
		try (Connection connection = factory.newConnection()) {
			Channel channel = connection.createChannel();
			
			String publishingMessage
				= "Hello World!!! / timestamp => %s".formatted(Instant.now().getEpochSecond());
			
			channel.basicPublish(
				"my.exchange",  // 메세지 전송 타깃인 exchange 명칭
				"my.routing.key",         // routing_key 설정
				null,                     // BasicProperties 설정 인스턴스, 뭔지 몰라서 null 처리
				publishingMessage.getBytes(StandardCharsets.UTF_8) // 전송할 데이터 설정
			);
			
		} catch (IOException | TimeoutException e) {
			throw new RuntimeException(e);
		}
	}
}

publish 를 할 때 전송하는 것이 무엇인지 잘 기억해둡시다.

  • Exchange name
  • Routing Key name
  • (선택사항) 추가 프로퍼티
  • 전달할 메세지의 내용

코드를 한번 실행한 후에 Admin UI 에 접속해서 제대로 메세지가 전달됐는지 확인해봅시다.

  • Queues And Streams 탭을 클릭하면 아래와 같이 큐에
    Message 가 하나 늘어난 것을 확인할 수 있습니다.

  • 해당 QueueName 을 클릭하고
  • 스크롤을 좀 내려서 Get messages 라는 문구를 클릭합니다.
  • 펼쳐지면 Get Message(s) 버튼을 클릭합니다.
  • 앞서 publisher 가 전송한 데이터가 눈에 보입니다!



Consume 하기

이제 Queue 에 있는 정보를 소비해줄 Consumer 가 필요하죠?
하나 생성해보죠.

package coding.toast.rabbitmq;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

public class MessageConsumer {
	public static void main(String[] args) {
		ConnectionFactory factory = new ConnectionFactory();
		factory.setHost("localhost");
		factory.setPort(5672);
		
		try {
			
			Connection connection = factory.newConnection();
			Channel channel = connection.createChannel();
			
			DeliverCallback deliverCallback = (consumerTag, delivery) -> {
				String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
				System.out.println(" [x] Received '" + message + "'");
			};
			
			channel.basicConsume(
				"my.queue", // consume(또는 subscribe) 할 Queue 지정.
				true,               // auto-ack 여부
				deliverCallback,    // 받은 메세지 처리를 수행할 콜백
				(consumerTag) -> {} // 캔슬 콜백. 정확히 뭔지 몰라서 지정 안함!
			);
			
            // 계속해서 메세지를 받아보기 위해서 일부러
            // close 메소드를 호출하지 않았습니다!! 까먹은 거 아닙니다!
            
		} catch (IOException | TimeoutException e) {
			throw new RuntimeException(e);
		}
	}
}

위 코드를 실행해봅시다.
그러면 아마 Publisher 가 전송해서 Queue 에 들어갔던 메세지가
console 창에 찍힐 겁니다.

그런데 Consumer 클래스는 실행 후에 일부러 계속 프로세스가 살아있도록 제가 코딩했는데,
이렇게 살아있는 상태에서 Publisher 클래스를 여러 번 다시 실행하면 어떨까요?

그러면 그때마다 Consumer 는 아래와 같이 새롭게 받은 메세지를 콘솔에 찍게 됩니다.

이렇게 Queue 와 연결(Subscribe)된 Consumer 에 메세지가 다 전달되고,
Consumer 들이 ack 신호를 되돌려주면, 그때 Queue 에서 메세지가 삭제됩니다.
그래서 Consumer 코드를 재실행하면 아무 메세지가 안 나올 겁니다.





3. Exchange Type 과 동작 방식

자~ 지금까지 rabbitMQ 기본 API 사용법을 익혀봤습니다.
그리고 이제 슬슬 Exchange Type 에 대한 설명을 해보려고 합니다.

저희가 위에서 사용한 Exchange 도 Type 이 있었는데,
타입이 DIRECT 였습니다. 여태까지 이걸 사용해서 메세지를 routing 한 겁니다.

지금부터 RabbitMQ 에서 사용되는 Exchange 의 타입이 뭐가 있는지,
그리고 각 타입마다 어떤 방식으로 메세지 라우팅을 하는지 자세히 알아봅시다.



제가 작성한 설명이 조금 읽기 힘들다면 그냥 아래 유투브 영상으로 이해하고
넘어가셔도 좋습니다. 설명과 그림이 아주 좋은 영상이네요.

다만 실습 코드는 영상에서 제공하지 않으니, 실습까지 하실 분들은
아래 글에서 테스트 코드 부분만 똑같이 따라쳐보고 실행해보시기 바랍니다~



Exchange Type 과 RabbitMQ 기본제공 Exchange

아래는 Exchange 타입들과 각 타입들에 대한 RabbitMQ 의 기본
제공 Exchange 의 이름들입니다.

Exchange 타입 RabbitMQ 기본 제공 Exchange 명칭
Direct exchange (Empty string) and amq.direct
Fanout exchange amq.fanout
Topic exchange amq.topic
Headers exchange amq.headers

지금부터 위의 각 Exchange Type 들을 설명과 코드를 통해서 차근차근 알아가봅시다.




Direct Exchange

Bindingrouting_key 속성을 사용하여 어떤 Queue 에게 메세지를 전달할지
결정하는 Exchange 타입이다.

Publish 할 때 Exchange 에 제공하는 routing_key 의 값과,
Exchange 가 들고 있는 Binding 목록들의 routing_key 들 중에서
서로 값이 동일한 것이 있으면 Exchange 는 해당 binding 으로 연결된 Queue 에
모두 메세지를 전달합니다. (위 그림 참고)

이건 이미 실습을 했으니 넘어가도록 하겠습니다.




참고: Default Exchange

이 타입은 따로 존재하는 타입이 아닙니다.
실제로는 Direct 와 완전히 같은 타입입니다.
다만 동작방식이 일반적인 Direct Exchange 들과 다릅니다.

이 Exchange 는 생성되는 모든 큐에 자동으로 Binding 됩니다.
그리고 이렇게 생긴 Bindingrouting_key 는 무조건 Queue 의 명칭을 사용합니다.

예를 들어서 Queue 를 생성하고 나서 아래와 같이
Publish 하면 메세지를 전송할 수 있다는 의미입니다.

  • Exchange name : "" (공백)
  • routing_key : <큐의 이름>

참고: Default Exchange 의 명칭은 공백 (= 빈 문자열) 입니다.

주의:
RabbitMQ 에서는 이 Default Exchange 에 대한 Binding/Unbinding 을 허용하지 않습니다!


테스트 코드

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.util.concurrent.TimeoutException;

public class DirectPublishTest {
    public static void main(String[] args) {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(5672);

        try (Connection connection = factory.newConnection()) {
            Channel channel = connection.createChannel();

            // (없을 때만) Exchange 생성
             channel.exchangeDeclare("my.exchange", 
             						  BuiltinExchangeType.DIRECT, 
                                      true, false, null);

            // (없을 때만) Queue 생성
            channel.queueDeclare("my.queue", true, false, false, null);

            // Binding 으로 Exchange 와 Queue 연결
            channel.queueBind("my.queue", "my.exchange", "my.routing.key");

            // 전송할 메세지 내용 생성
            byte[] message
                    = "Default Message Publish! Timestampe => %s"
                    .formatted(Instant.now())
                    .getBytes(StandardCharsets.UTF_8);

            // Default Exchange 로 메세지 publish!
            channel.basicPublish(
                    "",          // exchange 명 : 공백(= default exchange)
                    "my.queue",  // routing_key = 큐의 이름
                    null,        // 아직 몰라도 됩니다. null 처리!
                    message);    // 전송할 메세지

        } catch (IOException | TimeoutException e) {
            throw new RuntimeException(e);
        }
    }
}

Admin UI 에서 메세지 확인




Fanout Exchange

팬아웃 타입은 동작방식이 아주 단순합니다.
Publisher 로부터 메세지를 받으면 Exchange 는 자신과 연결된
모든 Queue 에 메세지를 전달합니다. routing_key 는 같이 보내도 무시됩니다.


테스트 코드

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.util.concurrent.TimeoutException;

public class FanoutPublishTest {
    public static void main(String[] args) {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(5672);

        try (Connection connection = factory.newConnection()) {
            Channel channel = connection.createChannel();

            final String EXCHANGE_NAME = "my.fanout.exchange";
            final String QUEUE_NAME_1 = "my.first.queue";
            final String QUEUE_NAME_2 = "my.second.queue";
            final String QUEUE_NAME_3 = "my.third.queue";
            
            // 이미 동일명칭의 Queue 및 Exchange 가 있다면 삭제
			// (혹여나 같은 이름 때문에 짜잘한 버그 생기는 게 싫어서 이러는 겁니다)
			channel.queueDelete(QUEUE_NAME_1);
			channel.queueDelete(QUEUE_NAME_2);
			channel.queueDelete(QUEUE_NAME_3);
			channel.exchangeDelete(EXCHANGE_NAME);

            // Fanout Exchange 생성
            channel.exchangeDeclare(EXCHANGE_NAME /*exchange 명*/,
                                    BuiltinExchangeType.FANOUT/*exchange type*/,
                                    true/*durable*/, 
                                    false/*auto-delete*/, 
                                    null/*arguments*/);

            // Queue 생성 + Binding (1)
            channel.queueDeclare(QUEUE_NAME_1/*queue name*/, 
            					true/*durable*/,
                                false/*exclusive*/, 
                                false/*auto-delete*/, 
                                null/*arguments*/);

            channel.queueBind(QUEUE_NAME_1/*queue name*/,
            				  EXCHANGE_NAME/*exchange name*/,
                              "hello.world"/*routing_key*/);


            // Queue 생성 + Binding (2)
            channel.queueDeclare(QUEUE_NAME_2, true, false, false, null);
            channel.queueBind(QUEUE_NAME_2, EXCHANGE_NAME, "hi.there");


            // Queue 생성 + Binding (3)
            channel.queueDeclare(QUEUE_NAME_3, true, false, false, null);
            channel.queueBind(QUEUE_NAME_3, EXCHANGE_NAME, "bye.bye");


            // 전송할 메세지 내용 생성
            byte[] message
                    = "Fanout Message Publish! Timestamp => %s"
                    .formatted(Instant.now())
                    .getBytes(StandardCharsets.UTF_8);


            // Fanout Exchange 로 메세지 publish!
            channel.basicPublish(
                    EXCHANGE_NAME, // exchange 명
                    "this.will.be.ignored", // routing_key, 어떤값을 넣든 무시됩니다.
                    null,
                    message);

        } catch (IOException | TimeoutException e) {
            throw new RuntimeException(e);
        }
    }
}
  • my.fanout.exchange 라는 이름의 Fanout 타입 Exchange 를 생성하고,
  • Binding 하려는 Queue 3개를 생성해줍니다.
  • Queue 에 대해서 Fanout ExchangeBinding 해줍니다.
  • Fanout Exchange 에 메세지를 Publish 합니다. (routing_key 는 무시됩니다!)
  • 이후 Admin UI 에서 3개의 큐에 모두 메세지가 담긴 것을 확인할 수 있습니다.

Admin UI 에서 확인




Topic Exchange

Topic Exchange 는 Direct 타입처럼 Publisher 가 보낸 routing_key 과
Binding 의 routing_key 를 비교합니다.

다만 Bindingrouting_key패턴 을 사용할 수 있습니다.

예를 들어서 위 그림처럼 Queue-1, Queue-2 의 Binding 에 있는
routing_keyme.*.*, me.*.code 라는 패턴을 적용한 했다고 가정합시다.

이러면 Publisher 에서 me.daily.code 라고 전송하면 앞서 본 패턴과
일치하여 Queue-1, Queue-2 에는 메시지가 전송됩니다.

하지만 Queue-3 는 패턴이 맞지 않기 때문에 메세지를 전송받지 못하게 됩니다.

이렇듯 Binding 의 routing_key 패턴을 사용하는 것이 Topic Exchange 입니다.


테스트 코드

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.util.concurrent.TimeoutException;

public class TopicPublishTest {
    public static void main(String[] args) {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(5672);

        try (Connection connection = factory.newConnection()) {
            Channel channel = connection.createChannel();

            final String EXCHANGE_NAME = "my.topic.exchange";
            final String QUEUE_NAME_1 = "my.first.queue";
            final String QUEUE_NAME_2 = "my.second.queue";
            final String QUEUE_NAME_3 = "my.third.queue";
            
            // 이미 동일명칭의 Queue 및 Exchange 가 있다면 삭제
			// (혹여나 같은 이름 때문에 짜잘한 버그 생기는 게 싫어서 이러는 겁니다)
			channel.queueDelete(QUEUE_NAME_1);
			channel.queueDelete(QUEUE_NAME_2);
			channel.queueDelete(QUEUE_NAME_3);
			channel.exchangeDelete(EXCHANGE_NAME);

            // Fanout Exchange 생성
            channel.exchangeDeclare(EXCHANGE_NAME /*exchange 명*/,
                                    BuiltinExchangeType.TOPIC/*exchange type*/,
                                    true/*durable*/,
                                    false/*auto-delete*/,
                                    null/*arguments*/);

            // Queue 생성 + Binding (1)
            channel.queueDeclare(QUEUE_NAME_1/*queue name*/,
                                true/*durable*/,
                                false/*exclusive*/,
                                false/*auto-delete*/,
                                null/*arguments*/);

            channel.queueBind(QUEUE_NAME_1/*queue name*/,
                              EXCHANGE_NAME/*exchange name*/,
                              "me.*.code"/*routing_key*/);


            // Queue 생성 + Binding (2)
            channel.queueDeclare(QUEUE_NAME_2, true, false, false, null);
            channel.queueBind(QUEUE_NAME_2, EXCHANGE_NAME, "*.*.code");


            // Queue 생성 + Binding (3)
            channel.queueDeclare(QUEUE_NAME_3, true, false, false, null);
            channel.queueBind(QUEUE_NAME_3, EXCHANGE_NAME, "your.*.code");


            // 전송할 메세지 내용 생성
            byte[] message
                    = "TOPIC Message Publish! Timestamp => %s"
                    .formatted(Instant.now())
                    .getBytes(StandardCharsets.UTF_8);


            // Fanout Exchange 로 메세지 publish!
            channel.basicPublish(
                    EXCHANGE_NAME,   // exchange 명
                    "me.daily.code", // routing key
                    null,
                    message);

        } catch (IOException | TimeoutException e) {
            throw new RuntimeException(e);
        }
    }
}
  • 현재 전송한 routing_key=me.daily.code 입니다.
  • 이걸 받은 Topic Exchange 는 현재 binding 목록 중에서 publisher
    가져온 routing_key 와 패턴 매칭이 되는 binding 을 추려냅니다.
    • my.first.queue pattern ==> [me.*.code] : 패턴 매치 🟢
    • my.second.queue pattern ==> [*.*.code] : 패턴 매치 🟢
    • my.third.queue pattern ==> [your.*.code] : 패턴 매치 ❌

Admin UI 에서 확인

  • 패턴매칭이 된 my.first.queue, my.second.queue 는 Messages 의 total=1 이다.
  • 패턴매칭이 안된 my.thrid.queue 는 Message 가 하나도 없는 것을 확인할 수 있습니다.




Headers Exchange

이 타입은 Binding 에있는 routing_key 대신에

  • publisher 가 전송한 Message headers 의 값들과
  • Binding 에 작성된 Parameters (또는 Arguments) 를 비교하여

메세지를 routing 하는 방식입니다.

이 타입의 ExchangePublisherrouting_key 를 보내도
해당 값은 무시하며 오로지 전달받은 headers 만 확인합니다.

Publisher 가 전송하는 headersBindingArguments
모두 key-value 형태로 정의되어 있으며, Arguments 의 경우에는
예외적으로 x-match 라는 특별한 key 를 갖습니다.

그리고 이러한 x-matchall/any 중 어떤 값을 갖냐에 따라 다르게 동작합니다.

  1. x-match=all 인 경우에는 x-match 를 제외한 모든 Arguments
    Publisher 의 Header 의 모든 key-value 와 일치해야만 Queue 로 메세지를 전송합니다.

  2. x-match=any 인 경우에는 x-match 를 제외하고 Arguments 중에
    전달받은 Header 와 key-value 단 하나라도 일치하면 Queue 로 메세지를 전송합니다.

이제 코드로 확인해보죠.


테스트 코드

import com.rabbitmq.client.*;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;

public class HeaderPublishTest {
	public static void main(String[] args) {
		ConnectionFactory connectionFactory = new ConnectionFactory();
		connectionFactory.setHost("localhost");
		connectionFactory.setPort(5672);
		
		try (Connection connection = connectionFactory.newConnection()) {
			Channel channel = connection.createChannel();
			final String EXCHANGE_NAME = "my.header.exchange";
			final String QUEUE_NAME = "my.header.queue";
			
			// 이미 동일명칭의 Queue 및 Exchange 가 있다면 삭제
			// (혹여나 같은 이름 때문에 짜잘한 버그 생기는 게 싫어서 이러는 겁니다)
			channel.queueDelete(QUEUE_NAME);
			channel.exchangeDelete(EXCHANGE_NAME);
			
			// 기존 Exchange 삭제, Header Exchange 생성
			channel.exchangeDeclare(EXCHANGE_NAME /*exchange 명*/,
				BuiltinExchangeType.HEADERS/*exchange type*/,
				true/*durable*/,
				false/*auto-delete*/,
				null/*arguments*/);

			// Queue 생성 + Binding
			channel.queueDeclare(QUEUE_NAME/*queue name*/,
				true/*durable*/,
				false/*exclusive*/,
				false/*auto-delete*/,
				null/*arguments*/);
			
			
			Map<String, Object> headerBindingParam = new HashMap<>();
			headerBindingParam.put("x-match", "any"); //any or all
			headerBindingParam.put("name", "dailycode");
			headerBindingParam.put("favorite", "bread");
			
			
			channel.queueBind(QUEUE_NAME/*queue name*/,
				EXCHANGE_NAME           /*exchange name*/,
				"",           			/*routing_key*/
				headerBindingParam);    /*binding parameters(= Arguments)*/

			
			// 전송할 메세지 내용 생성
			byte[] message
				= "HEADER Message Publish! Timestamp => %s"
				.formatted(Instant.now())
				.getBytes(StandardCharsets.UTF_8);
			

			// Property 생성 및 전송할 Header 의 key-value 설정
			AMQP.BasicProperties properties =
				new AMQP.BasicProperties().builder()
					.headers(Map.of("name", "dailycode"))
					.contentEncoding("UTF-8")
					.build();
			
			// Header Exchange 로 메세지 publish!
			channel.basicPublish(
				EXCHANGE_NAME,   // exchange 명
				"",              // routing key
				properties,      // property 설정!
				message);

			
		} catch (IOException | TimeoutException e) {
			throw new RuntimeException(e);
		}
	}
}
  • Binding parameter(=Arguments) 를 아래와 같이 설정
    • x-match : any
    • name : dailycode
    • favorite : bread
  • 위처럼 했으니 Publisher 가 전송하는 Headerkey-value 들 중에서
    단 하나라도 있으면 Queue 로 메세지를 전달하겠죠?
  • Publish 할 때 Header key-value 를 설정합니다.
    • name : dailycode
  • name : dailycode 가 하나 일치하므로 Headers Exchange
    Queue 로 메세지를 전달합니다.


Admin UI 에서 확인

  • 생성한 큐의 Arguments 를 한번 확인하고~

  • 큐의 메세지를 읽어보면 메세지 publish 할 때 사용했던 프로퍼티를 확인할 수 있습니다.
  • 메세지 내용(payload)도 잘 온 것을 확인할 수 있습니다.




🙌 고생하셨습니다.

여기까지 글을 보면서 실습하신 분들이 있다면... 정말 고생 많으셨습니다!

다음에는 기회가 되면 Spring Boot 와 연동하여 RabbitMQ 를 사용하는 실습
게시물을 작성해보도록 하겠습니다.

이번 글은 여기서 마치도록 하겠습니다.
읽어주셔서 감사합니다!

다음 글 - Spring Boot 로 다루는 RabbitMQ



참고 링크

profile
백엔드 개발자로 일하고 있는 식빵(🍞)입니다.

0개의 댓글