메시지 큐(Message Queue) & Node.js에서 celery queue로 메시지 보내기 | 오늘의 맛집은? 돼지맨숀

Sanghwa Lee·2022년 3월 3일
2
post-thumbnail

메시지 큐 사용 이유

기존에는 Service server가 Inference server로 요청을 보낼 때 HTTP 통신을 했다. 여기엔 몇 가지 문제가 있었다.

  1. Inference time이 너무 오래 걸려 응답을 기다리지 않고 싶다.
  2. Inference server에서 여러 개의 요청을 비동기로 처리하고 싶다.
  3. HTTP 통신은 보안에 취약하다.

문제 1, 3을 해결하기 위해 메시지 큐를 사용하기로 했고 그중에서도 비동기 작업 처리를 위해 셀러리 큐를 사용하기로 했다.

메시지 보내기

MsgIssuer.js

'use strict';

const amqplib = require('amqplib');
const Logger = require('Logger');

class MsgIssuer {

    constructor() {
        this.rabbitMqUrl = 'amqp://userid:userpw@serverIP:PORT';
    }

    issueTask(task, kwargs) {
        this._issuing(task, kwargs, 'cbill0');
    }

    _issuing(task, kwargs, idHead) {
        const open = amqplib.connect(this.rabbitMqUrl);

        const body = {
            id: `${idHead}-${(new Date()).getTime()}`,
            task: task,
            args: [],
            kwargs,
            retries: 0,
            timelimit: [null, null],
        }
        console.log(body);
        // Publisher
        const routingKey = 'celery';
        open.then(function (conn) {
            return conn.createChannel();
        }).then(function (ch) {
            // return ch.purgeQueue(routingKey); // to purge message in the queue
            return ch.assertQueue(routingKey)
                .then(function (ok) {
                    console.log(ok);
                    return ch.sendToQueue(
                        routingKey,
                        Buffer.from(JSON.stringify(body)),
                        {
                            contentType: 'application/json',
                            contentEncoding: 'UTF-8',
                            deliveryMode: 1,
                        });
                });
        }).catch(Logger.error);
    }
}
module.exports = MsgIssuer;

MsgIssuer class를 만들어 관련 method를 묶었다. 생성자에서는 큐의 url을 올바르게 넣으면 MsgIssuer 객체가 생성될 때 큐가 연결된다. userid, userpw는 기본적으로 guest를 사용하면 되고 serverIP는 큐를 연결해 놓은 서버의 IP 주소를 넣으면 된다. 이때 큐를 연결한 포트 번호를 PORT에 쓰면 되는데, 보통 5672번을 사용하는 것 같다. 우리는 Inference server에 celery queue를 연결했고, 그건 다른 팀원이 맡아주셨다.

외부에서 issueTask method를 호출해 issue를 발생시킬 수 있고, issueTask에서 id의 head를 넣어 _issuing method를 호출한다. body는 큐에 담기는 메시지의 구성을 의미한다. 나는 이 메시지를 사용하는 팀원과 협의하여 bodykwargs 부분에 모든 데이터를 넣는 것으로 결정했다. ch.assertQueue(routingKey).then 안에 있는 console.log(ok); 를 통해 큐의 상태를 콘솔에서 볼 수 있다.

메시지-보내는-파일.js

const MsgIssuer = require('./MsgIssuer');

let queue = new MsgIssuer();
let message = {
  ...
}
queue.issueTask('tasks.report.inference', message);

외부 파일에서 이렇게 사용하면 된다. 대충 큐 생성부터는 함수나 라우터 안에서 하면 된다. 여기서 message는 메시지 bodykwargs가 된다. task 이름도 메시지를 사용하는 팀원과 맞춰줘야 한다.

메시지 보내는 건 은근(?) 어렵지 않았다. 이후가 어려웠지 ... TBC

오늘의 맛집

난 아직도 그날에 살고 있어 ... 인턴이 끝났던 날 ㅜㅜ

판교 아비뉴프랑에 있는 돼지맨숀이다. 돼지고기 맛집이다. 내 밥친구 말고 다른 사람들이랑 고깃집 갈 때는 구울 사람이 정해져 있지 않으니까 직원이 구워주는 곳을 가게 된다. 그런 점에서 이곳이 좋았고, 고기도 두툼하고 맛있었다. 삼겹살은 너무 심하게 빠삭 익혀주셔서 목살이 더 맛있었다. 고기에 얹어 먹을 수 있는 양념도 여러 가지였다. 와사비도 내가 많이 먹을 수 있을 만큼 맵지 않았다. 아 요즘 밖에 안 나가니까 외식하고 싶어져써 ~

여기서 유명한 청어알 비빔밥이다. 이렇게 보기엔 순해 보이지만 청어알도 빨갛고 양념도 빨개서 비비면 엄청 빨개진다. 나는 원래 청어알 파스타도 즐겨 먹었기 때문에 이게 진짜 맛있었다! 감칠맛이 대박이양

이 날따라 내 식탐이 엄청나서 술도 안 먹었는데 4명이서 12만원이나 먹었다. 일반 고깃집보다는 비싼 편이긴 한데 비슷한 서비스를 제공하는 프랜차이즈들(하남돼지집 등)과 비교하면 합리적인 가격인 것 같았다.

profile
맛집 보고 가세요 😀

2개의 댓글

comment-user-thumbnail
2023년 11월 27일

돼지맨숀^^ 맛있겠네요~^^

1개의 답글