FastAPI 시작하기(feat. celery)

Hyeseong·2022년 5월 12일
1

fastapi

목록 보기
1/2

개요

  1. Celery와 FastAPI 셋업하기
  2. Celery tasks 실행하기
  3. Flower를 이용한 Celery app 모니터링하기

Redis 셋업

OS에 사실 레디스를 바로 설치 하거나 혹은 도커를 이용해서 레디스 서버를 구성할 수도 있습니다.
하지만 기본적으로 도커를 사용하는게 여러모로 편리합니다. 도커를 강추합니다.

도커로 설치

도커설치를 안했다면 일단 도커부터 설치하길 당부드립니다.

$ docker run -p 6379:6379 --name some-redis -d redis

도커허브에서 레디스 공식 이미지를 다운로드 할텐데요. PORT는 6379로 포트포워딩되고 백그라운드로 실행될 거에요.

정상적으로 컨테이너가 기동되었는지 아래 명령어를 날려주세요.

$ docker exec -it some-redis redis-cli ping
PONG

도커 업이 했을 경우

Redis 공식 웹사이트에서 다운받았든, package manager(ex. APT, YUM, Homebrew or Chocolatey)로 설치했든 아래 명령어를 통해 서버를 기동합니다.

$ redis-server

다른 터미널을 열어서 ping 명령어를 날려서 PONG 응답을 받습니다.

$ redis-cli ping
PONG

Celery 구성하기

FastAPI 프로젝트 생성하기

$ mkdir fastapi-celery-project && cd fastapi-celery-project

가상환경을 만들고 활성화 시킬게요.

$ python -m venv env
$ source env/bin/activate
(env)$

Poetry or Pipenv를 이용해서 기존 virtualenv나 Pip 가상환경을 좀더 모던한 파이썬 환경으로 변경시킬수 있어요.

poetry - https://python-poetry.org/
Pipenv - https://github.com/pypa/pipenv
Modern Python Environment - https://testdriven.io/blog/python-environments/

requirements.txt 파일에 아래 내용을 작성할게요.

fastapi==0.66.0
uvicorn[standard]==0.14.0

설치

(env)$ pip install -r requirements.txt

main.py라는 새로운 파일을 생성 할게요.

from fastapi import FastAPI

app = FastAPI()


@app.get("/")
async def root():
    return {"message": "Hello World"}

app 기동을 해 볼게요.

(env)$ uvicorn main:app --reload

INFO:     Uvicorn running on http://localhost:8000 (Press CTRL+C to quit)
INFO:     Started reloader process [96439] using watchgod
INFO:     Started server process [96482]
INFO:     Waiting for application startup.
INFO:     Application startup complete.

브라우저에 localhost 8000포트로 가봅니다. 그럼 "{'message':'Hello World'}" 데이터를 확인 할 수 있어요. 서버를 끌게요.

일단 프로젝트 구조는 아래와 같아요.

├── main.py
└── requirements.txt

Celery 추가하기

celery 설치와 구성을 해볼게요.
requirements.txt 파일을 업데이트 할게요.

celery==5.1.2
redis==3.5.3

설치하기

(env)$ pip install -r requirements.txt

main.py 파일을 업데이트 할게요.

from celery import Celery
from fastapi import FastAPI

app = FastAPI()


celery = Celery(
    __name__,
    broker="redis://127.0.0.1:6379/0",
    backend="redis://127.0.0.1:6379/0"
)


@app.get("/")
async def root():
    return {"message": "Hello World"}


@celery.task
def divide(x, y):
    import time
    time.sleep(5)
    return x / y
1. FastAPI 인스턴스를 만든 다음, 새로운 Celery 인스턴스 하나를 만듭니다. 
2. Celery에게 Redis를 `broker`, `backend`로 사용할 것을 지정해줍니다. 사실 위 방법 처럼 하드코딩하기 보다는 컨피그 파일을 별도로 두거나 혹은 환경 변수로 추후 변경 할 수 있습니다.
3. 임시로 long-running task를 만들기 위해서 divide 태스크를 구현했습니다. 

Sending a Task to Celery

설정이 완료 되었으면, Celery에게 task를 보내서 어떻게 작동하는지 확인해 보겠습니다.

