아래 글은 claude를 많이 활용한 점을 미리 언급한다. 나도 처음 접해보는 내용을 공부하는 것이기 때문에, AI를 많이 활용했다.
특히, thread와 async가 너무 헷갈리고, 실무에선 처음이기 때문에...
일단, 개인 학습용이라고 보면 되는 글이다.
알림이 발생할 때마다 즉시 이메일을 발송하면 어떤 문제가 생길까? 1분에 10개의 알림이 발생한다면, 사용자는 1분에 10통의 이메일을 받게 된다. 이는 사용자 경험을 해치고, SMTP 서버에도 부담을 준다.
이 글에서는 30초 배치 버퍼 패턴과 asyncio 동시성 제어를 활용하여 이메일 과다 발송 문제를 해결한 과정을 공유한다.
이전에 MQTT 분산 토픽 통합 모듈을 개발할 때는 threading.Event를 사용했다. MQTT 브로커 연결을 대기하는 blocking I/O 작업이 주를 이루었고, 독립적인 프로세스로 동작했기 때문이다.
하지만 이번 프로젝트에서는 asyncio를 선택했다. 이유는 다음과 같다.
1. FastAPI 프레임워크 제약
2. I/O 바운드 작업의 특성
3. 동시성 요구사항
| 구분 | Threading | Asyncio |
|---|---|---|
| 동작 방식 | OS 레벨 스레드 (선점형) | 단일 스레드 + 이벤트 루프 (협력형) |
| GIL 영향 | 있음 (CPU 바운드 작업 제약) | 거의 없음 (I/O 바운드 최적화) |
| 메모리 | 스레드당 수 MB | 코루틴당 수 KB |
| 적합한 작업 | I/O 바운드 (asyncio 생태계 외) | I/O 바운드 (async/await 지원) |
| 컨텍스트 스위칭 비용 | 높음 (OS 개입) | 낮음 (유저 스페이스) |
예시: DB 쿼리 100개를 동시에 처리
I/O 대기 중에는 CPU가 놀고 있으므로, asyncio의 가벼운 코루틴이 훨씬 효율적이다.
"Threading과 asyncio는 같이 못 쓴다"는 말을 자주 듣지만, 정확히는 "명확한 경계 없이 섞으면 깨진다"가 맞다.
Asyncio는 단일 이벤트 루프에서 모든 코루틴을 스케줄링한다. 이벤트 루프는 특정 스레드에 속하며, 다른 스레드에서 접근하면 충돌한다.
import asyncio
import threading
async def async_task():
await asyncio.sleep(1)
def thread_task():
# ❌ 크래시 발생!
asyncio.run(async_task()) # 다른 스레드에서 이벤트 루프 실행
t = threading.Thread(target=thread_task)
t.start()
# RuntimeError: This event loop is already running
Threading과 asyncio는 서로 다른 동기화 프리미티브를 사용한다.
threading.Lock ≠ asyncio.Lockthreading.Event ≠ asyncio.Event이들은 호환되지 않으므로, 한쪽에서 획득한 Lock을 다른 쪽에서 해제할 수 없다.
import asyncio
import time
async def good_task():
for i in range(3):
print("Good")
await asyncio.sleep(1) # ✅ 다른 코루틴 실행 가능
async def bad_task():
for i in range(3):
print("Bad")
time.sleep(1) # ❌ 1초 동안 모든 코루틴 멈춤!
async def main():
await asyncio.gather(good_task(), bad_task())
asyncio.run(main())
# 출력: Bad만 3번 나오고 1초씩 멈춤, Good은 나중에 나옴
time.sleep()은 blocking 호출이므로 전체 이벤트 루프를 멈춘다. 반면 await asyncio.sleep()은 다른 코루틴이 실행될 수 있도록 양보한다.
asyncio.to_thread()Blocking I/O를 asyncio에서 사용해야 할 때는 asyncio.to_thread()로 래핑한다:
import asyncio
import time
def blocking_smtp_send(recipient, message):
"""Blocking I/O 라이브러리 (smtplib 같은)"""
time.sleep(2) # SMTP 연결 및 발송
return f"Email sent to {recipient}"
async def send_email(recipient, message):
# ✅ Blocking 코드를 별도 스레드에서 실행
result = await asyncio.to_thread(blocking_smtp_send, recipient, message)
print(result)
asyncio.run(send_email("user@example.com", "Hello"))
원리
[Alert 발생] → [add_alert()] → [버퍼 추가 (Lock 획득)]
↓
[30초 타이머] → [flush_buffer()] → [버퍼 복사 (Lock 획득)]
↓
[Lock 해제]
↓
[이메일 발송]
핵심 설계 원칙
asyncio.Lock을 사용하여 버퍼의 읽기/쓰기를 보호한다:
import asyncio
from typing import Dict, List
from collections import defaultdict
class EmailBuffer:
def __init__(self, flush_interval: int = 30):
self.flush_interval = flush_interval
self.buffer: Dict[str, List[Dict]] = defaultdict(list)
self._lock = asyncio.Lock() # 동시성 제어
async def add_alert(self, device_id: str, alert_data: Dict):
"""버퍼에 알림 추가 (Lock으로 보호)"""
async with self._lock:
self.buffer[device_id].append(alert_data)
async with 패턴의 장점
asyncio.Lock을 사용한 동시성 제어async def flush_buffer(self):
"""버퍼를 플러시하고 이메일 발송"""
async with self._lock:
if not self.buffer:
return
# 버퍼 복사 및 클리어 (Lock 안에서만)
grouped = dict(self.buffer)
self.buffer.clear()
# Lock 밖에서 이메일 발송 (시간 오래 걸림)
for device_id, alerts in grouped.items():
await self._send_device_email(device_id, alerts)
Lock 범위 최소화의 중요성
asyncio.create_task()로 백그라운드 스케줄러async def start_flush_scheduler(self):
"""30초마다 버퍼를 플러시하는 스케줄러 시작"""
self.flush_task = asyncio.create_task(self._flush_loop())
async def _flush_loop(self):
"""무한 루프로 30초마다 플러시"""
while True:
await asyncio.sleep(self.flush_interval)
await self.flush_buffer()
asyncio.create_task()의 역할
asyncio.to_thread()로 Blocking SMTP 처리import smtplib
from email.mime.text import MIMEText
class EmailService:
def __init__(self):
self.smtp_host = "smtp.example.com"
self.smtp_port = 25
self.smtp_username = "service"
self.smtp_password = "password"
async def send_email(self, recipients: List[str], subject: str, body: str) -> bool:
"""이메일 발송 (비동기 래퍼)"""
try:
msg = self._create_mime_message(recipients, subject, body)
# Blocking SMTP를 별도 스레드에서 실행
await asyncio.to_thread(self._send_smtp, recipients, msg)
return True
except Exception as e:
print(f"Failed to send email: {e}")
return False
def _send_smtp(self, recipients: List[str], msg: MIMEText):
"""실제 SMTP 발송 (Blocking I/O)"""
with smtplib.SMTP(self.smtp_host, self.smtp_port) as smtp:
smtp.ehlo()
smtp.login(self.smtp_username, self.smtp_password)
smtp.sendmail(self.sender_email, recipients, msg.as_string())
def _create_mime_message(self, recipients: List[str], subject: str, body: str) -> MIMEText:
"""MIMEText 메시지 생성"""
pass # 구현 생략
왜 asyncio.to_thread()가 필요한가?
smtplib는 blocking 라이브러리class EmailBuffer:
def __init__(self, flush_interval: int = 30):
self.flush_interval = flush_interval
self.buffer: Dict[str, List[Dict]] = defaultdict(list)
self._lock = asyncio.Lock()
self.email_service = EmailService()
async def add_alert(self, device_id: str, alert_data: Dict):
"""알림 추가"""
async with self._lock:
self.buffer[device_id].append(alert_data)
async def start_flush_scheduler(self):
"""스케줄러 시작"""
self.flush_task = asyncio.create_task(self._flush_loop())
async def _flush_loop(self):
"""30초마다 플러시"""
while True:
await asyncio.sleep(self.flush_interval)
await self.flush_buffer()
async def flush_buffer(self):
"""버퍼 플러시 및 이메일 발송"""
async with self._lock:
if not self.buffer:
return
grouped = dict(self.buffer)
self.buffer.clear()
# Lock 밖에서 발송
for device_id, alerts in grouped.items():
await self._send_device_email(device_id, alerts)
async def _send_device_email(self, device_id: str, alerts: List[Dict]):
"""장비별 이메일 발송"""
device_name = alerts[0]["device_name"]
recipients = await self._get_recipients(device_id)
subject = f"[알림] {device_name} - {len(alerts)}건"
body = self._create_email_body(device_name, alerts)
await self.email_service.send_email(recipients, subject, body)
async def _get_recipients(self, device_id: str) -> List[str]:
"""수신자 조회 (DB 쿼리)"""
pass # 구현 생략
def _create_email_body(self, device_name: str, alerts: List[Dict]) -> str:
"""이메일 본문 생성"""
pass # 구현 생략
주의: CPU 바운드 작업은 multiprocessing을 사용해야 한다. Threading은 GIL 때문에 CPU 바운드 작업에서 성능 향상이 없다.
asyncio.to_thread() (Asyncio → Blocking)사용 케이스: FastAPI에서 blocking 라이브러리 호출
# 앞서 본 SMTP 예시가 대표적
await asyncio.to_thread(blocking_smtp_send, recipient, message)
asyncio.run_coroutine_threadsafe() (Threading → Asyncio)사용 케이스: Flask 같은 동기 프레임워크에서 비동기 함수 호출
import asyncio
import threading
# 비동기 함수 (asyncio 기반 DB 쿼리)
async def fetch_data_async():
await asyncio.sleep(1)
return {"data": "result"}
# Asyncio 이벤트 루프를 별도 스레드에서 실행
loop = asyncio.new_event_loop()
def run_event_loop():
asyncio.set_event_loop(loop)
loop.run_forever()
loop_thread = threading.Thread(target=run_event_loop, daemon=True)
loop_thread.start()
# 동기 함수에서 비동기 함수 호출
def sync_function():
# ✅ 동기 환경에서 비동기 함수 호출
future = asyncio.run_coroutine_threadsafe(fetch_data_async(), loop)
result = future.result(timeout=5) # Blocking으로 결과 대기
return result
print(sync_function())
사용 예시