Server-Sent Events로 구현하는 AI 채팅 - Part 2: POST 요청이 가능한 커스텀 SSE 클라이언트와 React Query 통합하기

seo·2025년 2월 27일
1
post-thumbnail

이전 글에서는 AI 채팅의 실시간 응답 구현을 위해 SSE(Server-Sent Events)를 선택한 배경과 기본적인 개념들을 소개했습니다. AI 채팅 서비스에서는 코드 리뷰를 요청하거나, 긴 분석 보고서를 공유하고 싶을 때, 혹은 여러 파일을 함께 전송해야 하는 요구사항이 존재했습니다.

브라우저에서 기본으로 제공하는 표준 EventSource는 GET 요청만 가능하고, 요청 본문을 포함할 수 없으며, 헤더 설정에 제한이 있다는 한계가 있었습니다. 이번 글에서는 POST 요청을 지원하는 커스텀 EventSource 클라이언트 구현을 살펴보겠습니다.

Custom EventSource 구현

POST 요청을 지원하는 EventSource 구현을 위해 먼저 기존 라이브러리들을 검토했지만, 적절한 대안을 찾기 어려웠습니다.

대부분의 라이브러리들이 다음과 같은 문제를 가지고 있었습니다

  • 활발한 유지보수가 이루어지지 않음
  • 충분한 커뮤니티 지원이나 검증이 부족
  • 불필요한 기능들이 포함되어 있어 번들 사이즈가 증가

이러한 이유로 추가적인 의존성 없이 필요한 기능만을 포함한 커스텀 EventSource를 직접 구현하기로 결정했습니다.

AI 채팅에서의 요구사항을보면, SSE 연결은 다음과 같은 패턴으로 이루어집니다

  • 사용자가 메시지를 전송하면서 새로운 SSE 연결 시작
  • AI가 응답을 생성하는 동안 연결 유지
  • 응답이 완료되면 연결 종료

하나의 연결이 오래 지속되지 않고, 각 대화는 새로운 연결로 이루어집니다. 브라우저의 EventSource가 제공하는 자동 재연결 메커니즘은 현재 단계에서 구현 우선순위가 높지 않다고 판단했습니다. 대신 사용자가 긴 메시지와 파일을 전송할 수 있도록 하는 POST 요청 지원 기능에 집중했습니다.

사용 예시

브라우저의 기본 EventSource와 유사한 인터페이스를 따르도록 설계했습니다. SSEClient는 POST 요청, 커스텀 헤더, 요청 본문 설정 등 표준 EventSource에서 지원하지 않는 추가 기능들을 제공합니다.


const sseClient = new SSEClient({
  url: '/api/chat',
  method: 'POST',
  body: { message: '안녕하세요' }
});

// 이벤트 리스너 등록
sseClient.addEventListener('message', (data) => {
  console.log('AI 응답:', data.text);
});

// 연결 시작
sseClient.connect();

// 필요시 연결 종료
sseClient.stop();

SSEClient 클래스 구조 및 주요 기능

SSEClient 클래스의 전체 코드는 여기에서 확인할 수 있습니다. 주요 기능들에 대해 자세히 살펴보겠습니다.

연결 관리

SSE 클라이언트에서 가장 핵심이 되는 부분은 서버와의 연결을 수립하고 스트림 데이터를 받아오는 부분입니다.

async connect(): Promise<void> {
  // 1. SSE 연결 설정
  const response = await SSEFetch<Body>(this.config);

  // 2. 응답 스트림을 텍스트로 디코딩
  const stream = response.body?.pipeThrough(new TextDecoderStream());

  // 3. 스트림 데이터 처리
  await getBytes(stream, this.processEventChunks);
}

1. SSE 연결 설정

  • SSEFetch 함수를 사용하여 서버와 SSE 프로토콜 기반의 연결을 설정합니다. 설정된 URL로 요청을 보내고, 서버로부터 스트림 응답을 받아옵니다.
  • AbortController의 signal을 함께 전달하여 필요 시 요청 자체를 취소할 수 있습니다.
  • Axios는 XMLHttpRequest(XHR) 기반으로 구현되어 있어 스트림 데이터를 처리할 수 없기 때문에 Fetch API를 사용해야 합니다.
