Model Service를 위한 Celery 구성 및 모니터링(Flower + Prometheus + Grafana)

노란선물·2021년 8월 12일
0

Celery

목록 보기
1/1
post-thumbnail

추론 시간이 오래 걸리는 모델 서비스를 위한 Celery 구성을 기록한 글입니다.

더 좋은 방법이 있으면 알려주세요!

Docker-compose 설정

redis:
    image: redis
    container_name: redis
    restart: always

rabbitmq:
  image: rabbitmq
  container_name: rabbitmq
  restart: always

worker:
    build: .
    container_name: worker
    restart: always
    command: "celery -A app.worker.celery_worker worker --without-heartbeat --without-mingle -P solo -l info -E --concurrency=1 -Ofair"
    depends_on:
      - redis
      - rabbitmq

prometheus:
    image: prom/prometheus
    container_name: prometheus
    restart: always
    volumes:
      - ./prometheus/:/etc/prometheus/
    command:
      - '--config.file=/etc/prometheus/prometheus.yml'
    ports:
      - 9090:9090
    
  flower:  
    image: mher/flower
    container_name: flower
    environment:
      - CELERY_BROKER_URL=amqp://guest@rabbitmq:5672//
      - FLOWER_PORT=7777
    ports:  
      - 7777:7777
  
  grafana:
    image: grafana/grafana
    container_name: grafana
    restart: always
    depends_on:
      - prometheus
    ports:
      - "3000:3000"
  • app.worker.celery_worker: worker로 실행할 코드가 있는 경로
  • --without-heartbeat --without-mingle: diagnostic and redundant heartbeat messages 보내지 않음
  • -P solo: 서비스 특성에 맞게 설정 여기선 single모드인 solo로 설정
  • -E: prometheus로 monitoring을 위한 event 설정
  • --concurrency=1: 서비스 특성에 맞게 설정
  • -Ofair: https://daeguowl.tistory.com/157

Celery 설정

from celery import Celery

app = Celery("worker", backend="redis://redis:6379", broker="amqp://guest@rabbitmq:5672//")

# https://ichi.pro/ko/dib-leoning-algolijeum-eul-seobiseulo-silhaeng-117261469126622
app.conf.update(
    broker_pool_limit=None,
    task_acks_late=True,
    broker_heartbeat=None,
    worker_prefetch_multiplier=1,
    task_track_started=True,
    worker_send_task_events=True,
    task_send_sent_event=True,
    result_expires=86400,  # one day,
    task_serializer="json",
    result_serializer="json",
    accept_content=["json"],
    timezone="Asia/Seoul",
    enable_utc=False
)
  • broker_pool_limit: connection pool에서 열 수 있는 최대 connection수입니다. None으로 설정하면 connection pool이 비활성화되고 connection이 설정되고 사용할 때마다 닫힙니다.
  • task_acks_late: 기본적으로 작업을 수행하면 작업이 실행되기 직전에 "acked" 되지만 오랜시간이 걸리는 딥러닝 작업의 경우 계산 된 후에만 "acked" 되기 위한 설정
  • broker_heartbeat: hearbeat 모니터링을 할지 안할지 결정한다. 작업이 오래걸리므로 None 설정
  • worker_prefetch_multiplier: worker가 받을 메세지 수량을 결정한다. 1로 설정하여 한번에 하나의 메시지만 받도록 설정
  • task_track_started: True 설정시 작업의 상태를 알 수 있다.
  • worker_send_task_events, task_send_sent_event: prometheus로 모니터링을 하기 위한 설정
  • result_expires: backend에 데이터가 남아있는 기간을 설정

Celery 실행

from celery import uuid

def celery_start():
  task_id = uuid()
  celery_test.apply_async(...),task_id=task_id)

  return task_id

apply_async()를 이용하여 uuid로 생성한 task_id를 이용하여 실행된 함수의 상태가 backend로 지정한 redis에 저장되며 우리는 task_id를 이용하여 task info를알수있다.

Celery 실행 함수

from celery.contrib.abortable import AbortableTask

@app.task(bind=True, max_retries=0, base=AbortableTask)
def celery_test():
	...
  • bind: The bind argument means that the function will be a “bound method” so that you can access attributes and methods on the task type instance.
  • max_retries: max_retries 설정된 수 만큼 작업을 재시도 한다. 0으로 설정하여 재시도를 안하도록함
  • base: AbortableTask 를 설정하여 celery가 돌아가는 중간에 중단시킬 수 있도록한다.

Celery 상태 확인

from celery.result import AsyncResult

def celery_state(task_id):
    task = AsyncResult(id=task_id, app=app)
   
    return {'state':task.state, 'info'=task.info}
    

task_id를 이용하여 celery의 상태값인 STARTED, SUCCESS, PENDING, REVOKED 같은 정보를 알 수 있다. SUCCESS상태면 추론 완료, REVOKED 상태면 추론 중단 등 활용이 가능하다.

Celery 실행 중단

from celery.contrib.abortable import AbortableAsyncResult
from celery.result import AsyncResult

def celery_stop(task_id):
  try:
    AbortableAsyncResult(task_id).abort()
    AsyncResult(id=task_id).revoke(terminate=True, signal="SIGKILL")
    log.info("task kill")
    return Response("success task kill")
  except Exception:
    return Response("fail task kill")

사용자가 추론시간이 긴 작업을 수행중 중단을 할 경우 celery는 계속 돌아가고 있기 떄문에 task를 중단 해줘야함

Celery 모니터링

Celery -> Flower -> Prometheus -> Grafana

💡 Flower는 따로 설정 안하셔도 됩니다.

prometheus 설정

# prometheus.yml

global:
  scrape_interval:     15s
  evaluation_interval: 15s

scrape_configs:
  - job_name: flower
    static_configs:
      - targets: ['flower:5555']

localhost:9090 에서 flower가 검색되는지 확인합니다. flower가 검색이 안되면 각 설정의 URL들을 다시 확인합니다.

Grafana 설정

Prometheus Integration - Flower 1.0.1 documentation

💡 celery-monitoring-grafana-dashboard.json 파일은 밑 주소에서 확인할 수 있습니다.

https://github.com/mher/flower/blob/master/examples/celery-monitoring-grafana-dashboard.json


위 Grafana 설정대로 따라하셨으면 dashboard결과를 확인할 수 있습니다.

참고자료

딥 러닝 알고리즘을 서비스로 실행

https://flower.readthedocs.io/en/latest/prometheus-integration.html

https://www.cloudamqp.com/docs/celery.html

https://daeguowl.tistory.com/157

https://newbiestory.tistory.com/61

https://kyoungmookryu.medium.com/python-celery-d347ac155720

0개의 댓글