이 글에서 나오는 모든 소스 코드는 저의 github 에 올려놨습니다.
글을 읽다가 막히거나 애매할 때 참고하시기 바랍니다.
이전 글에서는 RabbitMQ
가 무엇이고, 왜 필요한지를 설명하고,
RabbitMQ
에서 제공하는 Java API
를 통해서 RabbitMQ
서버와 통신를 해봤습니다.
그리고 그 과정에서 RabbitMQ
의 필수 항목들인...
Publisher
Exchange
Queue
Binding
Channel
에 대해서도 어느정도 파악해봤습니다.
이번 글에서는 Spring Boot
에 RabbitMQ
를 적용해보고,
코드를 가볍게 실행해볼 수 있는 테스트 프로젝트를 만드는 시간을 갖겠습니다.
참고: 작성자의 개발환경
IDE
:IntelliJ IDEA 2024.1.2 (Ultimate Edition)
JDK
:azul-jdk 21
Spring Boot
:ver.3.3.0
Docker
:ver.26.1.1
docker engine 을 실행시키고 아래처럼 명령어를 작성해서 rabbitMQ docker instance 를
하나 생성해줍니다. RabbitMQ 와 소통하기 위한 5672
포트와 관리자 페이지 접속을 위한 15672
포트는 꼭 바인딩해주세요~
docker run -it --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3.13-management
먼저 maven
기반의 Spring Boot
프로젝트를 생성해주시기 바랍니다.
그리고 나서 pom.xml
을 아래와 같이 작성해주세요.
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>3.3.0</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>coding.toast</groupId>
<artifactId>spring-boot-rabbitmq</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>spring-boot-rabbitmq</name>
<description>spring-boot-rabbitmq</description>
<properties>
<java.version>21</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit-test</artifactId>
<scope>test</scope>
</dependency>
<!-- spring-boot-starter-web 을 사용하지 않으면 필요합니다. -->
<!--<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>-->
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<excludes>
<exclude>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</build>
</project>
코딩의 순서는 다음과 같이 하겠습니다.
spring boot 의 핵심 기능 중 하나인 자동 설정(Auto configuration) 의 강력함은
이 글을 읽으시는 모든 분들이 아시리라 생각합니다.
그리고 현재 저희가 의존성으로 넣은 spring-boot-starter-amqp
덕분에 이러한
rabbitMQ 관련 자동 설정이 동작합니다.
RabbitMQ 자동 설정은 RabbitAutoConfiguration
클래스를 확인하면 됩니다.
이 클래스에서는 핵심적으로 아래 3가지 타입의 인스턴스를 생성합니다.
그렇다면 이 자동설정 클래스에 개발자의 설정값(RabbitMq 서버 호스트, 포트번호 등)
을 설정하려면 어떻게 할까요? 이건 RabbitProperties
클래스를 참고하면 됩니다.
애노테이션을 보면 알겠지만 spring.rabbitmq
로 시작하는 설정값들을
application.properties
에 적용해주면 됩니다.
저는 RabbitProperties
클래스를 참조하여
아래와 같은 설정들을 application.properties
작성했습니다.
# rabbitmq 호스트 및 포트 번호
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
# 인증이 필요한 경우에 사용되는 id,pw
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
#관리자 기능(AmqpAdmin) 사용을 원하지 않으면 false 로 하시기 바랍니다.
#spring.rabbitmq.dynamic=false
application.yaml 로 작성하시는 분들은 아래처럼...
spring:
rabbitmq:
dynamic: true
host: localhost
port: 5672
username: guest
password: guest
먼저 생성하고자 하는 Exchange, Queue, Binding 을 빈으로 등록하겠습니다.
package me.dailycode.rabbitmq.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitMqComponentsConfig {
@Bean
DirectExchange myDirectExchange() {
return ExchangeBuilder.directExchange("my.direct.exchange")
.durable(true).build();
}
@Bean
TopicExchange myTopicExchange() {
return ExchangeBuilder.topicExchange("my.topic.exchange")
.durable(true).build();
}
@Bean
Queue queue1() {
return QueueBuilder.durable("my.first.queue").build();
}
@Bean
Queue queue2() {
return QueueBuilder.durable("my.second.queue").build();
}
@Bean
Binding directBinding1(Queue queue1, DirectExchange myDirectExchange) {
return BindingBuilder.bind(queue1).to(myDirectExchange).with("my.daily.code");
}
@Bean
Binding directBinding2(Queue queue2, DirectExchange myDirectExchange) {
return BindingBuilder.bind(queue2).to(myDirectExchange).with("your.daily.code");
}
@Bean
Binding topicBinding1(Queue queue1, TopicExchange myTopicExchange) {
return BindingBuilder.bind(queue1).to(myTopicExchange).with("*.*.code");
}
@Bean
Binding topicBinding2(Queue queue2, TopicExchange myTopicExchange) {
return BindingBuilder.bind(queue2).to(myTopicExchange).with("my.*.*");
}
}
참고로 RabbitMQ
와 통신하는 핵심은 Provider
, Consumer
코드입니다.
그리고 해당 코드에서는 Exchange Name
, Queue Name
만 알면 통신이
가능하기 때문에 위처럼 거창하게 Exchange
, Queue
, Binding
타입의
인스턴스를 직접 생성할 필요는 없습니다.
다만 저는 애플리케이션에서 RabbitMQ
의 어떤 것들과 소통하는지
빠르게 참고하기 위해서 위처럼 작성한 것입니다.
package me.dailycode.rabbitmq.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Configuration;
import java.util.List;
@Configuration
public class RabbitMqCreationConfig {
// 여기서 중요한 것은 AmqpAdmin 이다. 이걸 통해서
// RabbitMQ Admin UI 에서 사용하던 기능들을 사용할 수 있다.
public RabbitMqCreationConfig(AmqpAdmin rabbitAdmin,
DirectExchange myDirectExchange,
TopicExchange myTopicExchange,
List<Queue> queueList,
List<Binding> bindingList) {
// Exchange 생성 (RabbitMQ 에 없는 경우에만, 이미 있으면 ignore)
rabbitAdmin.declareExchange(myTopicExchange);
rabbitAdmin.declareExchange(myDirectExchange);
// Queue 생성 (RabbitMQ 에 없는 경우에만, 이미 있으면 ignore)
for (Queue queue : queueList) {
rabbitAdmin.declareQueue(queue);
}
// Binding 생성 (RabbitMQ 에 없는 경우에만, 이미 있으면 ignore)
for (Binding binding : bindingList) {
rabbitAdmin.declareBinding(binding);
}
}
}
참고
RabbitMQ
서버에Exchange
,Queue
,Binding
들이
모두 생성된 상태이면 위처럼 코드를 작성할 필요가 전혀 없습니다.
그냥 바로Publisher
,Consumer
코드를 작성하시면 됩니다.
위의 코드를 실행하고 RabbitMq Admin UI
에 접속하여 결과를 확인합니다.
my.first.queue
에 있는 binding
목록 확인my.second.queue
에 있는 binding
목록 확인이제 만든 Exchange 에 메세지를 전송할 Publisher 를 만들어 봅시다.
저는 Message
전송 시에 Json 형태의 메시지를 주고 받기
위해서 Message
통신용 설정 클래스를 아래와 같이 추가했습니다.
package coding.toast.rabbitmq.config;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.boot.autoconfigure.amqp.RabbitTemplateCustomizer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class MessageCommunicationConfig {
public static final String MESSAGE_CONVERTER_NAME
= "jacksonMessageConverter";
/**
* 전송 또는 수신하는 메시지에 대한 변환을 도와주는 컨버터 빈 등록
*/
@Bean(name = MESSAGE_CONVERTER_NAME)
public MessageConverter jacksonMessageConverter() {
ObjectMapper configure
= new ObjectMapper()
.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
return new Jackson2JsonMessageConverter(configure);
}
/**
* RabbitAutoConfiguration 에서 제공하는 default rabbitTemplate 에
* 추가적인 설정을 하려면 아래처럼 하면 됩니다!
*/
@Bean
public RabbitTemplateCustomizer rabbitTemplateCustomizer() {
return rabbitTemplate -> rabbitTemplate.setMessageConverter(jacksonMessageConverter());
}
// 참고 (1):
// 위처럼 ??Customizer 라는 이름으로 끝나는 클래스가
// RabbitAutoConfiguration 클래스에 많이 등장합니다.
// 잘 활용하면 자동으로 생성된 인스턴스에 추가적인 설정을 할 수 있습니다!
// 참고 (2):
// 개인의 rabbitTemplate 을 생성하고 싶다면 아래처럼하셔도 됩니다!
// 다만 아래처럼 설정하는 순간 기존에 RabbitAutoConfiguration 에서 생성되는
// rabbitTempalte 는 더 이상 생성되지 않습니다.
/*
@Bean
public RabbitTemplate jsonRabbitTemplate(
RabbitTemplateConfigurer configurer,
ConnectionFactory connectionFactory) {
RabbitTemplate template = new RabbitTemplate();
configurer.configure(template, connectionFactory);
template.setMessageConverter(messageConverter());
return template;
}
*/
}
추가적으로 Json 형태로 메세지를 주고 받을 때 기반이 되는 POJO 클래스(PersonInfo)를 하나 생성했습니다.
package coding.toast.rabbitmq.data;
public record PersonInfo(String name, int age) {}
이제 Publisher 와 Consumer 간에 json 형태의 메세지를 주고 받기 위한
기본 설정은 끝났습니다! 이제 메세지를 날려보고, 받아봅시다!
package coding.toast.rabbitmq.provider;
import lombok.RequiredArgsConstructor;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
@Component
@RequiredArgsConstructor
public class MyMessagePublisher {
private final TopicExchange myTopicExchange;
private final DirectExchange myDirectExchange;
/**
* 자동설정에 의해 생성된 rabbitTemplate 사용,
* 참고로 MessageCommunicationConfig 에서 커스텀 설정도 마친 상태입니다!
* 커스텀 설정에 의해서 default 로 Message 를 json 형태로 전송합니다.
*/
private final RabbitTemplate jsonRabbitTemplate;
/**
* 다이렉트 메세지 전송
*/
public void directMsgPub(String routingKey, Object msg) {
jsonRabbitTemplate.convertAndSend(myDirectExchange.getName(), routingKey, msg);
}
/**
* 토픽 메세지 전송
*/
public void topicMsgPub(String routingKey, Object msg) {
jsonRabbitTemplate.convertAndSend(myTopicExchange.getName(), routingKey, msg);
}
}
package coding.toast.rabbitmq.consumer;
import coding.toast.rabbitmq.data.PersonInfo;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
// 앞서 설정한 메시지 변환기 Bean 의 명칭입니다.
import static coding.toast.rabbitmq.config.MessageCommunicationConfig.MESSAGE_CONVERTER_NAME;
@Component
public class MyMessageConsumer {
@RabbitListener(queues = "my.first.queue",
messageConverter = MESSAGE_CONVERTER_NAME)
public void firstQueueListener(PersonInfo personInfo) {
System.out.printf("Message From my.first.queue => %s%n", personInfo);
}
@RabbitListener(queues = "my.second.queue",
messageConverter = MESSAGE_CONVERTER_NAME)
public void secondQueueListener(PersonInfo personInfo) {
System.out.printf("Message From my.second.queue => %s%n", personInfo);
}
}
@RabbitListener
를 사용해서 앞서 생성한 2개의 큐에 Subscribe
했습니다.
이제부터 큐에 들어오는 메세지는 모두 console 에 찍히게 될 겁니다.
간단하게 한번 Publish 를 해보고, MyMessageConsumer
에 의해서
전달받은 메세지를 정상적으로 수신해서 콘솔에 찍히는지 확인해봅시다.
package coding.toast.rabbitmq;
import coding.toast.rabbitmq.data.PersonInfo;
import coding.toast.rabbitmq.provider.MyMessagePublisher;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
@SpringBootApplication
public class SpringBootRabbitmqApplication {
public static void main(String[] args) {
SpringApplication.run(SpringBootRabbitmqApplication.class, args);
}
@Bean
public CommandLineRunner commandLineRunner(MyMessagePublisher publisher) {
return args -> {
// Binding 관련 내용을 까먹었으면 RabbitMqComponentsConfig 클래스를 다시 참조해주세요~
// DirectExchange
// => directBinding1 (routing_key = my.daily.code)
// => my.first.queue 로 메세지 전송
publisher.directMsgPub("my.daily.code",
new PersonInfo("MY_DAILY_CODE", 10));
// DirectExchange
// => directBinding2(routing_key = your.daily.code)
// => my.second.queue 로 메세지 전송
publisher.directMsgPub("your.daily.code",
new PersonInfo("YOUR_DAILY_CODE", 20));
// TopicExchange
// => topicBinding1(routing_key = *.*.code)
// => my.first.queue 로 메세지 전송
publisher.topicMsgPub("nice.cool.code",
new PersonInfo("NICE_COOL_CODE", 30));
// TopicExchange
// => topicBinding2(routing_key = my.*.*)
// => my.second.queue 로 메세지 전송
publisher.topicMsgPub("my.daily.life",
new PersonInfo("MY_DAILY_LIFE", 40));
};
}
}
코드를 실행하면 아래처럼 로그가 잘 찍히는 것을 확인할 수 있습니다.
조금 더 응용하면 아래처럼 Controller 를 통해서 메세지를 Publish 할 수도 있습니다.
@RestController
@RequiredArgsConstructor
public class MessageSendController {
private final MyMessagePublisher publisher;
@PostMapping("/sendMsg")
PersonInfo sendMsg(@RequestBody PersonInfo personInfo) {
System.out.println("personInfo = " + personInfo);
publisher.directMsgPub("my.direct.queue", personInfo);
return personInfo;
}
}
만약 여기까지 읽고 실습하신 모든 분들이 계시다면 정말 고생하셨고,
긴 글 읽어주셔서 정말 감사합니다~