[Message Queue> Consumer 설정 정보들 알아보기

김태훈·2024년 4월 10일
0

성균관대 Skkuding

목록 보기
14/15
post-thumbnail

💡 해당 페이지의 모든 내용은 RabbitMQ 공식문서를 참고하였습니다.
Consumers | RabbitMQ

이번 시간에는 Consumer에 대해 자세히 알아보겠습니다.

Consumer가 Broker와 통신하는 방법, 그리고 어떤 정보를 주고받는지 주목하면서 공부해봅시다.

✏️ Consumer란

Consumer는 Message를 소비하고, Message에 대한 ACK 를 Broker에게 전달합니다. (이는 Publisher와 Broker간의 통신에서도 마찬가지입니다)

여기서 Consumer는 메시지 전달 요청이 수행되기 전에, 메시지 요청을 subscribe 하면서 메시지를 받을 준비를 합니다.

✏️ Consumer의 요소들

(1) Consumer Tag

Consumer Tag란 Consumer에게 붙는 Identifier입니다.

여러 Consumer가 동일한 Queue를 subsribe할 수 있습니다. 이 때, 이러한 Consumer들을 식별하고, 관리하는 데 사용하는 것이 Consumer Tag입니다.

저희 서비스 코드에는 Consumer Tag가 명확히 사용되고 있지는 않습니다.
Iris 서버에서는 Consumer Config에서 다음과 같이 consumer-tag를 사용하고 있지만, 이를 통해 Consumer를 식별하는 역할은 수행하지 않습니다.

consumer tag는 필요에 따라 사용하는 Optional한 속성입니다. (Node 진영에서는 임의의 name으로 자동으로 붙여준다고 합니다)

connector.Factory(
  connector.RABBIT_MQ,
  connector.Providers{Router: routeProvider, Logger: logProvider},
  rabbitmq.ConsumerConfig{
    AmqpURI:        uri,
    ConnectionName: utils.Getenv("RABBITMQ_CONSUMER_CONNECTION_NAME", "iris-consumer"),
    QueueName:      utils.Getenv("RABBITMQ_CONSUMER_QUEUE_NAME", "client.q.judge.submission"),
    **Ctag:           utils.Getenv("RABBITMQ_CONSUMER_TAG", "consumer-tag"),**
  },
  rabbitmq.ProducerConfig{
    AmqpURI:        uri,
    ConnectionName: utils.Getenv("RABBITMQ_PRODUCER_CONNECTION_NAME", "iris-producer"),
    ExchangeName:   utils.Getenv("RABBITMQ_PRODUCER_EXCHANGE_NAME", "iris.e.direct.judge"),
    RoutingKey:     utils.Getenv("RABBITMQ_PRODUCER_ROUTING_KEY", "judge.result"),
  },
).Connect(context.Background())

(2) Consumer LifeCycle

Consumer가 Broker와의 통신이 맺어지고 끊어지는 생명 주기를 의미합니다. 우리의 서비스에서는, 서버 시작과 동시에 서버가 죽을 때까지 계속해서 Subscribe 되어야 하므로, Consumer LifeCycle은 서버 시작부터 종료까지입니다.

(3) Connection Recovery

MQ를 Subscribe하는 Client는 RabbitMQ와 통신이 끊어질 수도 있습니다. 이런 상황을 막기 위해 Recovery 전략을 취합니다. 아직 저희 동아리에서는 이러한 전략이 존재하진 않습니다.

  • Recover connection
  • Recover channels
  • Recover queues
  • Recover exchanges
  • Recover bindings
  • Recover consumers

이러한 Recovery 전략을 취해야 하는데, 아직 이를 위한 준비가 되어있지 않습니다. (하지만 죽을일이 거의 없긴합니다. On-premise에서 사용하는 것도 아니고, AWS Resource들 끼리의 통신이니까요. 필요성이 있나? 싶기는 합니다.)

✏️ (Subscribing, “Push API”) Consumer 등록

💡 Pull API (주기적 Fetching 방식) 도 있어요 !
Pull API는 Message를 주기적으로 fetching해오는 방식으로 작동합니다.
하지만 공식문서에서 말하기를, 권장되는 방법이 아니라고 합니다.

이유는 메시지를 지속적으로 확인하는 polling 방식을 사용하기 때문에, 메시지들이 항상 Queue에 존재하는 것이 아니라면, 쓸 데없는 overhead가 생기기 때문입니다.
또, 즉각적으로 메시지가 도착하자 마자 처리가 되어야 하는 경우, 정해진 Polling 주기 때문에, 처리되기까지의 latency가 존재합니다.

Consumer가 Queue를 Subscribe하여 Queue에 들어온 Message들을 받는 방법으로, Broker가 Consumer에게 ‘Push’ 한다고 해서 Push API라고 부릅니다.

이 때, 성공적인 Subscription이 만들어지면, Consumer Identifier라고 설명했던, Consumer Tag객체가 반환됩니다. 이를 사용해, Consumer가 Subscription을 해제할 수 있습니다.

  • Nest.js Subscribe 등록 함수
createSubscriber<T>(handler: SubscriberHandler<T>, msgOptions: MessageHandlerOptions, originalHandlerName: string, consumeOptions?: ConsumeOptions): Promise<SubscriptionResult>;
  • 반환 객체인 SubscriptionResult
export interface SubscriptionResult {
    consumerTag: ConsumerTag;
}

(1) Message Properties and Delivery Metadata

모든 Delivery에는 message의 metadata와 delivery metadata가 포함되어야 합니다.
다음은 AMQP Go 라이브러리에서 정의한 Delivery 객체입니다.

type Delivery struct {
	Acknowledger Acknowledger // the channel from which this delivery arrived
	Headers Table // Application or header exchange table
	// Properties
	ContentType     string    // MIME content type
	ContentEncoding string    // MIME content encoding
	DeliveryMode    uint8     // queue implementation use - non-persistent (1) or persistent (2)
	Priority        uint8     // queue implementation use - 0 to 9
	CorrelationId   string    // application use - correlation identifier
	ReplyTo         string    // application use - address to reply to (ex: RPC)
	Expiration      string    // implementation use - message expiration spec
	MessageId       string    // application use - message identifier
	Timestamp       time.Time // application use - message timestamp
	Type            string    // application use - message type name
	UserId          string    // application use - creating user - should be authenticated user
	AppId           string    // application use - creating application id
	// Valid only with Channel.Consume
	ConsumerTag string
	// Valid only with Channel.Get
	MessageCount uint32
	DeliveryTag uint64
	Redelivered bool
	Exchange    string // basic.publish exchange
	RoutingKey  string // basic.publish routing key
	Body []byte
}

1. message metadata

Publisher와 Consumer간의 메시지 처리를 위한 정보들입니다. 이는 개발자가 “직접” 정의하여 전달합니다.
3주차에서도 살펴봤는데, Nest.js에서 publish와 관련된 코드를 다시 봐봅시다.

this.amqpConnection.publish(EXCHANGE, SUBMISSION_KEY, judgeRequest, {
      messageId: String(submission.id),
      persistent: true,
      type: PUBLISH_TYPE
})

메시지 publish당시 여러 설정값들을 message에 포함하는 것을 알 수 있습니다.

이 중, 사용하는 message metadata는 다음과 같습니다.

  1. messageId (Optional)
  2. persistent (Required)
  3. type (Optional)

metadata중에서 필수로 포함되어야할 속성은 persistent(Delivery Mode 설정 정보)속성뿐입니다. 나머지는 Application 서비스 로직에 맞게 필요하면 넣고, 불필요하면 넣지 않아도 됩니다. 저희 서비스의 경우 messageId는 곧 submissionId이므로 무조건 포함시켜야 할 message metadata 입니다.

  • message metadata 속성들
    PropertyTypeDescriptionRequired?
    Delivery modeEnum (1 or 2)2 for "persistent", 1 for "transient". Some client libraries expose this property as a boolean or enum.Yes
    TypeStringApplication-specific message type, e.g. "orders.created"No
    HeadersMap (string => any)An arbitrary map of headers with string header namesNo
    Content typeStringContent type, e.g. "application/json". Used by applications, not core RabbitMQNo
    Content encodingStringContent encoding, e.g. "gzip". Used by applications, not core RabbitMQNo
    Message IDStringArbitrary message IDNo
    Correlation IDStringHelps correlate requests with responses, see https://www.rabbitmq.com/tutorialsNo
    Reply ToStringCarries response queue name, see https://www.rabbitmq.com/tutorialsNo
    ExpirationStringhttps://www.rabbitmq.com/docs/ttlNo
    TimestampTimestampApplication-provided timestampNo
    User IDStringUser ID, https://www.rabbitmq.com/docs/validated-user-id if setNo
    App IDStringApplication nameNo
      

2. delivery metadata

Publish나 Consumer 측에서 코드로 정의하긴 하지만, 실질적으로 정의하는 주체는 Broker인 설정 정보입니다.
Broker가 설정을 한다는 뜻은, Message를 어떠한 Queue, 어떠한 Consumer에게 전달할지 정의된 속성이라는 뜻입니다.

아까 본 publish 코드들에서, Exchange SUBMISSION_KEY 정보들이 이에 포함됩니다.

(2) Message Types

Message의 Type을 정의합니다.
Nest.js에서 Broker에게 Publish하는 경우에, PUBLISH_TYPE 상수로 정의하여 사용하고 있습니다. 이러한 Type을 통해 Iris서버에서 에러를 처리합니다. ++ messageId 또한 반드시 필요하다는 것도 알 수 있습니다.

if message.Type == "" {
	result = router.NewResponse("", nil, fmt.Errorf("type(message property) must not be empty")).Marshal()
} else if message.MessageId == "" {
	result = router.NewResponse("", nil, fmt.Errorf("message_id(message property) must not be empty")).Marshal()
} else {
	result = c.router.Route(message.Type, message.MessageId, message.Body)
}
profile
기록하고, 공유합시다

0개의 댓글