- 오픈소스의 메시지 브로커
- 홈페이지: https://www.rabbitmq.com/
(출처: http://corecode.pe.kr/2018/02/04/RabbitMQ_usage/)
- Route key가 정확히 일치하는 queue에 메시지 전송
- Unicast
- 예제: https://www.rabbitmq.com/tutorials/tutorial-four-python.html
(출처: RabbitMQ 공식 페이지)
위 그림은 error 메세지만 저장소에 기록하고, info와 warning을 포함한 모든 정보를 디스플레이에 출력할 때를 나타내는 모식도 (C1 : 저장소, C2 : 디스플레이)
#!/usr/bin/env python
import pika
import sys
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='direct_logs', exchange_type='direct')
severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(
exchange='direct_logs', routing_key=severity, body=message)
print(" [x] Sent %r:%r" % (severity, message))
connection.close()
#!/usr/bin/env python
import pika
import sys
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='direct_logs', exchange_type='direct')
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue
severities = sys.argv[1:]
if not severities:
sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
sys.exit(1)
for severity in severities:
channel.queue_bind(
exchange='direct_logs', queue=queue_name, routing_key=severity)
print(' [*] Waiting for logs. To exit press CTRL+C')
def callback(ch, method, properties, body):
print(" [x] %r:%r" % (method.routing_key, body))
channel.basic_consume(
queue=queue_name, on_message_callback=callback, auto_ack=True)
channel.start_consuming()
# error 큐로 온 메시지는 로그 파일에 쓰고, info, warning, error 메시지는 화면에 출력
$ python receive_logs_direct.py error > logs_from_rabbit.log
$ python receive_logs_direct.py info warning error
# 프로듀서가 메시지를 발송
$ python emit_log_direct.py error "Run. Run. Or it will explode."
$ python emit_log_direct.py info "Run. Run. Or it will explode."
(출처: RabbitMQ 공식 페이지)
- Routing key 패턴이 일치하는 Queue에 메시지 전송
- Multicast
- 예제: https://www.rabbitmq.com/tutorials/tutorial-five-python.html
라우팅 키가 example.orange.rabbit 인 경우 메시지가 Q1, Q2에 모두 전달
라우팅 키가 example.orange.turtle 인 경우 메시지가 Q1에만 전달
라우팅 키가 lazy.grape.rabbit인 경우엔 메시지가 Q2에 한 번만 전달 (라우팅 패턴이 여러 개 일치하더라도 하나의 큐에는 메시지가 한 번만 전달)
#!/usr/bin/env python
import pika
import sys
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='topic_logs', exchange_type='topic')
routing_key = sys.argv[1] if len(sys.argv) > 2 else 'anonymous.info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(
exchange='topic_logs', routing_key=routing_key, body=message)
print(" [x] Sent %r:%r" % (routing_key, message))
connection.close()
#!/usr/bin/env python
import pika
import sys
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='topic_logs', exchange_type='topic')
result = channel.queue_declare('', exclusive=True)
queue_name = result.method.queue
binding_keys = sys.argv[1:]
if not binding_keys:
sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0])
sys.exit(1)
for binding_key in binding_keys:
channel.queue_bind(
exchange='topic_logs', queue=queue_name, routing_key=binding_key)
print(' [*] Waiting for logs. To exit press CTRL+C')
def callback(ch, method, properties, body):
print(" [x] %r:%r" % (method.routing_key, body))
channel.basic_consume(
queue=queue_name, on_message_callback=callback, auto_ack=True)
channel.start_consuming()
$ python receive_logs_topic.py "*.orange.*"
[*] Waiting for logs. To exit press CTRL+C
[x] 'example.orange.rabbit':b'msg1'
[x] 'example.orange.turtle':b'msg2'
$ python receive_logs_topic.py "*.*.rabbit" "laze.#"
[*] Waiting for logs. To exit press CTRL+C
[x] 'example.orange.rabbit':b'msg1'
[x] 'lazy.grape.rabbit':b'msg3'
# 메시지 발생
$ python emit_log_topic.py "example.orange.rabbit" "msg1"
$ python emit_log_topic.py "example.orange.turtle" "msg2"
$ python emit_log_topic.py "lazy.grape.rabbit" "msg3"
- 해당 Exchange에 등록된 모든 Queue에 메시지 전송
- Broadcast
- 예제: https://www.rabbitmq.com/tutorials/tutorial-three-python.html
(출처: RabbitMQ 공식 페이지)
모든 메시지를 C1에서는 로그파일로 남기고, C2에서는 화면에 출력함
* emit_log.py
#!/usr/bin/env python
import pika
import sys
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='logs', exchange_type='fanout')
message = ' '.join(sys.argv[1:]) or "info: Hello World!"
channel.basic_publish(exchange='logs', routing_key='', body=message)
print(" [x] Sent %r" % message)
connection.close()
#!/usr/bin/env python
import pika
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='logs', exchange_type='fanout')
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue
channel.queue_bind(exchange='logs', queue=queue_name)
print(' [*] Waiting for logs. To exit press CTRL+C')
def callback(ch, method, properties, body):
print(" [x] %r" % body)
channel.basic_consume(
queue=queue_name, on_message_callback=callback, auto_ack=True)
channel.start_consuming()
# 모든 메시지를 로그파일에도 쓰고 화면에도 출력함
$ python receive_logs.py > logs_from_rabbit.log
$ python receive_logs.py
# 메시지 발송
$ python emit_log.py
(출처: https://microservices.io/, Microservices.io)
위 그림은 MSA Saga 패턴 중 하나인 Orchestration-based saga 패턴의 한 예제이다.
그림을 보면 두 종류의 Message Broker가 있다. Customer Service Command Channel 고객 서비스 (Customer Service)에서만 메시지를 소비한다. 그렇기 때문에 direct exchange 타입으로 구현하면 된다. 마찬가지로 Create Order Saga Reply Channel은 주문 서비스 (order service)에서만 메시지를 소비한다. 그렇기 때문에 direct exchange 타입을 사용하면 된다.
한편 위 그림에는 없지마 만약 주문 서비스가 주문이 완료 되면 해당 이벤트를 발행한다고 하자. 그리고 이 이벤트는 접근 권한만 있다면 누구나 소비할 수 있게 broadcast 형태로 이벤트를 발행한다고 하자. 그러면 fanout exchange 타입으로 구현 하면 된다.