RabbitMQ - 이론과 활용

이지민·2023년 5월 11일
0

RabbitMQ

주요 개념

(출처: http://corecode.pe.kr/2018/02/04/RabbitMQ_usage/)
  • Producer
    • 메시지를 생성하고 발송하는 주체
    • Exchanger를 통해 Queue에 접근
    • 이 메시지는 Queue에 저장
  • Customer
    • 메시지를 수신하는 주체
    • Queue에 직접 접근하여 메시지를 가져옴
  • Queue
    • Producer가 발송한 메시지가 Customer가 소비하기 전까지 보관되는 장소
    • 같은 이름, 같은 설정으로 기존 Queue에 연결 가능, but 같은 이름, 다른 설정으로 Queue를 생성하고자하면 에러 발생
  • Exchange
    • Producer들로부터 전달받은 메시지를 어떤 Queue로 발송할지를 결정하는 객체
    • direct, topic, headers, fanout 네가지 타입이 있음
    • 라우터 개념
  • Binding
    • Exchange에게 메시지를 라우팅 할 규칙을 지정하는 행위
    • 특정 조건에 맞는 메시지를 특정 큐에 전송하도록 설정 가능
    • Exchange 타입에 맞게 설정 되어야함 (Exchange ↔ Queue m:n binding 가능)

Exchange 타입 별 특징 및 예제

Direct

(출처: RabbitMQ 공식 페이지) 위 그림은 error 메세지만 저장소에 기록하고, info와 warning을 포함한 모든 정보를 디스플레이에 출력할 때를 나타내는 모식도 (C1 : 저장소, C2 : 디스플레이)
  • emit_log_direct.py
#!/usr/bin/env python
import pika
import sys
 
connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
 
channel.exchange_declare(exchange='direct_logs', exchange_type='direct')
 
severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(
    exchange='direct_logs', routing_key=severity, body=message)
print(" [x] Sent %r:%r" % (severity, message))
connection.close()
  • receive_logs_direct.py
#!/usr/bin/env python
import pika
import sys
 
connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
 
channel.exchange_declare(exchange='direct_logs', exchange_type='direct')
 
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue
 
severities = sys.argv[1:]
if not severities:
    sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
    sys.exit(1)
 
for severity in severities:
    channel.queue_bind(
        exchange='direct_logs', queue=queue_name, routing_key=severity)
 
print(' [*] Waiting for logs. To exit press CTRL+C')
 
 
def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body))
 
 
channel.basic_consume(
    queue=queue_name, on_message_callback=callback, auto_ack=True)
 
channel.start_consuming()
# error 큐로 온 메시지는 로그 파일에 쓰고, info, warning, error 메시지는 화면에 출력
$ python receive_logs_direct.py error > logs_from_rabbit.log
$ python receive_logs_direct.py info warning error
# 프로듀서가 메시지를 발송
$ python emit_log_direct.py error "Run. Run. Or it will explode."
$ python emit_log_direct.py info "Run. Run. Or it will explode."

Topic

(출처: RabbitMQ 공식 페이지)

라우팅 키가 example.orange.rabbit 인 경우 메시지가 Q1, Q2에 모두 전달
라우팅 키가 example.orange.turtle 인 경우 메시지가 Q1에만 전달
라우팅 키가 lazy.grape.rabbit인 경우엔 메시지가 Q2에 한 번만 전달 (라우팅 패턴이 여러 개 일치하더라도 하나의 큐에는 메시지가 한 번만 전달)

  • emit_log_topic.py
#!/usr/bin/env python
import pika
import sys
 
connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
 
channel.exchange_declare(exchange='topic_logs', exchange_type='topic')
 
routing_key = sys.argv[1] if len(sys.argv) > 2 else 'anonymous.info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(
    exchange='topic_logs', routing_key=routing_key, body=message)
print(" [x] Sent %r:%r" % (routing_key, message))
connection.close()
  • receive_logs_topic.py
#!/usr/bin/env python
import pika
import sys
 
connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
 
channel.exchange_declare(exchange='topic_logs', exchange_type='topic')
 
result = channel.queue_declare('', exclusive=True)
queue_name = result.method.queue
 
binding_keys = sys.argv[1:]
if not binding_keys:
    sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0])
    sys.exit(1)
 
for binding_key in binding_keys:
    channel.queue_bind(
        exchange='topic_logs', queue=queue_name, routing_key=binding_key)
 
print(' [*] Waiting for logs. To exit press CTRL+C')
 
 
def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body))
 
 
channel.basic_consume(
    queue=queue_name, on_message_callback=callback, auto_ack=True)
 
channel.start_consuming()
$ python receive_logs_topic.py "*.orange.*"
 [*] Waiting for logs. To exit press CTRL+C
 [x] 'example.orange.rabbit':b'msg1'
 [x] 'example.orange.turtle':b'msg2'
$ python receive_logs_topic.py "*.*.rabbit" "laze.#"
 [*] Waiting for logs. To exit press CTRL+C
 [x] 'example.orange.rabbit':b'msg1'
 [x] 'lazy.grape.rabbit':b'msg3'
# 메시지 발생
$ python emit_log_topic.py "example.orange.rabbit" "msg1"
$ python emit_log_topic.py "example.orange.turtle" "msg2"
$ python emit_log_topic.py "lazy.grape.rabbit" "msg3"

Fanout

(출처: RabbitMQ 공식 페이지) 모든 메시지를 C1에서는 로그파일로 남기고, C2에서는 화면에 출력함 * emit_log.py
#!/usr/bin/env python
import pika
import sys
 
connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
 
channel.exchange_declare(exchange='logs', exchange_type='fanout')
 
message = ' '.join(sys.argv[1:]) or "info: Hello World!"
channel.basic_publish(exchange='logs', routing_key='', body=message)
print(" [x] Sent %r" % message)
connection.close()
  • receive_logs.py
#!/usr/bin/env python
import pika
 
connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
 
channel.exchange_declare(exchange='logs', exchange_type='fanout')
 
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue
 
channel.queue_bind(exchange='logs', queue=queue_name)
 
print(' [*] Waiting for logs. To exit press CTRL+C')
 
def callback(ch, method, properties, body):
    print(" [x] %r" % body)
 
channel.basic_consume(
    queue=queue_name, on_message_callback=callback, auto_ack=True)
 
channel.start_consuming()
# 모든 메시지를 로그파일에도 쓰고 화면에도 출력함
$ python receive_logs.py > logs_from_rabbit.log
$ python receive_logs.py
# 메시지 발송
$ python emit_log.py

RabbitMQ 활용

주문 서비스 예시

(출처: https://microservices.io/, Microservices.io) 위 그림은 MSA Saga 패턴 중 하나인 Orchestration-based saga 패턴의 한 예제이다. 그림을 보면 두 종류의 Message Broker가 있다. Customer Service Command Channel 고객 서비스 (Customer Service)에서만 메시지를 소비한다. 그렇기 때문에 direct exchange 타입으로 구현하면 된다. 마찬가지로 Create Order Saga Reply Channel은 주문 서비스 (order service)에서만 메시지를 소비한다. 그렇기 때문에 direct exchange 타입을 사용하면 된다. 한편 위 그림에는 없지마 만약 주문 서비스가 주문이 완료 되면 해당 이벤트를 발행한다고 하자. 그리고 이 이벤트는 접근 권한만 있다면 누구나 소비할 수 있게 broadcast 형태로 이벤트를 발행한다고 하자. 그러면 fanout exchange 타입으로 구현 하면 된다.
profile
개발하는 사람입니다.

0개의 댓글