이전 글에서는 AI 채팅의 실시간 응답 구현을 위해 SSE(Server-Sent Events)를 선택한 배경과 기본적인 개념들을 소개했습니다. AI 채팅 서비스에서는 코드 리뷰를 요청하거나, 긴 분석 보고서를 공유하고 싶을 때, 혹은 여러 파일을 함께 전송해야 하는 요구사항이 존재했습니다.
브라우저에서 기본으로 제공하는 표준 EventSource는 GET 요청만 가능하고, 요청 본문을 포함할 수 없으며, 헤더 설정에 제한이 있다는 한계가 있었습니다. 이번 글에서는 POST 요청을 지원하는 커스텀 EventSource 클라이언트 구현을 살펴보겠습니다.
POST 요청을 지원하는 EventSource 구현을 위해 먼저 기존 라이브러리들을 검토했지만, 적절한 대안을 찾기 어려웠습니다.
대부분의 라이브러리들이 다음과 같은 문제를 가지고 있었습니다
이러한 이유로 추가적인 의존성 없이 필요한 기능만을 포함한 커스텀 EventSource를 직접 구현하기로 결정했습니다.
AI 채팅에서의 요구사항을보면, SSE 연결은 다음과 같은 패턴으로 이루어집니다
하나의 연결이 오래 지속되지 않고, 각 대화는 새로운 연결로 이루어집니다. 브라우저의 EventSource가 제공하는 자동 재연결 메커니즘은 현재 단계에서 구현 우선순위가 높지 않다고 판단했습니다. 대신 사용자가 긴 메시지와 파일을 전송할 수 있도록 하는 POST 요청 지원 기능에 집중했습니다.
브라우저의 기본 EventSource와 유사한 인터페이스를 따르도록 설계했습니다. SSEClient는 POST 요청, 커스텀 헤더, 요청 본문 설정 등 표준 EventSource에서 지원하지 않는 추가 기능들을 제공합니다.
const sseClient = new SSEClient({
url: '/api/chat',
method: 'POST',
body: { message: '안녕하세요' }
});
// 이벤트 리스너 등록
sseClient.addEventListener('message', (data) => {
console.log('AI 응답:', data.text);
});
// 연결 시작
sseClient.connect();
// 필요시 연결 종료
sseClient.stop();
SSEClient 클래스의 전체 코드는 여기에서 확인할 수 있습니다. 주요 기능들에 대해 자세히 살펴보겠습니다.
SSE 클라이언트에서 가장 핵심이 되는 부분은 서버와의 연결을 수립하고 스트림 데이터를 받아오는 부분입니다.
async connect(): Promise<void> {
// 1. SSE 연결 설정
const response = await SSEFetch<Body>(this.config);
// 2. 응답 스트림을 텍스트로 디코딩
const stream = response.body?.pipeThrough(new TextDecoderStream());
// 3. 스트림 데이터 처리
await getBytes(stream, this.processEventChunks);
}
1. SSE 연결 설정
SSEFetch
함수를 사용하여 서버와 SSE 프로토콜 기반의 연결을 설정합니다. 설정된 URL로 요청을 보내고, 서버로부터 스트림 응답을 받아옵니다.export const SSEFetch = async <T>({
url,
method,
headers,
body,
}: SSEFetchPrams<T>) => {
const response = await fetch(url, {
method: method || 'POST',
headers: createHeaders(accessToken, headers),
...(body && { body: JSON.stringify(body) }),
});
// 에러 처리 및 토큰 갱신 로직
return response;
};
2. 응답 스트림을 텍스트로 디코딩
Fetch API 요청 시 response.body의 타입은 ReadableStream
입니다. ReadableStream
은 Web API의 일부인 Streams API를 통해 제공되며, 수신하는 리소스를 작은 청크로 분해하여 점진적으로 처리할 수 있게 해줍니다.
먼저 서버에서 받은 바이너리 데이터를 텍스트로 변환하는 과정이 필요합니다
const stream = response.body?.pipeThrough(new TextDecoderStream());
TextDecoderStream
은 ReadableStream
을 통해 들어오는 바이트 데이터를 UTF-8 텍스트로 디코딩합니다. 서버로부터 전송된 SSE 메시지를 JavaScript에서 처리할 수 있는 텍스트 형태로 변환하는 역할을 합니다.
이렇게 변환된 스트림은 다음 단계에서 getReader()
메서드를 통해 청크 단위로 읽을 수 있게 됩니다.
3. 스트림 데이터 처리
텍스트로 변환된 스트림에서 데이터를 읽는 과정은 getBytes 함수를 통해 이루어집니다.
export async function getBytes(
stream: ReadableStream<string>,
onChunk: (arr: string) => void,
signal?: AbortSignal
) {
const reader = stream.getReader();
try {
while (true) {
if (signal?.aborted) {
throw new DOMException('Stream aborted by user', 'AbortError');
}
const { value, done } = await reader.read();
if (done) {
break;
}
onChunk(value);
}
} catch (error) {
if (error instanceof Error) {
throw error;
}
throw new SSEError('Unknown error');
} finally {
reader.releaseLock();
}
}
getReader()
를 호출하면 해당 스트림은 reader에 잠기게 됩니다. 이 상태에서는 다른 reader가 스트림에 접근할 수 없고 반드시 잠금이 해제되어야 다른 곳에서 스트림을 사용할 수 있습니다.releaseLock()
을 통해 스트림의 잠금을 해제할 수 있습니다. 채팅 시나리오에서는 여러 곳에서 동시에 스트림을 읽는 경우가 거의 없지만 코드의 안정성과 자원 관리를 위해 finally
블록에서 잠금을 해제해 주었습니다.read()
메서드는 Promise를 반환하며, 다음과 같은 상태를 처리합니다.{ value: chunk, done: false }
{ value: undefined, done: true }
reader.read()
는 내부적으로 백프레셔 메커니즘을 자동으로 처리합니다.ReadableStream API는 이런 방식으로 네트워크 상태나 서버의 데이터 전송 속도와 무관하게 안정적인 데이터 처리를 가능하게 합니다. 특히 내장된 백프레셔 기능은 데이터 흐름을 자동으로 조절하여 손실 없는 스트리밍을 보장합니다.
청크 데이터의 구조
getBytes
함수에서 onChunk
콜백으로 전달되는 데이터의 구조를 살펴보겠습니다. 실제로 value
를 콘솔에 출력해보면, 예상과 달리 이벤트 단위가 아닌 불규칙한 청크 단위로 데이터가 전달됩니다:
console.log('Received chunk:', value);
// 출력 결과:
Received chunk: "event: message
data: {"text": "안녕하세요"}
event: message
data: {"text": "첫 번째 메시지입니다"}
event: message
data: {"text": "두 번째 메시지입니다"}"
--------------------------------------------------------
// 다음 청크
Received chunk: "event: message
data: {"text": "세 번째 메시지입니다"}
event: message
data: {"text": "네 번째 메시지입니다"}
event: message
data: {"text": "다섯 번째 메시지입니다"}
event: message
data: {"text": "여섯 번째 메시지입니다"}"
--------------------------------------------------------
// 마지막 청크
Received chunk: "event: message
data: {"text": "일곱 번째 메시지입니다"}
event: close
data: {"reason": "conversation_completed"}"
\n\n
)로 구분됩니다.받은 청크를 줄바꿈(\n\n
)을 기준으로 개별 이벤트로 분리하는 처리가 필요합니다. 다음 단계에서는 청크 데이터를 SSE 프로토콜에 맞게 파싱하여 실제 이벤트 타입과 데이터를 추출하는 방법을 살펴보겠습니다.
불규칙한 크기로 전달되는 청크 데이터를 라인 단위로 분석하고 완성된 이벤트를 생성하는 과정은 다음과 같은 세 단계로 이루어집니다.
// 1. 청크 데이터 처리
private processEventChunks = (chunk: string): void => {
let start = 0;
const length = chunk.length;
for (let i = 0; i < length; i++) {
// CRLF (\r\n) 처리
if (this.isRecognizingCRLF && chunk[i] === '\n') {
this.isRecognizingCRLF = false;
start = this.processLine(chunk, start, i - 1);
continue;
}
this.isRecognizingCRLF = false;
// \r 또는 \n 처리
if (chunk[i] === '\r' || chunk[i] === '\n') {
start = this.processLine(chunk, start, i);
this.isRecognizingCRLF = chunk[i] === '\r';
}
}
// 청크(chunk)가 줄 단위로 완전히 끝나지 않고 중간에 잘려서 온 경우
if (start < length) {
this.eventChunksBuffer += chunk.slice(start);
}
};
// 2. 라인 단위 처리
private processLine = (chunk: string, start: number, end: number) => {
const lineContent = chunk.slice(start, end);
if (lineContent.length === 0) {
this.processEventBuffer();
return end + 1;
}
this.eventChunksBuffer += lineContent + '\n';
return end + 1;
};
// 3. 이벤트 버퍼 처리
private processEventBuffer = () => {
if (this.eventChunksBuffer.length === 0) return;
const event = parseSSEEvent(this.eventChunksBuffer);
if (event) {
this.handleEvent(event.type, event.data);
}
this.eventChunksBuffer = '';
};
1. 청크 데이터 처리 (processEventChunks
)
스트림에서 전달받은 불규칙한 크기의 청크를 한 문자씩 살펴보며 줄바꿈 문자를 찾아냅니다.
\n
, \r\n
, \r
)을 모두 인식하여 안정적인 처리가 가능합니다.processLine
함수로 넘겨 처리합니다.2. 라인 단위 처리 (processLine
)
첫 번째 단계에서 찾아낸 각 라인(예: event: message", "data: {...}
)을 수집하는 역할을 합니다.
processEventBuffer
를 호출하여 이벤트 처리를 시작합니다.3. 이벤트 버퍼 처리 (processEventBuffer
)
이 이벤트 버퍼가 필요한 이유는 하단의 트러블 슈팅 부분 참고
parseSSEEvent
함수에 전달하여 이벤트 객체로 변환합니다.라인 단위로 분리된 문자열을 구조화된 객체 형태로 변환하는 과정을 살펴보겠습니다.
export const parseSSEEvent = (eventString: string) => {
const eventTypeMatch = eventString.match(EVENT_REGEX);
const dataMatch = eventString.match(DATA_REGEX) || eventString.match(ERROR_REGEX);
const idMatch = eventString.match(ID_REGEX);
if (!eventTypeMatch || !dataMatch) {
return null;
}
try {
return {
id: idMatch?.[1] ?? null,
type: eventTypeMatch[1] as EventType,
data: JSON.parse(dataMatch[1]),
};
} catch (error) {
throw new SSEError('json parsing error');
}
};
event:
, data:
, id:
필드를 찾아냅니다.{type, data, id}
)로 변환합니다.null
을 반환해 불완전한 이벤트는 걸러냅니다.옵저버 패턴(Observer Pattern)을 활용하여 이벤트 처리 시스템을 구현했습니다.
private eventHandlers: Map<
keyof EventTypes,
EventHandlerMap<EventTypes>[keyof EventTypes][]
> = new Map();
addEventListener<K extends keyof EventTypes>(
eventType: K,
callback: EventHandlerMap<EventTypes>[K]
): void {
if (!this.eventHandlers.has(eventType)) {
this.eventHandlers.set(eventType, []);
}
const handlers = this.eventHandlers.get(eventType);
if (handlers) {
handlers.push(callback as EventHandlerMap<EventTypes>[keyof EventTypes]);
}
}
private handleEvent<K extends keyof EventTypes>(
type: K,
data: EventTypes[K]
): void {
const handlers = this.eventHandlers.get(type);
if (handlers) {
handlers.forEach((handler) => handler(data));
}
if (type === 'error') {
const errorData = data as SSEErrorData;
throw new SSEError(
errorData.code
);
}
}
addEventListener
: 이벤트 타입별로 핸들러를 등록handleEvent
: 이벤트 발생 시 등록된 핸들러들을 호출옵저버 패턴 기반 이벤트 핸들링 시스템을 통해 이벤트 발생과 처리 로직을 분리하여 코드의 결합도를 낮추고 새로운 이벤트 타입 추가 시 기존 코드를 수정하지 않고도 확장할 수 있도록 설계했습니다.
프로젝트 내에서 모든 서버 통신을 React Query의 mutation
으로 처리하고 있었기 때문에 SSE 연동도 같은 패턴을 유지하고 싶었습니다. React Query의 선언적인 데이터 관리 방식을 실시간 스트리밍에도 적용하면 코드 일관성을 유지하면서 복잡한 상태 관리를 추상화할 수 있겠다고 생각했습니다. React Query의 메인테이너가 블로그에서 WebSocket과 React Query를 통합하는 방법에 대해 작성한 글을 참고하여 유사한 패턴을 적용했습니다. TkDodo가 강조한 핵심은 다음과 같습니다.
React Query는 데이터를 가져오는 방식에는 관여하지 않으며, 성공 또는 실패를 반환하는 Promise만 있으면 된다
즉, fetch API, axios, WebSocket, SSE 등 어떤 방식을 쓰더라도 Promise로 결과만 반환하면 React Query의 상태 관리 기능을 활용할 수 있습니다.
const createChat = async ({ params, callbacks }) => {
let resolvePromise: (value: ChatResponse) => void;
let lastData: ChatResponse = { content: '', citations: [] };
const abort = () => {
if (client) {
client.stop();
resolvePromise({ ...lastData });
callbacks.onAbort?.();
}
};
const promise = new Promise<ChatResponse>((resolve) => {
resolvePromise = resolve;
const client = new SSEClient({/*...*/});
client.addEventListener('message', (data) => {
lastData = { ...lastData, ...data };
callbacks.onMessage(data);
});
client.addEventListener('close', () => {
resolve(lastData);
client?.cleanup();
});
client.connect().catch((error) => {
client?.cleanup();
reject(error);
});
});
return { promise, abort };
};
const useMultiModelFileAiChatMutation = (options) => {
const abortRef = useRef(null);
const mutation = useMutation({
mutationFn: async (params) => {
const { promise, abort } = await createChat({ ...params });
abortRef.current = abort;
return promise;
},
...options,
});
const abort = useCallback(() => {
if (abortRef.current) {
abortRef.current();
abortRef.current = null;
}
}, []);
return { ...mutation, abort };
};
이런 접근 방식의 가장 큰 장점은 컴포넌트가 SSE의 복잡한 내부 동작을 알 필요 없이 익숙한 React Query 패턴으로 데이터를 다룰 수 있다는 점입니다. isLoading
, isError
와 같은 상태 플래그를 활용하여 UI를 선언적으로 관리할 수 있습니다.
// 사용하는 컴포넌트 관점
const ChatComponent = () => {
const { mutate, isLoading, isError } = useMultiModelAiChatMutation({
onSuccess: (data) => {
// 채팅 완료 처리
},
onError: (error) => {
// 에러 처리
}
});
const handleSubmit = () => {
mutate({
params: { message, modelId },
callbacks: {
onMessage: (message) => {
// 실시간 메시지 처리
}
}
});
};
};
이번 글에서는 브라우저의 기본 EventSource의 한계를 극복하기 위한 커스텀 SSE 클라이언트 구현과 React Query와의 통합 과정을 살펴보았습니다. POST 요청 지원, 커스텀 헤더 설정, 스트리밍 데이터를 파싱하는 로직을 구현하여 AI 채팅 서비스에 실시간 응답 기능을 적용했습니다.
옵저버 패턴을 활용한 이벤트 처리 시스템과 Promise 기반의 React Query 연동을 통해 스트리밍 데이터도 다른 API 요청과 동일한 패턴으로 관리할 수 있게 되었습니다. 덕분에 서버와 통신하는 코드를 일관된 방식으로 작성할 수 있고, 사용자에게는 실시간으로 응답하는 채팅 기능을 제공할 수 있었습니다.
다음 글에서는 이 시스템을 더욱 발전시키기 위한 성능 최적화 전략에 대해 다루려고 합니다. 스로틀링을 적용한 렌더링 최적화, 메모리 누수 문제 해결 방법, 그리고 이 모든 기능의 안정성을 보장하기 위한 단위 테스트 전략까지 살펴보겠습니다.