Celery 찍어먹어보기

Matthew Woo·2022년 4월 6일
1

My Review

목록 보기
9/11

django-admin startproject mq_celery . : Celery 를 사용해볼 장고를 띄움

pip install celery : celery 설치
brew install rabbitmq : 브로커 역할을 해줄 토끼mq

python manage.py startapp app1
mq_clerey/settings.py 에서 app1 을 추가해준다.

INSTALLED_APPS = [
    'django.contrib.admin',
    'django.contrib.auth',
    'django.contrib.contenttypes',
    'django.contrib.sessions',
    'django.contrib.messages',
    'django.contrib.staticfiles',
    'app1' # 이거 추가 안해줬다가 task에 잡히질 앉는 오류를 범했었다.
]
  • mq_celery/celery.py
from __future__ import absolute_import, unicode_literals

import os

from celery import Celery

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'mq_celery.settings') # celery instance 생성 전에 설정해줘야함

app = Celery('mq_celery') # celery instance 가 생성됨

app.config_from_object('django.conf:settings', namespace='CELERY')

app.autodiscover_tasks() # 이러면 등록된 앱들의 task를 오토로 챙겨온다.

app.autodicover 에는 어떤 package, 어떤 파일에서 테스크를 가져올 지 인자로 설정할 수 있다.

force (bool) – By default this call is lazy so that the actual auto-discovery won’t happen until an application imports the default modules. 
Forcing will cause the auto-discovery to happen immediately.

이 외에도 bool 속성으로 위의 내용을 설정할 수 있다는데, 내가 맞게 이해한건지는 모르겠다. autodiscover가 각 app들이 import 되고 나서 실행되는데 이걸 각 app들이 import 되기 전에 먼저 autodiscover 할 수 있다는 것 같다.

  • app1/tasks.py
from __future__ import absolute_import, unicode_literals

from celery import shared_task

@shared_task 
def add(x, y):
    return x + y

@shared_task 를 사용하면 app.task와 같이 app에 종속적이지 않게 사용할 수 있다. 이로 인해 app 을 import해오지 않아도 된다.

celery -A mq_celery worker -l info: celery 실행 tasks에 테스크가 잡힌다.

임의로 테스크를 넣어주었다. 오타는 덤.

[2022-04-06 13:37:51,037: INFO/MainProcess] Task app1.tasks.add[677b0c4e-ab18-4a3c-a076-6bb0d5f20678] received
[2022-04-06 13:37:51,041: INFO/ForkPoolWorker-8] Task app1.tasks.add[677b0c4e-ab18-4a3c-a076-6bb0d5f20678] succeeded in 0.0006483820034191012s: 8
[2022-04-06 13:39:11,150: INFO/MainProcess] Task app1.tasks.add[1fcdab19-11fc-4c5f-8cf9-87df5525d4b4] received
[2022-04-06 13:39:22,370: INFO/ForkPoolWorker-8] Task app1.tasks.add[1fcdab19-11fc-4c5f-8cf9-87df5525d4b4] succeeded in 0.000147557002492249s: 6
[2022-04-06 13:39:32,711: INFO/MainProcess] Task app1.tasks.add[4bddfcb3-d309-41c8-9d41-03cc94ac1793] received
[2022-04-06 13:39:34,297: INFO/ForkPoolWorker-8] Task app1.tasks.add[4bddfcb3-d309-41c8-9d41-03cc94ac1793] succeeded in 0.00013701198622584343s: 6

결과는 잘 나오는군.

이메일 보내는 처리를 clerey task를 이용해본다.

# task.py
from celery import shared_task
from celery.utils.log import get_task_logger

from .email import send_review_email # 이메일 전송하는 코드를 만들어놓고 import 하였음

logger = get_task_logger(__name__)

@shared_task(name="send_reviw_email_task")
def send_review_email_task(name, email, review):
    logger.info("Sent review email!!")
    return send_review_email(name, email, review) # celery shared_task 로 wrapping 함
# form.py

from django import forms
from app2.tasks import send_review_email_task # 위 task 로 지정한 것을 import

class ReviewForm(forms.Form):
    name = forms.CharField(
        label='Firstname', min_length=4, max_length=50, widget=forms.TextInput(
            attrs={'class': 'form-control mb-3', 'placeholder': 'Firstname', 'id': 'form-firstname'}))
    email = forms.EmailField(
        max_length=200, widget=forms.TextInput(
            attrs={'class': 'form-control mb-3', 'placeholder': 'E-mail', 'id': 'form-email'}))
    review = forms.CharField(
        label="Review", widget=forms.Textarea(attrs={'class': 'form-control', 'rows': '5'}))

    def send_email(self): # send 하게 되면 celery task로 enqueue 됨
        send_review_email_task.delay( 
            self.cleaned_data['name'], self.cleaned_data['email'], self.cleaned_data['review'])

깃(https://github.com/jungbumwoo/mq_celery)


참고(https://youtu.be/fBfzE0yk97k)

profile
지속가능하고 안정적인 시스템을 만들고자 합니다.

0개의 댓글