Flask + SSE 기반 OpenAI 스트리밍 응답 처리 구현기

Wonhyo LEE·2025년 7월 29일
0

최근 OpenAI 기반 질문 응답 API의 사용자 경험을 개선하기 위해 Server Sent Events(SSE) 기반의 스트리밍 방식으로 전환했습니다. 이 글에서는 기존 Polling 방식의 문제점, SSE의 장점, 그리고 실제 백엔드와 프런트엔드에 적용한 코드를 중심으로 전체 흐름을 정리합니다.


왜 SSE로 바꿨는가?

기존 방식은 OpenAI API에서 응답을 전부 받아온 후 한꺼번에 클라이언트에 내려주는 Polling 방식이었습니다. 하지만 이 방식은 아래와 같은 단점이 있었습니다:

  • OpenAI 응답 생성 시간이 길어질수록 UX가 저하
  • 응답이 클 경우 타임아웃 발생 위험 증가
  • UI에서 typing animation 처리 필요 → 불필요한 리소스 낭비

✅ SSE(Server Sent Events)로 전환하면?

  • OpenAI에서 응답이 생성되는 청크(chunk) 단위로 실시간 스트리밍
  • 사용자 입장에서 훨씬 빠르게 답변을 확인
  • 타이핑 애니메이션 제거 가능 → 프런트 리소스 절감
  • 전체 응답 대기시간 감소: 평균 10~30초 → 3~10초 내 응답

전체 흐름 요약 (Back → Front)

[Client] 사용자가 질문 또는 파일 업로드
    ↓
[Next.js Route] /api/ask-4-1 → Flask API에 Proxy 요청
    ↓
[Flask API] /api/ask-4-1-mini
    ① 요청 파싱 및 세션관리
    ② 문맥 검색 및 메시지 구성
    ③ OpenAI API 스트리밍 호출
    ④ SSE 응답 스트리밍 (text → reference → metadata → done)
    ↓
[Client] EventSource로 스트림 수신 및 UI 실시간 업데이트

Flask SSE 처리 핵심 로직

1. 세션 및 메시지 관리

# 요청 파라미터 정리
question = (form_data.get("question") or "").strip()
session_id = form_data.get("sessionId") or str(uuid.uuid4())

# 세션 초기화 및 주제 벗어남 판단
if session_id not in session_store:
    session_store[session_id] = []
else:
    prev_user_questions = [...최근 질문 추출...]
    if is_topic_drifted(prev_user_questions, question):
        session_store[session_id] = []

2. 컨텍스트 검색 및 메시지 구성

context = get_context(question)
messages.append({"role": "system", "content": f"[참고 정보]\n{context}"})
messages.append({"role": "user", "content": question})

3. SSE 생성기 정의

def call_openai(
    messages,
    model="gpt-4.1-mini",
    # model="ft:gpt-4.1-mini-2025-04-14:charancha:ai-ex:BytKlEJB",
    use_web_search: bool = False,
    temperature: float = 0.7,
    top_p: float = 0.9,
    max_output_tokens: int = 4096,
):
    try:
        tools = None
        if use_web_search:
            tools = [
                {
                    "type": "web_search_preview",
                    "search_context_size": "high",
                }
            ]

        response = openai_client.responses.create(
            model=model,
            tools=tools,
            input=messages,
            temperature=temperature,
            top_p=top_p,
            max_output_tokens=max_output_tokens,
            stream=True, # ✅ 스트림모드 
        )
        # ✅ 응답은 chunk 단위로 들어옴
        for delta in response:
            if delta.type == "response.output_text.delta" and delta.delta:
                # (kind, payload) 형태로 내보냄
                yield ("text", delta.delta)
            elif delta.type in ("response.done", "response.completed"):
                u = delta.response.usage
                yield (
                    "usage",
                    {  # 최종 1회만 발생
                        "input": u.input_tokens,
                        "output": u.output_tokens,
                        "total": u.total_tokens,
                    },
                )

def generate():
    buffer = ""

    # OpenAI 응답 스트리밍 수신
    for chunk in call_openai(...):
        buffer += chunk
        yield sse("text", chunk)

    # 참고 링크 추출 및 응답
    for url in extract_urls_with_titles(buffer):
        yield sse("reference", url)

    # 세션 저장 및 메타데이터 응답
    session_store[session_id].append(...)
    yield sse("metadata", { ... })
    yield sse("done", {})

헬퍼 함수:

def sse(event_type, data):
    return f"data: {json.dumps({'type': event_type, 'content': data}, ensure_ascii=False)}\n\n".encode("utf-8")

Next.js Proxy 라우트 (프론트 ↔ 백 연결)

multipart/form-data 요청 → 그대로 스트림 전달

if (contentType.startsWith('multipart/form-data')) {
  const upstreamRes = await fetch(UPSTREAM, {
    method: 'POST',
    headers: req.headers,
    body: req.body,
    ...({ duplex: 'half' } as RequestInit),
  });
  return new Response(upstreamRes.body, {
    status: upstreamRes.status,
    headers: {
      'Content-Type': 'text/event-stream',
      Connection: 'keep-alive',
    },
  });
}

React 프론트에서 SSE 처리 흐름

사용자의 입력(텍스트 또는 파일)을 서버에 전송하고, SSE(Stream) 방식으로 응답을 받아 실시간으로 채팅 UI를 업데이트하는 비동기 함수


1. 함수 요약

