시작하기에 앞서, 아래의 내용을 build.gradle에 적어준다.
implementation 'org.springframework.boot:spring-boot-starter-amqp'
ConnectionFactory connectionFactory = new CachingConnectionFactory();
AmqpAdmin admin = new RabbitAdmin(connectionFactory);
admin.declareQueue(new Queue("myqueue"));
AmqpTemplate template = new RabbitTemplate(connectionFactory);
template.convertAndSend("myqueue", "foo");
String foo = (String) template.receiveAndConvert("myqueue");
@Configuration
public class RabbitConfiguration {
@Bean
public CachingConnectionFactory connectionFactory() {
return new CachingConnectionFactory("localhost");
}
@Bean
public RabbitAdmin amqpAdmin() {
return new RabbitAdmin(connectionFactory());
}
@Bean
public RabbitTemplate rabbitTemplate() {
return new RabbitTemplate(connectionFactory());
}
@Bean
public Queue myQueue() {
return new Queue("myqueue");
}
}
스프링 AMQP는 'spring-amqp'와 'spring-rabbit' 두 가지의 모듈로 이루어진다.
spring-amqp 모듈은 'org.springframework.amqp.core' 패키지를 포함한다.
이것의 의도는 개발자가 특정한 라이브러리나 broker구현에 의존하지 않게 하기 위함이다.
개발자는 추상화 계층에 대해서만 개발할 수 있기 때문에, 코드의 이식성이 높아질 수 있다.
코드는 broker 모듈에 의해 구현된다. 현재 RabbitMQ만 존재한다.
AMQP는 프로토콜 레벨에서 작동하므로 원칙적으로는 AMQP를 지원하는 어떤 broker에서도 작동할 수 있다.
0-9-1 AMQP는 Message 클래스 혹은 인터페이스를 정의하지 않는다.
basicPublish()와 같은 작업을 수행할 때 데이터의 내용을 byte 배열을 파라미터로 받고 추가 속성을 별도로 파라미터로 받는다.
Spring AMQP는 Message 클래스를 정의한다. Message 클래스의 정의는 다음과 같다.
public class Message {
private final MessageProperties messageProperties;
private final byte[] body;
public Message(byte[] body, MessageProperties messageProperties) {
this.body = body;
this.messageProperties = messageProperties;
}
public byte[] getBody() {
return this.body;
}
public MessageProperties getMessageProperties() {
return this.messageProperties;
}
}
Exchange 인터페이스는 producer가 보내는 AMQP Exchange를 나타낸다.
Exchange 인터페이스는 다음과 같다.
public interface Exchange {
String getName();
String getExchangeType();
boolean isDurable();
boolean isAutoDelete();
Map<String, Object> getArguments();
}
Exchange의 ExchangeType은 'direct', 'topic', 'fanout', 'header'가 있다.
ExchangeType에 따라 Queue에 Binding을 처리하는 방식이 다르다.
ExchangeType의 특징은 다음과 같다.
direct: 고정된 routing key로 queue를 binding한다.
topic: 1개의 와일드 카드를 포함하는 *와 0개 이상의 와일드 카드를 포함하는 #과 같은 routing pattern을 지원한다.
fanout: routing key를 고려하지 않고, binding된 모든 queue에 메시지를 보낸다.
header: x-match의 값에 따라 메시지 전달 방식이 달라진다.
Queue 클래스는 consumer가 메시지를 받는 queue와 속성들을 나타낸다.
Queue 클래스의 정의는 다음과 같다.
public class Queue {
private final String name;
private volatile boolean durable;
private volatile boolean exclusive;
private volatile boolean autoDelete;
private volatile Map<String, Object> arguments;
/**
* The queue is durable, non-exclusive and non auto-delete.
*
* @param name the name of the queue.
*/
public Queue(String name) {
this(name, true, false, false);
}
// Getters and Setters omitted for brevity
}
Binding 클래스는 Queue를 Exchange에 binding 한다. 이는 producer와 consumer를 연결하는 것을 의미한다.
Binding 클래스의 instance는 다음과 같이 생성할 수 있다.
new Binding(someQueue, someDirectExchange, "foo.bar");
new Binding(someQueue, someTopicExchange, "foo.*");
new Binding(someQueue, someFanoutExchange);
Binding b = BindingBuilder.bind(someQueue).to(someTopicExchange).with("foo.*");
Binding 클래스의 instance는 연결에 대한 데이터만 보유한다. 즉, 해당 instance가 'binding을 활성화 함'의 의미가 아니다.
하지만, 이후에 나오는 AmqpAdmin 클래스는 Binding 클래스의 instance를 이용하여 broker에서 binding 작업을 유발할 수 있다.
멋있어요~