asyncio로 구현하는 이메일 배치 발송 시스템

dasd412·2025년 12월 14일

실무 문제 해결

목록 보기
23/27

참고사항

아래 글은 claude를 많이 활용한 점을 미리 언급한다. 나도 처음 접해보는 내용을 공부하는 것이기 때문에, AI를 많이 활용했다.

특히, thread와 async가 너무 헷갈리고, 실무에선 처음이기 때문에...
일단, 개인 학습용이라고 보면 되는 글이다.

1. 들어가며

알림이 발생할 때마다 즉시 이메일을 발송하면 어떤 문제가 생길까? 1분에 10개의 알림이 발생한다면, 사용자는 1분에 10통의 이메일을 받게 된다. 이는 사용자 경험을 해치고, SMTP 서버에도 부담을 준다.

이 글에서는 30초 배치 버퍼 패턴asyncio 동시성 제어를 활용하여 이메일 과다 발송 문제를 해결한 과정을 공유한다.

해결하려는 문제

  • 알림 발생 시마다 개별 이메일 발송 → 과도한 메일 발생
  • 여러 알림을 동시에 처리하면서 30초마다 배치 발송 필요
  • DB 조회, SMTP 발송, API 호출 등 I/O 바운드 작업 최적화

해결 방향

  • 30초 동안 알림을 버퍼에 모아서 그룹화하여 발송
  • asyncio 기반 동시성 제어로 안전한 버퍼 관리
  • I/O 바운드 작업에 최적화된 비동기 처리

2. 왜 Threading이 아닌 Asyncio인가?

2-1. 이전 프로젝트 > Threading 사용 경험

이전에 MQTT 분산 토픽 통합 모듈을 개발할 때는 threading.Event를 사용했다. MQTT 브로커 연결을 대기하는 blocking I/O 작업이 주를 이루었고, 독립적인 프로세스로 동작했기 때문이다.

2-2. 이번 프로젝트 > Asyncio 선택 이유

하지만 이번 프로젝트에서는 asyncio를 선택했다. 이유는 다음과 같다.

1. FastAPI 프레임워크 제약

  • FastAPI는 asyncio 기반 프레임워크
  • 기존 시스템이 이미 asyncio 이벤트 루프 위에서 동작

2. I/O 바운드 작업의 특성

  • DB 조회
  • SMTP 이메일 발송
  • Slack API 호출
  • 이 모든 작업은 대기 시간이 대부분

3. 동시성 요구사항

  • 여러 Alert를 동시에 처리
  • 30초마다 배치 발송 스케줄러 실행
  • 수백 개의 동시 연결 처리 필요

2-3. Threading vs Asyncio 비교

구분ThreadingAsyncio
동작 방식OS 레벨 스레드 (선점형)단일 스레드 + 이벤트 루프 (협력형)
GIL 영향있음 (CPU 바운드 작업 제약)거의 없음 (I/O 바운드 최적화)
메모리스레드당 수 MB코루틴당 수 KB
적합한 작업I/O 바운드 (asyncio 생태계 외)I/O 바운드 (async/await 지원)
컨텍스트 스위칭 비용높음 (OS 개입)낮음 (유저 스페이스)

예시: DB 쿼리 100개를 동시에 처리

  • Threading: 스레드 100개 × 8MB = 800MB
  • Asyncio: 코루틴 100개 × 2KB = 200KB

I/O 대기 중에는 CPU가 놀고 있으므로, asyncio의 가벼운 코루틴이 훨씬 효율적이다.


3. Threading과 Asyncio를 같이 사용할 수 없는 이유?

"Threading과 asyncio는 같이 못 쓴다"는 말을 자주 듣지만, 정확히는 "명확한 경계 없이 섞으면 깨진다"가 맞다.

3-1. 이벤트 루프 충돌

Asyncio는 단일 이벤트 루프에서 모든 코루틴을 스케줄링한다. 이벤트 루프는 특정 스레드에 속하며, 다른 스레드에서 접근하면 충돌한다.

import asyncio
import threading

async def async_task():
    await asyncio.sleep(1)

def thread_task():
    # ❌ 크래시 발생!
    asyncio.run(async_task())  # 다른 스레드에서 이벤트 루프 실행

t = threading.Thread(target=thread_task)
t.start()

# RuntimeError: This event loop is already running

3-2. 동기화 메커니즘 불일치

Threading과 asyncio는 서로 다른 동기화 프리미티브를 사용한다.

  • threading.Lockasyncio.Lock
  • threading.Eventasyncio.Event

이들은 호환되지 않으므로, 한쪽에서 획득한 Lock을 다른 쪽에서 해제할 수 없다.

3-3. Blocking 호출이 전체 이벤트 루프 멈춤

import asyncio
import time

async def good_task():
    for i in range(3):
        print("Good")
        await asyncio.sleep(1)  # ✅ 다른 코루틴 실행 가능

async def bad_task():
    for i in range(3):
        print("Bad")
        time.sleep(1)  # ❌ 1초 동안 모든 코루틴 멈춤!

async def main():
    await asyncio.gather(good_task(), bad_task())

asyncio.run(main())

