Redis 를 공부하면서 Redis 의 Mesaage Queuing 을 공부하다가 Kafka 와 RabbitMQ 에 대해 알게 되었다.
내가 일하고 있는 회사에서 Kafka 를 사용하기에는 시스템 대비 Learning Curve 및 데이터 사용량이 적기도 하고 RabbitMQ 가 조금더 적합해보여서 정리해보려고 한다.
AMQP를 구현하여 서버간 메세지(데이터)를 전달해주는 오픈 소스 메시지 브로커 소프트웨어 ( 메시지 지향 미들웨어 )
메시지 지향 미들웨어를 위한 개방형 표준 응용 계층 프로토콜
종류 | 설명 | 특징 |
---|---|---|
Direct | Routing Key 가 정확히 일치하는 Queue 에게 Message 전송 | Unicast or Multicast |
Topic | Routing Key 의 패턴이 일치하는 Queue 에게 Message 전송 | Multicast |
Headers | ( Key:Value ) 로 이루어진 Header 값 을 기준으로 일치하는 Queue 에게 Message 전송 | Multicast |
Fanout | 해당 Exchange 에 등록 된 모든 Queue 에게 Message 전송 | Broadcast |
<!-- RabbitMQ Dependency-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
spring:
rabbitmq:
host: localhost // RabbitMQ host ip
port: 5672 // RabbitMQ port
username : guest // RabbitMQ 웹 관리 콘솔 아이디
password: guest // RabbitMQ 웹 관리 콘솔 비밀번호
queue:
name: sample-queue // 사용할 queue 이름
exchange:
name: sample-exchange // 사용할 exchange 이름
routing:
key : key
import lombok.RequiredArgsConstructor;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@RequiredArgsConstructor
@Configuration
public class RabbitMQConfig {
@Va
@Value("${spring.rabbitmq.queue.name}")
private String queueName;
@Value("${spring.rabbitmq.exchange.name}")
private String exchangeName;
@Value("${spring.rabbitmq.routing.key}")
private String routingKey;
@Value("${spring.rabbitmq.host}")
private String host;
@Value("${spring.rabbitmq.port}")
private int port;
@Value("${spring.rabbitmq.username}")
private String userName;
@Value("${spring.rabbitmq.password}")
private String password;
// org.springframework.amqp.core.Queue
@Bean
public Queue queue() {
return new Queue(queueName);
}
/**
* 지정된 Exchange 이름으로 Direct Exchange Bean 을 생성
*/
@Bean
public DirectExchange directExchange() {
return new DirectExchange(exchangeName);
}
/**
* 주어진 Queue 와 Exchange 을 Binding 하고 Routing Key 을 이용하여 Binding Bean 생성
* Exchange 에 Queue 을 등록
**/
@Bean
public Binding binding(Queue queue, DirectExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(routingKey);
}
/**
* RabbitMQ 연동을 위한 ConnectionFactory 빈을 생성하여 반환
**/
@Bean
public CachingConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setHost(this.host);
connectionFactory.setPort(this.port);
connectionFactory.setUsername(this.userName);
connectionFactory.setPassword(this.password);
return connectionFactory;
}
/**
* RabbitTemplate
* ConnectionFactory 로 연결 후 실제 작업을 위한 Template
*/
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setMessageConverter(jackson2JsonMessageConverter());
return rabbitTemplate;
}
/**
* 직렬화(메세지를 JSON 으로 변환하는 Message Converter)
*/
@Bean
public MessageConverter jackson2JsonMessageConverter() {
return new Jackson2JsonMessageConverter();
}
}
@RequiredArgsConstructor
@Service
public class RabbitMqService {
@Value("${spring.rabbitmq.queue.name}")
private String queueName;
@Value("${spring.rabbitmq.exchange.name}")
private String exchangeName;
@Value("${spring.rabbitmq.routing.key}")
private String routingKey;
private final RabbitTemplate rabbitTemplate;
/**
* 1. Queue 로 메세지를 발행
* 2. Producer 역할 -> Direct Exchange 전략
**/
public void sendMessage(MessageDto messageDto) {
log.info("messagge send: {}",messageDto.toString());
this.rabbitTemplate.convertAndSend(exchangeName,routingKey,messageDto);
}
/**
* 1. Queue 에서 메세지를 구독
**/
@RabbitListener(queues = "${rabbitmq.queue.name}")
public void receiveMessage(MessageDto messageDto) {
log.info("Received Message : {}",messageDto.toString());
}
}
@RequiredArgsConstructor
@RestController
public class RabbitMqController {
private final RabbitMqService rabbitMqService;
@PostMapping("/send/message")
public ResponseEntity<String> sendMessage(
@RequestBody MessageDto messageDto
) {
this.rabbitMqService.sendMessage(messageDto);
return ResponseEntity.ok("Message sent to RabbitMQ");
}
}
RabbitMQ 에 관한 내용과 Spring Boot 에서의 연계까지 모두 정리하였다.
조금만 더 공부해서 응용하면 실무에서 문제없이 사용가능할 것 같다.