export const SSEFetch = async <T>({
  url,
  method,
  headers,
  body,
}: SSEFetchPrams<T>) => {
  const response = await fetch(url, {
    method: method || 'POST',
    headers: createHeaders(accessToken, headers),
    ...(body && { body: JSON.stringify(body) }),
  });
  // 에러 처리 및 토큰 갱신 로직
  return response;
};

2. 응답 스트림을 텍스트로 디코딩

Fetch API 요청 시 response.body의 타입은 ReadableStream입니다. ReadableStream은 Web API의 일부인 Streams API를 통해 제공되며, 수신하는 리소스를 작은 청크로 분해하여 점진적으로 처리할 수 있게 해줍니다.

먼저 서버에서 받은 바이너리 데이터를 텍스트로 변환하는 과정이 필요합니다

const stream = response.body?.pipeThrough(new TextDecoderStream());

TextDecoderStreamReadableStream을 통해 들어오는 바이트 데이터를 UTF-8 텍스트로 디코딩합니다. 서버로부터 전송된 SSE 메시지를 JavaScript에서 처리할 수 있는 텍스트 형태로 변환하는 역할을 합니다.

이렇게 변환된 스트림은 다음 단계에서 getReader() 메서드를 통해 청크 단위로 읽을 수 있게 됩니다.

3. 스트림 데이터 처리

텍스트로 변환된 스트림에서 데이터를 읽는 과정은 getBytes 함수를 통해 이루어집니다.


export async function getBytes(
  stream: ReadableStream<string>,
  onChunk: (arr: string) => void,
  signal?: AbortSignal
) {
  const reader = stream.getReader();

  try {
    while (true) {
      if (signal?.aborted) {
        throw new DOMException('Stream aborted by user', 'AbortError');
      }

      const { value, done } = await reader.read();
      if (done) {
        break;
      }
      onChunk(value);
    }
  } catch (error) {
    if (error instanceof Error) {
      throw error;
    }
    throw new SSEError('Unknown error');
  } finally {
    reader.releaseLock();
  }
}
  • Reader의 상태와 잠금 메커니즘
    • getReader()를 호출하면 해당 스트림은 reader에 잠기게 됩니다. 이 상태에서는 다른 reader가 스트림에 접근할 수 없고 반드시 잠금이 해제되어야 다른 곳에서 스트림을 사용할 수 있습니다.
    • releaseLock()을 통해 스트림의 잠금을 해제할 수 있습니다. 채팅 시나리오에서는 여러 곳에서 동시에 스트림을 읽는 경우가 거의 없지만 코드의 안정성과 자원 관리를 위해 finally 블록에서 잠금을 해제해 주었습니다.
  • Promise 기반의 데이터 처리
    • read() 메서드는 Promise를 반환하며, 다음과 같은 상태를 처리합니다.
      • 데이터 수신: { value: chunk, done: false }
      • 스트림 종료: { value: undefined, done: true }
      • 에러 발생: Promise rejection과 함께 관련 에러가 전달됩니다.
  • 백프레셔(Backpressure) 처리
    • reader.read()는 내부적으로 백프레셔 메커니즘을 자동으로 처리합니다.
    • 데이터를 처리하는 속도가 느려져도 자동으로 데이터 흐름을 조절하여 청크가 누락되지 않고 안전하게 처리될 수 있게 해줍니다.
  • 중단 처리(Aborting)
    • 사용자 경험을 위해 AbortSignal을 통한 스트림 처리 중단 기능도 지원합니다.
    • 사용자가 요청을 취소하거나 타임아웃이 발생했을 때 signal.aborted가 true로 설정되어 스트림 처리를 즉시 중단할 수 있습니다.
    • 웹 표준에 따라 DOMException('Stream aborted by user', 'AbortError') 오류를 발생시켜 일관된 에러 처리가 가능하도록 했습니다.

ReadableStream API는 이런 방식으로 네트워크 상태나 서버의 데이터 전송 속도와 무관하게 안정적인 데이터 처리를 가능하게 합니다. 특히 내장된 백프레셔 기능은 데이터 흐름을 자동으로 조절하여 손실 없는 스트리밍을 보장합니다.

청크 데이터의 구조