# 출력: Bad만 3번 나오고 1초씩 멈춤, Good은 나중에 나옴

time.sleep()은 blocking 호출이므로 전체 이벤트 루프를 멈춘다. 반면 await asyncio.sleep()은 다른 코루틴이 실행될 수 있도록 양보한다.

3-4. 올바른 혼용 방법: asyncio.to_thread()

Blocking I/O를 asyncio에서 사용해야 할 때는 asyncio.to_thread()로 래핑한다:

import asyncio
import time

def blocking_smtp_send(recipient, message):
    """Blocking I/O 라이브러리 (smtplib 같은)"""
    time.sleep(2)  # SMTP 연결 및 발송
    return f"Email sent to {recipient}"

async def send_email(recipient, message):
    # ✅ Blocking 코드를 별도 스레드에서 실행
    result = await asyncio.to_thread(blocking_smtp_send, recipient, message)
    print(result)

asyncio.run(send_email("user@example.com", "Hello"))

원리

  • Blocking 함수를 워커 스레드에서 실행
  • 메인 이벤트 루프는 계속 돌아가며 다른 코루틴 처리
  • 워커 스레드 작업 완료 시 결과를 이벤트 루프로 반환

4. 30초 배치 버퍼 설계

4-1. 요구사항

  1. Alert 발생 즉시 버퍼에 추가
  2. 30초마다 장비별로 그룹화하여 이메일 발송
  3. 동시 읽기/쓰기 충돌 방지

4-2. 설계 흐름도

[Alert 발생] → [add_alert()] → [버퍼 추가 (Lock 획득)]
                                        ↓
[30초 타이머] → [flush_buffer()] → [버퍼 복사 (Lock 획득)]
                                        ↓
                                   [Lock 해제]
                                        ↓
                                   [이메일 발송]

핵심 설계 원칙

  • Lock 범위 최소화: 버퍼 복사 및 클리어만 Lock 안에서 수행
  • 시간이 오래 걸리는 이메일 발송은 Lock 밖에서 수행

4-3. Asyncio 동시성 제어 패턴

asyncio.Lock을 사용하여 버퍼의 읽기/쓰기를 보호한다:

import asyncio
from typing import Dict, List
from collections import defaultdict

class EmailBuffer:
    def __init__(self, flush_interval: int = 30):
        self.flush_interval = flush_interval
        self.buffer: Dict[str, List[Dict]] = defaultdict(list)
        self._lock = asyncio.Lock()  # 동시성 제어
        
    async def add_alert(self, device_id: str, alert_data: Dict):
        """버퍼에 알림 추가 (Lock으로 보호)"""
        async with self._lock:
            self.buffer[device_id].append(alert_data)

async with 패턴의 장점

  • Lock 획득/해제를 자동으로 처리
  • 예외 발생 시에도 Lock이 안전하게 해제됨
  • 코드 가독성 향상

5. 핵심 구현

5-1. asyncio.Lock을 사용한 동시성 제어

async def flush_buffer(self):
    """버퍼를 플러시하고 이메일 발송"""
    async with self._lock:
        if not self.buffer:
            return
        
        # 버퍼 복사 및 클리어 (Lock 안에서만)
        grouped = dict(self.buffer)
        self.buffer.clear()
    
    # Lock 밖에서 이메일 발송 (시간 오래 걸림)
    for device_id, alerts in grouped.items():
        await self._send_device_email(device_id, alerts)

Lock 범위 최소화의 중요성

  • Lock 안: 빠른 메모리 연산만 (복사, 클리어)
  • Lock 밖: 느린 I/O 작업 (이메일 발송)
  • 다른 코루틴이 버퍼에 추가할 수 있도록 빠르게 Lock 해제

5-2. asyncio.create_task()로 백그라운드 스케줄러

async def start_flush_scheduler(self):
    """30초마다 버퍼를 플러시하는 스케줄러 시작"""
    self.flush_task = asyncio.create_task(self._flush_loop())

async def _flush_loop(self):
    """무한 루프로 30초마다 플러시"""
    while True:
        await asyncio.sleep(self.flush_interval)
        await self.flush_buffer()

asyncio.create_task()의 역할

  • 코루틴을 백그라운드에서 실행
  • 메인 이벤트 루프는 계속 다른 작업 처리
  • Task 객체 반환으로 나중에 취소 가능

5-3. asyncio.to_thread()로 Blocking SMTP 처리

import smtplib
from email.mime.text import MIMEText

class EmailService:
    def __init__(self):
        self.smtp_host = "smtp.example.com"
        self.smtp_port = 25
        self.smtp_username = "service"
        self.smtp_password = "password"
        
    async def send_email(self, recipients: List[str], subject: str, body: str) -> bool:
        """이메일 발송 (비동기 래퍼)"""
        try:
            msg = self._create_mime_message(recipients, subject, body)
            
            # Blocking SMTP를 별도 스레드에서 실행
            await asyncio.to_thread(self._send_smtp, recipients, msg)
            return True
        except Exception as e:
            print(f"Failed to send email: {e}")
            return False
    
    def _send_smtp(self, recipients: List[str], msg: MIMEText):
        """실제 SMTP 발송 (Blocking I/O)"""
        with smtplib.SMTP(self.smtp_host, self.smtp_port) as smtp:
            smtp.ehlo()
            smtp.login(self.smtp_username, self.smtp_password)
            smtp.sendmail(self.sender_email, recipients, msg.as_string())
    
    def _create_mime_message(self, recipients: List[str], subject: str, body: str) -> MIMEText:
        """MIMEText 메시지 생성"""
        pass  # 구현 생략

