pub/sub에서 메세지 브로커 역할을 하는 RabbitMQ AMQP 서버를 구축한 다음 Spring Boot application을 만들어서 서버랑 상호작용할 수 있도록 해볼 것이다. (해당 message broker을 활용해 message를 publish/subscribe)
RabbitMQ는 AMQP를 따르는 오픈소스 메세지 브로커이기 때문에 먼저 AMQP에 대해 알아보겠다.
AMQP는 Advanced Message Queuing Protocol의 약자로 위키에서 보면
open standard application layer protocol for message-oriented middleware. Defining features of AMQP are message orientation, queuing, routing (including point-to-point and publish-and-subscribe), reliability and security.
이상한 용어가 많은데
즉 간단히 말하면 응용 계층에서 활용하는 메세지 전달/수신 관련 표준이라는 것이다.
AMQP를 따르는 MOM들은 서로 다른 언어로 구현이 되어 있어도 상호작용이 가능하다. 이 성질을 interoperable이라고 하는데, 여기에 해당하는 또다른 유명한 protocol이 바로 HTTP다. 이 프로토콜이 등장하기 전에는 API를 사용했었는데, 이러면 프로그래머들이야 해당 API를 사용하면 통일된 규격을 유지하는게 가능하나 각 API별로 내부 동작이 다 다르다는 문제점이 있었다. 반면 AMQP는 어떻게 구현하는지는 상관없고 그냥 이 통일된 표준을 따르기만 하면 서로 상호작용이 가능하다는 특징이 있다.
게다가 여러 형태의 message delivery guarantee도 제공한다. at-most-once, at-least-once, exactly-once 등. 게다가 point-to-point, pub-sub messaging 둘 다 지원한다. 이 튜토리얼에서는 후자에 관심이 많다. 심지어 보안 인증 및 암호화도 지원.
메커니즘에 대한 큰 틀을 보면 일단 메세지를 관리/분산 시키는 것을 담당하는 브로커가 존재한다. 우리가 이번에 사용하는 RabbitMQ가 여기에 해당. 메세지를 받으면 이것을 binding rule에 따라 적절한 queue에다가 보관을 한다. 이 과정을 routing이라고 한다. 그리고 메세지 수신자들은 이 queue에서 메세지를 받는다. 이 과정을 pull이라고 한다.
routing을 담당하는 agent를 exchange라고 하며, 이들이 어떻게 메세지를 적절한 queue에 routing을 하는지에 따라 또 여러개로 나뉘어진다. direct/fanout/topic/headers로 나뉘어짐. 이 exchange랑 binding 규칙이라는 것까지 함께 해서 최종적으로 routing이 이루어진다. 이것에 대해 더 알고 싶으면 이 글을 보자. 이 글도 괜찮다.
표준이라고 말한걸 보면 알겠지만, 전달되는 메세지들이 갖춰야 할 규격이 전부 명시가 되어 있다. 이 규격에는 어떻게 routing을 할지 등의 정보가 담겨있는 경우도 있다.
여하튼, 위가 AMQP에 대한 간략한 설명이고 이걸 그냥 구현한 오픈소스 프로그램이 RabbitMQ다.
게다가 플러그인도 지원을 하기 때문에 STOMP, MQTT같은 다른 프로토콜도 지원하는게 가능하다.
이 녀석과 가장 많이 비교되는 녀석은 바로 Apache Kafka다. 얘도 오픈소스 메세지 브로커다. 즉 컨셉적으로 RabbitMQ와의 차이점은 없다.
다만 둘의 내부 구현은 차이가 좀 있는데, exchange/queue/binding을 기반으로 메세지를 routing하는 것이 핵심인 RabbitMQ와 다르게 Topic/partition/ZooKeeper(최근엔 Apache Kafka Raft)를 기반으로 데이터를 스트림 형태로 전달한다는 것이다.
둘의 목표도 약간 다르다. RabbitMQ는 서로 다른 endpoint사이의 메세지 전달을 우선순위를 기반으로 전달하는, 범용적으로 사용이 가능한 브로커인 반면 Kafka는 빅데이터 스트리밍에 좀 더 특화되어 있고 이 때문에 실시간 데이터 처리와 관련해서는 RabbitMQ보다 성능이 더 좋다고 볼 수 있다.
RabbitMQ랑 별로 상관은 없고 동시성 관리에 좀 쓰이는 class다.
C++의 condition variable과 유사한 역할을 한다.
여기서는 메세지 수신이 완료될때까지 기다리는데 사용이 되었다.
message listener container 정의 / queue랑 exchange 정의 후 binding / listener 테스팅을 위해 message를 전달할 component 등을 만들 것이다.
첫번째의 경우 이전의 Redis Messaging과 과정이 크게 다르진 않다. 물론 추상적으로 그렇다는 것이고, 실제 작용하는 class들은 기존의 Redis것이 아닌 RabbitMQ에 대응되는 class들로 전부 바꿔야 한다. 여기에 해당하는건 지금 말하는 SimpleMessageListenerContainer
뿐만 아니라 ConnectionFactory
, MessageListenerAdapter
등도 해당된다.
그런데 SimpleMessageListenerContainer
형성 과정에서 추가적으로 설정해야 하는 것이 있는데 바로 QueueName
을 지정해줘야 한다는 것이다. 이건 그냥 listener이 들을 queue가 무엇인지를 지정할 뿐임.
두번째의 queue, exchange 정의 후 binding을 하는 이유는 사실 String AMQP에서 이를 요구하기 때문에 정의해놓은 것이다. queue에, 어떤 exchange 방식을 활용해가지고 message를 routing하라는 것을 지정하려면 queue, exchange를 bind한 객체를 만들어야 하며, 코드에서는 queueName
이름의 queue에 spring-boot-exchange
라는 이름을 가지는 topic exchange를 만듦, 그 topic exchange가 가질 규칙을 with
를 통해 설정. 여기서는 foo.bar.#
이 해당 topic exchange가 활용할 규칙이다.
그래서 이걸 왜 String AMQP에서 요구하는가? 보면 결국 Binding
을 사용하는 코드가 그 어디에도 존재하지 않는데도 말이다. 사실 저 binding 정의가 있어야 Spring이 application initialization때 외부의 RabbitMQ server에다가 Queue/exchange를 생성하는 것이 가능하기 때문이다. 이것도 autoconfiguration의 일종이다. 지정하지 않으면 애초에 해당 queue/exchange가 따로 돌아가고 있는 RabbitMQ에 존재하지 않게 되기 때문에 문제가 발생함.
마지막으로 message 전달 component는... 뭐 역시나 redis messaging과 매우 유사한 방식으로 message를 전달하고 있다. CommandLineRunner
을 활용하고 있으며, getLatch
method를 통해 수신이 제대로 이루어졌는지를 확인한다. (현실에선 저렇게 확인하진 않는다.)
rabbitmq-1 | 2024-01-09 08:37:11.860387+00:00 [info] <0.1340.0> accepting AMQP connection <0.1340.0> (172.24.0.1:48826 -> 172.24.0.2:5672)
rabbitmq-1 | 2024-01-09 08:37:11.883037+00:00 [info] <0.1340.0> connection <0.1340.0> (172.24.0.1:48826 -> 172.24.0.2:5672) has a client-provided name: rabbitConnectionFactory#8bffb8b:0
rabbitmq-1 | 2024-01-09 08:37:11.886074+00:00 [info] <0.1340.0> connection <0.1340.0> (172.24.0.1:48826 -> 172.24.0.2:5672 - rabbitConnectionFactory#8bffb8b:0): user 'guest' authenticated and granted access to vhost '/'
2024-01-09T17:37:11.268+09:00 INFO 24920 --- [ main] o.e.m.MessagingRabbitmqApplication : No active profile set, falling back to 1 default profile: "default"
2024-01-09T17:37:11.850+09:00 INFO 24920 --- [ main] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: [localhost:5672]
2024-01-09T17:37:11.888+09:00 INFO 24920 --- [ main] o.s.a.r.c.CachingConnectionFactory : Created new connection: rabbitConnectionFactory#8bffb8b:0/SimpleConnection@126f1ba8 [delegate=amqp://guest@127.0.0.1:5672/, localPort=57458]
2024-01-09T17:37:11.893+09:00 INFO 24920 --- [ main] o.s.amqp.rabbit.core.RabbitAdmin : Auto-declaring a non-durable, auto-delete, or exclusive Queue (spring-boot) durable:false, auto-delete:false, exclusive:false. It will be redeclared if the broker stops and is restarted while the connection factory is alive, but all messages will be lost.
2024-01-09T17:37:11.928+09:00 INFO 24920 --- [ main] o.e.m.MessagingRabbitmqApplication : Started MessagingRabbitmqApplication in 0.897 seconds (process running for 1.277)
Sending message...
Received <Hello from RabbitMQ!>