getBytes 함수에서 onChunk 콜백으로 전달되는 데이터의 구조를 살펴보겠습니다. 실제로 value를 콘솔에 출력해보면, 예상과 달리 이벤트 단위가 아닌 불규칙한 청크 단위로 데이터가 전달됩니다:


console.log('Received chunk:', value);

// 출력 결과:
Received chunk: "event: message
data: {"text": "안녕하세요"}

event: message
data: {"text": "첫 번째 메시지입니다"}

event: message
data: {"text": "두 번째 메시지입니다"}"
--------------------------------------------------------
// 다음 청크
Received chunk: "event: message
data: {"text": "세 번째 메시지입니다"}

event: message
data: {"text": "네 번째 메시지입니다"}

event: message
data: {"text": "다섯 번째 메시지입니다"}

event: message
data: {"text": "여섯 번째 메시지입니다"}"
--------------------------------------------------------
// 마지막 청크
Received chunk: "event: message
data: {"text": "일곱 번째 메시지입니다"}

event: close
data: {"reason": "conversation_completed"}"
  • 청크의 크기는 일정하지 않고, 한 번의 청크에 여러 개의 이벤트가 포함될 수 있습니다.
  • 각 이벤트는 줄바꿈 문자(\n\n)로 구분됩니다.

받은 청크를 줄바꿈(\n\n)을 기준으로 개별 이벤트로 분리하는 처리가 필요합니다. 다음 단계에서는 청크 데이터를 SSE 프로토콜에 맞게 파싱하여 실제 이벤트 타입과 데이터를 추출하는 방법을 살펴보겠습니다.

이벤트 청크 처리

불규칙한 크기로 전달되는 청크 데이터를 라인 단위로 분석하고 완성된 이벤트를 생성하는 과정은 다음과 같은 세 단계로 이루어집니다.


// 1. 청크 데이터 처리
  private processEventChunks = (chunk: string): void => {
    let start = 0;
    const length = chunk.length;

    for (let i = 0; i < length; i++) {
      // CRLF (\r\n) 처리
      if (this.isRecognizingCRLF && chunk[i] === '\n') {
        this.isRecognizingCRLF = false;
        start = this.processLine(chunk, start, i - 1);
        continue;
      }

      this.isRecognizingCRLF = false;

      // \r 또는 \n 처리
      if (chunk[i] === '\r' || chunk[i] === '\n') {
        start = this.processLine(chunk, start, i);
        this.isRecognizingCRLF = chunk[i] === '\r';
      }
    }

    // 청크(chunk)가 줄 단위로 완전히 끝나지 않고 중간에 잘려서 온 경우
    if (start < length) {
      this.eventChunksBuffer += chunk.slice(start);
    }
  };
  
  // 2. 라인 단위 처리
  private processLine = (chunk: string, start: number, end: number) => {
  const lineContent = chunk.slice(start, end);

  if (lineContent.length === 0) {
    this.processEventBuffer();
    return end + 1;
  }
  this.eventChunksBuffer += lineContent + '\n';
  return end + 1;
};

// 3. 이벤트 버퍼 처리 
private processEventBuffer = () => {
  if (this.eventChunksBuffer.length === 0) return;

  const event = parseSSEEvent(this.eventChunksBuffer);

  if (event) {
    this.handleEvent(event.type, event.data);
  }
  this.eventChunksBuffer = '';
};

1. 청크 데이터 처리 (processEventChunks)

스트림에서 전달받은 불규칙한 크기의 청크를 한 문자씩 살펴보며 줄바꿈 문자를 찾아냅니다.

  • 서버 환경에 따라 다양하게 나타날 수 있는 줄바꿈 형식(\n, \r\n, \r)을 모두 인식하여 안정적인 처리가 가능합니다.
  • 줄바꿈을 발견하면 해당 라인을 processLine 함수로 넘겨 처리합니다.
  • 청크가 중간에 잘려서 도착한 경우에도 나머지 부분을 버퍼에 저장해두고 다음 청크가 도착했을 때 함께 처리합니다.

2. 라인 단위 처리 (processLine)

첫 번째 단계에서 찾아낸 각 라인(예: event: message", "data: {...})을 수집하는 역할을 합니다.

  • 일반 필드 라인은 eventChunksBuffer에 추가하여 이벤트를 구성합니다.
  • 빈 줄을 만나면 하나의 완전한 이벤트가 형성되었다고 판단하고, processEventBuffer를 호출하여 이벤트 처리를 시작합니다.