const sendMessage = useCallback(async (
  override: string,
  systemPrompt?: string,
  isWebSearch?: boolean,
  temperature?: number,
  topP?: number,
  maxTokens?: number,
  files?: File[],
) => {
  if (isLoading || (!override.trim() && (!files || !files.length))) return;

  if (abortRef.current) abortRef.current.abort();
  abortRef.current = new AbortController();

  const userMsg = {
    id: Date.now(),
    role: 'user',
    text: [override.trim(), files?.map(f => f.name).join(', ')].filter(Boolean).join('\n\n'),
    createdAt: formatKoreanTime(new Date()),
  };

  const loadingMsg = {
    id: userMsg.id + 1,
    role: 'assistant',
    text: files?.length ? '파일 분석 중' : isWebSearch ? '웹 찾아보는 중' : '답변을 생각 중이에요',
    loading: true,
  };

  setMessages(prev => [...prev, userMsg, loadingMsg]);
  setIsLoading(true);

  try {
    const res = await doRequest({ ...params, signal: abortRef.current.signal });
    const reader = res.body.getReader();
    const decoder = new TextDecoder('utf-8');
    let buffer = '', plain = '', urls = [];

    while (true) {
      const { value, done } = await reader.read();
      if (done) break;
      buffer += decoder.decode(value, { stream: true });

      const splitIdx = buffer.lastIndexOf('\n\n');
      if (splitIdx === -1) continue;
      const consumable = buffer.slice(0, splitIdx + 2);
      buffer = buffer.slice(splitIdx + 2);

      for (const jsonStr of parseSSE(consumable)) {
        let parsed;
        try { parsed = JSON.parse(jsonStr); } catch { continue; }

        switch (parsed.type) {
          case 'text':
            plain += parsed.content;
            updateMessageById(loadingMsg.id, { text: plain, loading: false });
            break;
          case 'reference':
            urls.push(parsed.url);
            break;
          case 'metadata':
            if (parsed.imageResults)
              updateMessageById(loadingMsg.id, { imageResults: parsed.imageResults, loading: false });
            break;
          case 'done':
            break;
        }
      }
    }

    finalizeAssistantMsg(loadingMsg.id, plain, urls, isWebSearch);
    setChatId(newId);
    if (!initialId && onAnswerComplete) onAnswerComplete(newId);

  } catch (err) {
    updateMessageById(loadingMsg.id, {
      text: '요청 전송에 실패했습니다. 잠시 후 다시 시도해 주세요.',
      loading: false,
      isError: true,
    });
    finalizeAssistantMsg(loadingMsg.id, '요청 전송에 실패했습니다.', [], isWebSearch);
    if (initialId && onAnswerComplete) onAnswerComplete(initialId);
  } finally {
    setIsLoading(false);
    abortRef.current = null;
  }
}, [...deps]);

주요 기능 설명

1. 입력 유효성 검사 및 요청 중복 방지

if (isLoading || (!override.trim() && (!files || !files.length))) return;
  • 현재 메시지가 전송 중이라면 중복 요청 방지
  • 사용자 입력 텍스트 또는 파일이 하나도 없으면 실행하지 않음

2. AbortController로 이전 요청 중단

if (abortRef.current) abortRef.current.abort();
abortRef.current = new AbortController();
  • 동일한 사용자의 빠른 연속 입력 시, 기존 요청 취소
  • fetch 요청에 signal을 포함시켜 취소 가능하게 구성

3. 사용자 메시지 및 로딩 메시지 구성

const userMsg = { ... };
const loadingMsg = { ... };
setMessages(prev => [...prev, userMsg, loadingMsg]);
  • 사용자 입력 메시지를 messages 상태에 추가
  • 서버 응답이 오기 전까지 보여줄 로딩 메시지도 함께 추가

4. 서버 요청 및 스트리밍 응답 처리

const res = await doRequest({ ... });
const reader = res.body.getReader();
  • doRequest로 서버 API 호출
  • 응답은 SSE 형식 스트림으로 받아 TextDecoder로 실시간 디코딩

5. SSE 이벤트 스트림 처리

for (const jsonStr of parseSSE(consumable)) {
  const parsed = JSON.parse(jsonStr);
  ...
}
  • \n\n 단위로 수신된 응답을 나눠서 JSON 파싱
  • 아래와 같은 type별로 분기 처리

type: text

plain += parsed.content;
updateMessageById(...);
  • AI 응답이 문자열 단위로 전송됨 → 기존 메시지에 누적 갱신

type: reference

urls.push(parsed.url);
  • 추천된 외부 링크 수집

type: metadata

updateMessageById(...);
  • 이미지 검색 결과, YouTube 결과, 토큰 수, 기타 데이터 반영

6. 최종 메시지 마무리 처리

finalizeAssistantMsg(loadingMsg.id, plain, urls, isWebSearch);
  • 로딩 중이던 assistant 메시지를 최종 텍스트와 함께 마무리
  • URL 목록이나 웹검색 여부를 반영

7. 에러 핸들링

updateMessageById(loadingMsg.id, { isError: true, text: '요청 실패...' });
  • 네트워크 오류, 응답 포맷 오류 등 발생 시 사용자에게 안내 메시지 출력

8. 리소스 해제

setIsLoading(false);
abortRef.current = null;
  • 로딩 상태 해제 및 AbortController 초기화

성능 확인

평균 응답 시간: 기존 10~30초 → 3~10초 단축
프런트 리소스 사용량 감소: 타이핑 효과 제거로 불필요 애니메이션 제거


📌 마무리

OpenAI 기반 LLM API에 SSE를 도입하면서 사용자 경험이 눈에 띄게 개선되었습니다. 실시간 피드백, 리소스 절감, 에러 감소까지 여러 장점이 있으며, 특히 질문이 길거나 파일 업로드 등 복합 작업 시에도 타임아웃 없이 안정적으로 동작합니다.

profile
프론트마스터를 꿈꾸는...

0개의 댓글