대량 메시지 파이프라인 최적화 해보기 1편(Rate Limit)

Jeerryy·2025년 4월 24일
0

Optimization

목록 보기
1/2

개요

한창 장마철일 때에는 강수량이 부쩍 늘어 하천이 범람하여 침수되었다는 안타까운 소식들이 많이 들립니다.

이러한 사태를 미연에 방지하기 위해 배수 시설을 점검하는 방법이 있을 것이고 더나아가 스케일이 커질 경우 댐을 건설하여 실시간 강수량을 파악하여 직접적으로 유량을 조절하는 방법이 있습니다.
댐

구조

제가 운영하는 서비스에서는 수많은 IoT 데이터가 물밀듯이 인입되고 있습니다.

구조는 아래와 같이 이루어져있습니다.
구조

  1. 장비 서버를 통해 사용량 데이터 수집

  2. 사용량 데이터를 이용해 PubSub 메시지 발행

  3. Pipeline Server(이하 Server)에서 PubSub message를 InfluxDB 저장

PoC 단계에서는 문제가 없는 것을 확인하고 개발 환경부터 파이프라인을 태우기 시작했고 후에 운영 환경에서도 VM을 띄어 사용했습니다.

문제 발현

하지만 시간에 지남에 따라 데이터가 늘게 되었고 몇몇 문제가 발생했습니다.

  1. 메시지가 하나의 topic에 발행이 되어 개발/운영 환경에 들어오는 메시지가 같았고

  2. 비용 문제로 고스펙의 VM을 사용하지 못하고 (운영/개발 환경의 VM 스펙 차이 포함)

  3. 클라이언트 측에서 데이터 재수집 기능을 사용할 경우 순간적으로 인입되는 데이터가 늘어납니다.

따라서 Server 에서는 메시지가 들어오는대로 메시지를 소화했기 때문에 한번에 많은 기기의 재수집을 요청하거나 기간이 길게 요청할 경우 개발 VM은 재수집과 동시에 임계치에 도달했고 운영 VM도 뒤늦게 임계치에 도달하는 상황이 발생했습니다.

PoC 단계에서부터 PubSub의 flowControl을 이용하여 어느 정도 메시지 흐름을 조정했지만 점차 메시지가 많아지면서 Server보다 VM 처리 능력이 턱없이 모자라게 됐고 해결 방법을 강구하게 됐습니다.

해결 방법은 명확했습니다.

  1. VM Scale Up

  2. Rate Limit Algorithm 적용

이미 비용 문제를 겪고 있었기 때문에 2번을 채택하게 됐습니다.

Rate Limit Algorithm

Rate Limit Algorithm은 간단하게 얘기하면 서비스 가용성을 확보하기 위한 수단입니다.

알고리즘에 대한 자세한 내용은 좋은 아티클이 있어 아티클로 대체합니다! 아티클을 먼저 읽고 아래 내용을 읽는 것이 이해하기 더 편할 것이라고 생각합니다.

서비스 가용성 확보에 필요한 Rate Limiting Algorithm에 대해

Leaky Bucket

Server에서는

  1. Pubsub Message를 InfluxDB에 저장하는 로직

  2. 저장에 성공한 1분 데이터를 5분 데이터로 DownSampling 하는 로직

    총 2군데에 조정이 필요했고 여러가지의 알고리즘 중에 Leaky Bucket과 배치 처리 알고리즘을 사용하기로 결정했습니다.

데이터 주입 속도를 조절하여 일정한 수의 데이터를 유출한다는 Leaky Bucket의 특징은 서버의 넉넉한 메모리, Streaming pull하고 있는 Subscriber, 재수집으로 인해 급증할 수 있는 Pubsub Message 등 현재 상황에 가장 최적의 알고리즘이라고 생각했습니다.

Leaky Bucket 알고리즘 코드는 아래와 같으며 LeakRate는 환경 변수에서 수정할 수 있도록하여 개발/운영별로 다르게 조절할 수 있도록 했습니다.

this.messageBuffer.push(...createPointList(processedUsageData, measurementName));

// 현재 시간과 마지막 누출 시간 간의 경과 시간 계산 (초 단위)
const currentTimestamp = dayjs().valueOf();
const elapsedTime = (currentTimestamp - this.lastLeakTime) / 1000;

// 누출될 메시지 수 계산
let messagesToLeak = elapsedTime * this.INFLUXDB_PUBSUB_MESSAGE_LEAK_RATE;

// 버킷에서 메시지를 누출하여 처리
while (this.messageBuffer.length > 0 && messagesToLeak > 0) {
  const point = this.messageBuffer.shift();
  this.writeApi.writePoint(point);
  messagesToLeak--;
}
this.lastLeakTime = currentTimestamp;

배치 처리 알고리즘도 마찬가지로 환경별로 수치 조정이 용이하도록 설정해두었습니다.

writeSuccess: (lines: Array<string>): void => {
  this.logger.log(`${lines.length}건 저장을 완료헀습니다.`);
  // 데이터 삽입에 성공한 데이터 중 `usage_1m measurement` 데이터 필터링
  this.succeedWrite1mDataBuffer.push(...filterArrayByKeyword(lines, this.usage1mMeasurementName));

  // batchSize 이상일 경우 버킷에서 메시지를 누출하여 처리
  if (this.succeedWrite1mDataBuffer.length > this.INFLUXDB_SUCCESS_MESSAGE_BATCH_SIZE) {
    const usage1mDataGroup = new UsageDataGroup(
        this.succeedWrite1mDataBuffer.splice(0, this.INFLUXDB_SUCCESS_MESSAGE_BATCH_SIZE),
    );
    this.aggregateData(usage1mDataGroup, '5m').catch((error) => {
      throw error;
    });
  }

기존에는 Pub/Sub 메시지가 많아질 경우 InfluxDB가 모든 부담을 감당했다면
변경 전

개선 작업 이후로는 서버에서 데이터 흐름을 조절해주어 항상 높은 수준의 성능을 낼 수 있는 환경이 되었습니다.
변경 후

profile
다양한 경험을 해보고자 하는 Backend-Engineer.

0개의 댓글