최근 OpenAI 기반 질문 응답 API의 사용자 경험을 개선하기 위해 Server Sent Events(SSE) 기반의 스트리밍 방식으로 전환했습니다. 이 글에서는 기존 Polling 방식의 문제점, SSE의 장점, 그리고 실제 백엔드와 프런트엔드에 적용한 코드를 중심으로 전체 흐름을 정리합니다.
기존 방식은 OpenAI API에서 응답을 전부 받아온 후 한꺼번에 클라이언트에 내려주는 Polling 방식이었습니다. 하지만 이 방식은 아래와 같은 단점이 있었습니다:
[Client] 사용자가 질문 또는 파일 업로드
↓
[Next.js Route] /api/ask-4-1 → Flask API에 Proxy 요청
↓
[Flask API] /api/ask-4-1-mini
① 요청 파싱 및 세션관리
② 문맥 검색 및 메시지 구성
③ OpenAI API 스트리밍 호출
④ SSE 응답 스트리밍 (text → reference → metadata → done)
↓
[Client] EventSource로 스트림 수신 및 UI 실시간 업데이트
# 요청 파라미터 정리
question = (form_data.get("question") or "").strip()
session_id = form_data.get("sessionId") or str(uuid.uuid4())
# 세션 초기화 및 주제 벗어남 판단
if session_id not in session_store:
session_store[session_id] = []
else:
prev_user_questions = [...최근 질문 추출...]
if is_topic_drifted(prev_user_questions, question):
session_store[session_id] = []
context = get_context(question)
messages.append({"role": "system", "content": f"[참고 정보]\n{context}"})
messages.append({"role": "user", "content": question})
def call_openai(
messages,
model="gpt-4.1-mini",
# model="ft:gpt-4.1-mini-2025-04-14:charancha:ai-ex:BytKlEJB",
use_web_search: bool = False,
temperature: float = 0.7,
top_p: float = 0.9,
max_output_tokens: int = 4096,
):
try:
tools = None
if use_web_search:
tools = [
{
"type": "web_search_preview",
"search_context_size": "high",
}
]
response = openai_client.responses.create(
model=model,
tools=tools,
input=messages,
temperature=temperature,
top_p=top_p,
max_output_tokens=max_output_tokens,
stream=True, # ✅ 스트림모드
)
# ✅ 응답은 chunk 단위로 들어옴
for delta in response:
if delta.type == "response.output_text.delta" and delta.delta:
# (kind, payload) 형태로 내보냄
yield ("text", delta.delta)
elif delta.type in ("response.done", "response.completed"):
u = delta.response.usage
yield (
"usage",
{ # 최종 1회만 발생
"input": u.input_tokens,
"output": u.output_tokens,
"total": u.total_tokens,
},
)
def generate():
buffer = ""
# OpenAI 응답 스트리밍 수신
for chunk in call_openai(...):
buffer += chunk
yield sse("text", chunk)
# 참고 링크 추출 및 응답
for url in extract_urls_with_titles(buffer):
yield sse("reference", url)
# 세션 저장 및 메타데이터 응답
session_store[session_id].append(...)
yield sse("metadata", { ... })
yield sse("done", {})
헬퍼 함수:
def sse(event_type, data):
return f"data: {json.dumps({'type': event_type, 'content': data}, ensure_ascii=False)}\n\n".encode("utf-8")
if (contentType.startsWith('multipart/form-data')) {
const upstreamRes = await fetch(UPSTREAM, {
method: 'POST',
headers: req.headers,
body: req.body,
...({ duplex: 'half' } as RequestInit),
});
return new Response(upstreamRes.body, {
status: upstreamRes.status,
headers: {
'Content-Type': 'text/event-stream',
Connection: 'keep-alive',
},
});
}
사용자의 입력(텍스트 또는 파일)을 서버에 전송하고, SSE(Stream) 방식으로 응답을 받아 실시간으로 채팅 UI를 업데이트하는 비동기 함수
const sendMessage = useCallback(async (
override: string,
systemPrompt?: string,
isWebSearch?: boolean,
temperature?: number,
topP?: number,
maxTokens?: number,
files?: File[],
) => {
if (isLoading || (!override.trim() && (!files || !files.length))) return;
if (abortRef.current) abortRef.current.abort();
abortRef.current = new AbortController();
const userMsg = {
id: Date.now(),
role: 'user',
text: [override.trim(), files?.map(f => f.name).join(', ')].filter(Boolean).join('\n\n'),
createdAt: formatKoreanTime(new Date()),
};
const loadingMsg = {
id: userMsg.id + 1,
role: 'assistant',
text: files?.length ? '파일 분석 중' : isWebSearch ? '웹 찾아보는 중' : '답변을 생각 중이에요',
loading: true,
};
setMessages(prev => [...prev, userMsg, loadingMsg]);
setIsLoading(true);
try {
const res = await doRequest({ ...params, signal: abortRef.current.signal });
const reader = res.body.getReader();
const decoder = new TextDecoder('utf-8');
let buffer = '', plain = '', urls = [];
while (true) {
const { value, done } = await reader.read();
if (done) break;
buffer += decoder.decode(value, { stream: true });
const splitIdx = buffer.lastIndexOf('\n\n');
if (splitIdx === -1) continue;
const consumable = buffer.slice(0, splitIdx + 2);
buffer = buffer.slice(splitIdx + 2);
for (const jsonStr of parseSSE(consumable)) {
let parsed;
try { parsed = JSON.parse(jsonStr); } catch { continue; }
switch (parsed.type) {
case 'text':
plain += parsed.content;
updateMessageById(loadingMsg.id, { text: plain, loading: false });
break;
case 'reference':
urls.push(parsed.url);
break;
case 'metadata':
if (parsed.imageResults)
updateMessageById(loadingMsg.id, { imageResults: parsed.imageResults, loading: false });
break;
case 'done':
break;
}
}
}
finalizeAssistantMsg(loadingMsg.id, plain, urls, isWebSearch);
setChatId(newId);
if (!initialId && onAnswerComplete) onAnswerComplete(newId);
} catch (err) {
updateMessageById(loadingMsg.id, {
text: '요청 전송에 실패했습니다. 잠시 후 다시 시도해 주세요.',
loading: false,
isError: true,
});
finalizeAssistantMsg(loadingMsg.id, '요청 전송에 실패했습니다.', [], isWebSearch);
if (initialId && onAnswerComplete) onAnswerComplete(initialId);
} finally {
setIsLoading(false);
abortRef.current = null;
}
}, [...deps]);
if (isLoading || (!override.trim() && (!files || !files.length))) return;
if (abortRef.current) abortRef.current.abort();
abortRef.current = new AbortController();
fetch
요청에 signal
을 포함시켜 취소 가능하게 구성const userMsg = { ... };
const loadingMsg = { ... };
setMessages(prev => [...prev, userMsg, loadingMsg]);
messages
상태에 추가const res = await doRequest({ ... });
const reader = res.body.getReader();
doRequest
로 서버 API 호출TextDecoder
로 실시간 디코딩for (const jsonStr of parseSSE(consumable)) {
const parsed = JSON.parse(jsonStr);
...
}
\n\n
단위로 수신된 응답을 나눠서 JSON 파싱type
별로 분기 처리type: text
plain += parsed.content;
updateMessageById(...);
type: reference
urls.push(parsed.url);
type: metadata
updateMessageById(...);
finalizeAssistantMsg(loadingMsg.id, plain, urls, isWebSearch);
updateMessageById(loadingMsg.id, { isError: true, text: '요청 실패...' });
setIsLoading(false);
abortRef.current = null;
평균 응답 시간: 기존 10~30초 → 3~10초 단축
프런트 리소스 사용량 감소: 타이핑 효과 제거로 불필요 애니메이션 제거
OpenAI 기반 LLM API에 SSE를 도입하면서 사용자 경험이 눈에 띄게 개선되었습니다. 실시간 피드백, 리소스 절감, 에러 감소까지 여러 장점이 있으며, 특히 질문이 길거나 파일 업로드 등 복합 작업 시에도 타임아웃 없이 안정적으로 동작합니다.