3. 이벤트 버퍼 처리 (processEventBuffer)

이 이벤트 버퍼가 필요한 이유는 하단의 트러블 슈팅 부분 참고

  • 버퍼에 모아둔 필드 라인들을 parseSSEEvent 함수에 전달하여 이벤트 객체로 변환합니다.
  • 이벤트 타입(message, error, close 등)에 따라 적절한 핸들러 함수를 호출합니다.
  • 이벤트 처리가 끝나면 버퍼를 비워 다음 이벤트를 받을 준비를 합니다.

이벤트 파싱 (parseSSEEvent)

라인 단위로 분리된 문자열을 구조화된 객체 형태로 변환하는 과정을 살펴보겠습니다.

export const parseSSEEvent = (eventString: string) => {
  const eventTypeMatch = eventString.match(EVENT_REGEX);
  const dataMatch = eventString.match(DATA_REGEX) || eventString.match(ERROR_REGEX);
  const idMatch = eventString.match(ID_REGEX);

  if (!eventTypeMatch || !dataMatch) {
    return null;
  }

  try {
    return {
      id: idMatch?.[1] ?? null,
      type: eventTypeMatch[1] as EventType,
      data: JSON.parse(dataMatch[1]),
    };
  } catch (error) {
    throw new SSEError('json parsing error');
  }
};
  • 정규식으로 문자열에서 event:, data:, id: 필드를 찾아냅니다.
  • 추출된 필드들을 JavaScript 객체 형태({type, data, id})로 변환합니다.
  • 이벤트 타입이나 데이터 같은 필수 필드가 없으면 null을 반환해 불완전한 이벤트는 걸러냅니다.
  • 문자열 형태의 데이터는 JSON.parse로 JavaScript 객체로 파싱합니다.
  • JSON 파싱 중 오류가 생기면 적절한 에러 메시지와 함께 예외를 발생시킵니다.

이벤트 핸들링 시스템

옵저버 패턴(Observer Pattern)을 활용하여 이벤트 처리 시스템을 구현했습니다.

private eventHandlers: Map<
  keyof EventTypes,
  EventHandlerMap<EventTypes>[keyof EventTypes][]
> = new Map();

addEventListener<K extends keyof EventTypes>(
  eventType: K,
  callback: EventHandlerMap<EventTypes>[K]
): void {
  if (!this.eventHandlers.has(eventType)) {
    this.eventHandlers.set(eventType, []);
  }
  const handlers = this.eventHandlers.get(eventType);
  if (handlers) {
    handlers.push(callback as EventHandlerMap<EventTypes>[keyof EventTypes]);
  }
}

private handleEvent<K extends keyof EventTypes>(
  type: K,
  data: EventTypes[K]
): void {
  const handlers = this.eventHandlers.get(type);
  if (handlers) {
    handlers.forEach((handler) => handler(data));
  }
  if (type === 'error') {
    const errorData = data as SSEErrorData;
    throw new SSEError(
			 errorData.code
    );
  }
}
  • 옵저버 패턴 구현
    • addEventListener: 이벤트 타입별로 핸들러를 등록
    • handleEvent: 이벤트 발생 시 등록된 핸들러들을 호출

옵저버 패턴 기반 이벤트 핸들링 시스템을 통해 이벤트 발생과 처리 로직을 분리하여 코드의 결합도를 낮추고 새로운 이벤트 타입 추가 시 기존 코드를 수정하지 않고도 확장할 수 있도록 설계했습니다.

SSEClient 클래스 전체 코드

SSE와 React Query

프로젝트 내에서 모든 서버 통신을 React Query의 mutation으로 처리하고 있었기 때문에 SSE 연동도 같은 패턴을 유지하고 싶었습니다. React Query의 선언적인 데이터 관리 방식을 실시간 스트리밍에도 적용하면 코드 일관성을 유지하면서 복잡한 상태 관리를 추상화할 수 있겠다고 생각했습니다. React Query의 메인테이너가 블로그에서 WebSocket과 React Query를 통합하는 방법에 대해 작성한 글을 참고하여 유사한 패턴을 적용했습니다. TkDodo가 강조한 핵심은 다음과 같습니다.