asyncio.to_thread()가 필요한가?

  • smtplib는 blocking 라이브러리
  • 직접 호출하면 전체 이벤트 루프가 멈춤
  • 워커 스레드에서 실행하여 다른 코루틴 영향 없음

5-4. 전체 흐름 통합

class EmailBuffer:
    def __init__(self, flush_interval: int = 30):
        self.flush_interval = flush_interval
        self.buffer: Dict[str, List[Dict]] = defaultdict(list)
        self._lock = asyncio.Lock()
        self.email_service = EmailService()
    
    async def add_alert(self, device_id: str, alert_data: Dict):
        """알림 추가"""
        async with self._lock:
            self.buffer[device_id].append(alert_data)
    
    async def start_flush_scheduler(self):
        """스케줄러 시작"""
        self.flush_task = asyncio.create_task(self._flush_loop())
    
    async def _flush_loop(self):
        """30초마다 플러시"""
        while True:
            await asyncio.sleep(self.flush_interval)
            await self.flush_buffer()
    
    async def flush_buffer(self):
        """버퍼 플러시 및 이메일 발송"""
        async with self._lock:
            if not self.buffer:
                return
            grouped = dict(self.buffer)
            self.buffer.clear()
        
        # Lock 밖에서 발송
        for device_id, alerts in grouped.items():
            await self._send_device_email(device_id, alerts)
    
    async def _send_device_email(self, device_id: str, alerts: List[Dict]):
        """장비별 이메일 발송"""
        device_name = alerts[0]["device_name"]
        recipients = await self._get_recipients(device_id)
        
        subject = f"[알림] {device_name} - {len(alerts)}건"
        body = self._create_email_body(device_name, alerts)
        
        await self.email_service.send_email(recipients, subject, body)
    
    async def _get_recipients(self, device_id: str) -> List[str]:
        """수신자 조회 (DB 쿼리)"""
        pass  # 구현 생략
    
    def _create_email_body(self, device_name: str, alerts: List[Dict]) -> str:
        """이메일 본문 생성"""
        pass  # 구현 생략

6. Threading vs Asyncio 선택 기준

6-1. Threading을 선택해야 할 때

  • I/O 바운드 작업 중 asyncio 생태계가 아닌 경우: 레거시 blocking 라이브러리 (오래된 DB 드라이버 등)
  • 독립적인 프로세스 대기: MQTT 브로커 연결처럼 단순 대기만 하는 경우
  • 간단한 병렬 I/O 작업: 파일 읽기, 네트워크 요청 등

주의: CPU 바운드 작업은 multiprocessing을 사용해야 한다. Threading은 GIL 때문에 CPU 바운드 작업에서 성능 향상이 없다.

6-2. Asyncio를 선택해야 할 때

  • I/O 바운드 작업: DB, API, 네트워크 요청
  • FastAPI, aiohttp 등 async 프레임워크: 이미 asyncio 생태계
  • 수천 개의 동시 연결: WebSocket, 실시간 스트리밍
  • 메모리 효율: 수백 개의 동시 작업을 적은 메모리로 처리

6-3. 병행 사용 가능 케이스

방법 1: asyncio.to_thread() (Asyncio → Blocking)

사용 케이스: FastAPI에서 blocking 라이브러리 호출

# 앞서 본 SMTP 예시가 대표적
await asyncio.to_thread(blocking_smtp_send, recipient, message)

방법 2: asyncio.run_coroutine_threadsafe() (Threading → Asyncio)

사용 케이스: Flask 같은 동기 프레임워크에서 비동기 함수 호출

import asyncio
import threading

# 비동기 함수 (asyncio 기반 DB 쿼리)
async def fetch_data_async():
    await asyncio.sleep(1)
    return {"data": "result"}

# Asyncio 이벤트 루프를 별도 스레드에서 실행
loop = asyncio.new_event_loop()

def run_event_loop():
    asyncio.set_event_loop(loop)
    loop.run_forever()

loop_thread = threading.Thread(target=run_event_loop, daemon=True)
loop_thread.start()

# 동기 함수에서 비동기 함수 호출
def sync_function():
    # ✅ 동기 환경에서 비동기 함수 호출
    future = asyncio.run_coroutine_threadsafe(fetch_data_async(), loop)
    result = future.result(timeout=5)  # Blocking으로 결과 대기
    return result

print(sync_function())

사용 예시

  • Flask/Django + async DB 드라이버 (asyncpg, motor)
  • PyQt/Tkinter GUI에서 비동기 네트워크 요청
  • 동기 테스트 환경에서 비동기 함수 테스트

profile
아키텍쳐 설계에 관심이 많은 백엔드 개발자입니다.

0개의 댓글