Celery 사용기

남영민·2023년 4월 13일
0

Celery는 task를 여러 쓰레드나 기기에 나누어주는 비동기 태스크 큐 입니다.

Airflow로 task 관리를 하던 중 Airflow에서 task를 trigger하고, 로컬 Windows PC에서 해당 task를 처리해야할 필요가 있어 사용하게 되었습니다.

방식은 Airflow에서 원하는 message를 만들어 AWS의 SQS로 발송하고(Airflow가 task를 생성하는 producer 역할, SQS가 task를 분배하는 broker 역할), 로컬 Windows PC에서 CMD로 workers가 SQS를 바라보고 있다가, message가 수신되면 task를 실행하는 방식으로 구현하였습니다.

<Airflow 상 코드>

app = Celery()
app.conf.broker_url = "sqs://"
app.conf.broker_transport_options = {
    "region": "ap-northeast-2",
    "polling_interval": 1,
    "visibility_timeout": 3600,
    "predefined_queues": {
        "SQS-PIPELINE": {
            "url": "https://sqs.ap-northeast-2.amazonaws.com/xxxxxx/Test-Queue",
            }
    }
}

app.conf.timezone = "Asia/Seoul"
app.conf.task_default_queue = "SQS-PIPELINE"

@app.task(name='celery_messages')
def run_celery(start_date, end_date, task):
    """
    넘겨받은 parameter들을 sqs에 전달
    """
    pass

def celery_messages(**context):
    """
    delay함수가 실제로 전달하는 trigger가 됨
    """
    start_date = context['start_date']
    end_date = context['end_date']
    task = context['task']
    run_celery.delay(start_date, end_date, task)
    
send_sqs_messages = PythonOperator(
    task_id = 'send_sqs_messages',
    python_callable = celery_messages,
    op_kwargs = {
        'start_date': env_vars["start_date"],
        'end_date': env_vars["end_date"],
        'task': 'task'
    },
    dag = dag
)

<Task를 실행할 코드의 Celery 설정>

#celery_app.py

from celery import Celery
import TestTask

app = Celery(task_cls=TestCeleryTask)
app.conf.broker_url = "sqs://"
app.conf.broker_transport_options = {
    "region": "ap-northeast-2",
    "polling_interval": 1,
    "visibility_timeout": 1800,
    "predefined_queues": {
        "SQS-PIPELINE-UI-Collector": {
            "url": "https://sqs.ap-northeast-2.amazonaws.com/xxxxxxxxx/Test-Queue",
        }
    }
}
app.conf.timezone = "Asia/Seoul"
app.conf.task_default_queue = "Test-Queue"


@app.task(name='test_task_messages', retry_kwargs={'default_retry_delay': 3600, 'max_retries': 1}, loglevel='Error')
def run_test_task(start_date, end_date, task):
    import TestTask()

<코드를 실행할 PC의 커맨드라인 명령어>

celery -A celery_app worker -l INFO -P solo --concurrency=1 --loglevel ERROR
profile
성장하는 개발자

0개의 댓글