React Query는 데이터를 가져오는 방식에는 관여하지 않으며, 성공 또는 실패를 반환하는 Promise만 있으면 된다

즉, fetch API, axios, WebSocket, SSE 등 어떤 방식을 쓰더라도 Promise로 결과만 반환하면 React Query의 상태 관리 기능을 활용할 수 있습니다.


const createChat = async ({ params, callbacks }) => {
  let resolvePromise: (value: ChatResponse) => void;
  let lastData: ChatResponse = { content: '', citations: [] };

	
	const abort = () => {
    if (client) {
      client.stop();
      resolvePromise({ ...lastData });
      callbacks.onAbort?.();
    }
  };
  
  const promise = new Promise<ChatResponse>((resolve) => {
    resolvePromise = resolve;

    const client = new SSEClient({/*...*/});
    
    client.addEventListener('message', (data) => {
      lastData = { ...lastData, ...data };
      callbacks.onMessage(data);
    });

    client.addEventListener('close', () => {
      resolve(lastData);
      client?.cleanup();
    });

    client.connect().catch((error) => {
      client?.cleanup();
      reject(error);
    });
    
  });

  return { promise, abort };
};

const useMultiModelFileAiChatMutation = (options) => {
  const abortRef = useRef(null);
  
  const mutation = useMutation({
    mutationFn: async (params) => {
      const { promise, abort } = await createChat({ ...params });
      abortRef.current = abort;
      return promise;
    },
    ...options,
  });

  const abort = useCallback(() => {
    if (abortRef.current) {
      abortRef.current();
      abortRef.current = null;
    }
  }, []);

  return { ...mutation, abort };
};
  • 이벤트와 Promise 연결: 이벤트 기반 SSE를 Promise 객체로 한번 감싸서 Promise 기반의 React Query와 연결했습니다
  • 상태 누적: 스트림으로 들어오는 각 메시지를 누적하여 최종 상태(lastData)를 만듭니다.
  • 콜백 인터페이스: 실시간 업데이트를 처리할 수 있는 콜백 시스템을 제공합니다.
  • 중단 메커니즘: 사용자가 필요할 때 스트림을 중단할 수 있는 abort 기능을 제공합니다

이런 접근 방식의 가장 큰 장점은 컴포넌트가 SSE의 복잡한 내부 동작을 알 필요 없이 익숙한 React Query 패턴으로 데이터를 다룰 수 있다는 점입니다. isLoading, isError와 같은 상태 플래그를 활용하여 UI를 선언적으로 관리할 수 있습니다.


// 사용하는 컴포넌트 관점
const ChatComponent = () => {
  const { mutate, isLoading, isError } = useMultiModelAiChatMutation({
    onSuccess: (data) => {
      // 채팅 완료 처리
    },
    onError: (error) => {
      // 에러 처리
    }
  });

  const handleSubmit = () => {
    mutate({
      params: { message, modelId },
      callbacks: {
        onMessage: (message) => {
          // 실시간 메시지 처리
        }
      }
    });
  };
};

마무리

이번 글에서는 브라우저의 기본 EventSource의 한계를 극복하기 위한 커스텀 SSE 클라이언트 구현과 React Query와의 통합 과정을 살펴보았습니다. POST 요청 지원, 커스텀 헤더 설정, 스트리밍 데이터를 파싱하는 로직을 구현하여 AI 채팅 서비스에 실시간 응답 기능을 적용했습니다.

옵저버 패턴을 활용한 이벤트 처리 시스템과 Promise 기반의 React Query 연동을 통해 스트리밍 데이터도 다른 API 요청과 동일한 패턴으로 관리할 수 있게 되었습니다. 덕분에 서버와 통신하는 코드를 일관된 방식으로 작성할 수 있고, 사용자에게는 실시간으로 응답하는 채팅 기능을 제공할 수 있었습니다.

다음 글에서는 이 시스템을 더욱 발전시키기 위한 성능 최적화 전략에 대해 다루려고 합니다. 스로틀링을 적용한 렌더링 최적화, 메모리 누수 문제 해결 방법, 그리고 이 모든 기능의 안정성을 보장하기 위한 단위 테스트 전략까지 살펴보겠습니다.

0개의 댓글