서비스를 MSA와 시키는 decoupling 작업을 하면서
SQS와 Lambda로 빅데이터를 처리 중이었다.
- 하지만 이 구조를 만들면서 굉장히 맘에 들지 않았다. 그 이유는 바로
Django의 ORM이나 Celery를 사용할 수 없었다는 점.
- 그래서 대용량 실시간 처리에 조금더 유리하다고 생각한 Kafka를 사용해보기로 했고, 이렇게 했을 때, 메세지 Consumer를 서비스 레벨에서 핸들링 할 수 있었다. 그리고 Topic별로 나누어 메세지를 처리하고 replica를 활용하여 고가용성을 유지할 수 있었기에 사용해보기로 했다.
prod 환경에서는 AWS의 MSK를 사용할 예정이었기 때문에, 로컬에서 테스트할 카프카 환경을 만들어야 했다.
version: '2'
services:
zookeeper:
image: zookeeper
container_name: local-zookeeper
ports:
- "22181:2181"
kafka:
image: wurstmeister/kafka
container_name: local-kafka
depends_on:
- zookeeper
ports:
- "9092:9092"
environment:
KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1
KAFKA_ADVERTISED_PORT: 9092
KAFKA_CREATE_TOPICS: "analysis-request"
KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
docker-compose up -d
멀티쓰레드, 비동기 및 기술적 요구사항을 처리하기 위한 방법이다.
import json
import threading
import time
from threading import Thread
from django.apps import AppConfig
from kafka import KafkaConsumer
from celery import shared_task
class KafkaConsumerThread(threading.Thread):
def __init__(self, topic_name):
super(KafkaConsumerThread, self).__init__(daemon=True)
self.topic_name = topic_name
self.broker = "localhost:9092"
self.group_id = "my_group"
def run(self):
consumer = KafkaConsumer(
self.topic_name,
bootstrap_servers=[self.broker],
group_id=self.group_id,
value_deserializer=lambda x: json.loads(x.decode('utf-8')),
auto_offset_reset='earliest',
enable_auto_commit=True,
)
for message in consumer:
# 메세지 처리 로직
....
....
..
.
@shared_task
def start_kafka_consumer_thread(topic_name):
thread = KafkaConsumerThread(topic_name)
thread.start()
class KafkaConfig(AppConfig):
name = "app.kafka"
def ready(self):
kafka_consumer_thread.delay("my_topic")
- Django 앱 최초 1회 실행 시 컨수머를 생성하고
그 이후부터 지속적으로 kafka의 메세지를 Listen 하기 위해
AppConfig의 ready 메서드내에 컨수머를 생성
- Consumer를 비동기로 처리하기 위해 Celery를 사용
- 해당 비동기 프로세스를 멀티쓰레드로 작동시키기 위해 Threading 활용
아주 흡족했다.