(env)$ celery -A main.celery worker --loglevel=info
[config]
.> app:         main:0x10ad0d5f8
.> transport:   redis://127.0.0.1:6379/0
.> results:     redis://127.0.0.1:6379/0
.> concurrency: 8 (prefork)
.> task events: OFF (enable -E to monitor tasks in this worker)

[queues]
.> celery           exchange=celery(direct) key=celery


[tasks]
  . main.divide

이전 터미널을 열어서 python shell 모드로 진입하겠습니다.

(env)$ python

Celery worker에 task 몇 개를 보내보겠습니다.

>>> from main import app, divide
>>> task = divide.delay(1, 2)

내부 작동
1. 메시지 브로커에게 새로운 메시지를delay method를 이용하여 보냈습니다. 그리고 큐로부터 워커가 픽업한 다음 해당 태스크를 실행하게 됩니다.
2. 엔터키를 탁! 누르고 난 다음 백그라운드로 해당 태스크 작업이 돌아가고 마친다고 보면 됩니다.

아래와 같은 로그 메시지를 셀러리 워커 터미널에서 보게 될 것입니다.

[2022-05-05 20:40:17,901: INFO/MainProcess] Task main.divide[ec8f8ee0-4e47-4a25-851e-acd69bc3e3b1] received
[2022-05-05 20:40:22,913: INFO/ForkPoolWorker-16] Task main.divide[ec8f8ee0-4e47-4a25-851e-acd69bc3e3b1] succeeded in 5.011416195999999s: 0.5

20:40:17,901에 task를 받아 워커에서 처리했습니다. 시작하고 마치는데 약 5초가 소요되었습니다.

  1. 셀러리 클라이언트(Producer)는 메시지 브로커를 통해 queue에 새로운 작업을 더하게됩니다.
  2. 셀러리 워커(Consumer)는 메시지 브로커를 통해서 태스크를 큐로부터 가져와서 해당 태스크를 처리합니다.
  3. 처리 결과는 result backend에 저장됩니다.

새로운 task를 추가하겠습니다.

>>> task = divide.delay(1, 2)

>>> type(task)
<class 'celery.result.AsyncResult'>

delay 메서드를 호출한 후 AsyncResult 반환 값 또는 exception detail과 같은 정보등과 함께 작업 상태를 확인하는 데 사용할 수 있는 인스턴스를 얻습니다.

새 task을 추가한 다음 task.state, task.result를 print로 찍어내보겠습니다.

>>> print(task.state, task.result)
PENDING None

>>> print(task.state, task.result)
PENDING None

>>> print(task.state, task.result)
PENDING None

>>> print(task.state, task.result)
PENDING None

>>> print(task.state, task.result)
PENDING None

>>> print(task.state, task.result)
SUCCESS 0.5

>>> print(task.state, task.result)
SUCCESS 0.5

오류 발생시 어떻게 되는지 확인해보겠습니다.

>>> task = divide.delay(1, 0)

>>> task.state
'FAILURE'

>>> task.result
ZeroDivisionError('division by zero')

Flower를 이용하여 Celery모니터링 하기

Flower는 Celery용 실시간 웹 애플리케이션 모니터링 및 관리 도구입니다.
dependency를 requirements.txt파일에 추가하고 설치 후 실행 해 보겠습니다.

flower==1.0.0
$ pip install -r requirements.txt
$ celery -A main.celery flower --port=5555

대시보드를 보려면 http://localhost:5555로 이동하면 됩니다. 완료된 작은 상당 Tasks를 클릭하면 볼 수 있습니다.

UUID 컬럼이 보일 텐데요 task에 고유하게 부여되는 id입니다. 보이는 UUID중 한가지를 선택하여 카피한다음 shell에서 작업을 해보겠습니다.


>>> from celery.result import AsyncResult
>>> task = AsyncResult('9ad203e0-d59d-48e4-8805-9ac81708b649')  # UUID를 붙여 넣습니다.
>>>
>>> task.state
'FAILURE'
>>>
>>> task.result
ZeroDivisionError('division by zero')

해당 대시보드에서 작업을 통해서 편리하게 처리된 태스크의 내역들을 일목요연하며 직관적으로 여러 정보를 파악할 수 있어서 장점이 있습니다.

profile
어제보다 오늘 그리고 오늘 보다 내일...

0개의 댓글