서버에서 스트림으로 송신하는 데이터를 클라이언트에서 수신할때, 일부 데이터가 Truncate
가 되는 현상이 발견되었다. 그로 인해 Json 구조가 깨지게 되어 예외가 발생한다
로컬 서버에서 OpenAI
로 데이터를 요청할때 로그 확인결과 누락 없음
로컬 서버에서 curl
로 gateway
로 직접 데이터를 요청할때 출력 확인 결과 누락 없음
로컬 서버의 UI
에서 gateway
에 데이터를 요청할때 console.log
출력 누락 확인
javascript fetch api
를 통해 stream 데이터를 처리하고 있다.
const textDecoder = new TextDecoder('utf-8');
// Request
const response = await fetch(streamUrl, {
method: 'POST',
body: JSON.stringify(data),
headers: {
'Content-Type': 'application/json'
}
});
// Stream reader
const reader = response.body.getReader();
while (true) {
const { done, value } = await reader.read();
// Stream status
if (done) break;
// Stream decode
let decodeValue = textDecoder.decode(value);
// Data loop
for (let data of decodeValue.split('data:') ) {
let cleanData = data.replace(/}\n/gi, '}');
// Validate
if (cleanData !== '' && cleanData.startsWith('{')) {
let jsonData = JSON.parse(cleanData);
console.log(jsonData);
}
}
}
fetch
에서 제공하는 getReader
방식을 사용하면 어떤 방식의 코드를 구현하더라도 잘림현상이 발생한다. 또한 split
부분 처럼 1건씩 처리가 되는것이 아닌, N개의 데이터가 수신된다.
수신되는 데이터의 양이 많아서 그런것인가? 라는 생각을 하였지만, 오히려 1건의 데이터가 수신될때도 해당 예외는 발생하였다.
Fetch에 대해 스트림 처리시 비슷한 사례가 있다. 하지만 비슷한 사례일뿐, 실제 같은 환경도 아니며 해당 이슈는 결국 전체 데이터를 받아와서 처리한다.
서버에서 클라이언트로 단방향으로 데이터를 전송하는 웹 기술
현재 데이터의 형식이
data:
가 붙어서 와서 왜그렇지? 했는데 WebClient로 데이터를 송신할때, SSE방식으로 보내서 그런듯 하다.
현재 상황에서 웹소켓방식을 사용하기에는 서버쪽에서의 작업이 필요하다. 그렇다면 SSE를 사용하면 어떨까? 확인해보니 서버의 작업은 필요없고, 클라이언트에서만 작업이 필요하다.
Javascript에서 SSE를 사용할때, EventSource를 통해 이벤트를 발생시켜 서버에서 송신하는 데이터를 수신한다. 단, 여기에는 치명적인 문제가 있었는데 그것은
GET 방식만 지원한다
꽤 오랜시간 리서치를 통해 나와 같은 고민을 하는 사람이 있었고, 고맙게도 이 부분을 다른 방향으로 처리한 글이 있다.
원본 글 Server-Sent Events, but with POST (solovyov.net)
XMLHttpRequest
으로 SSE의 구현 코드를 커스텀
하여 Post 방식의 데이터를 처리할 수 있도록 하였다.
해당 글을 통해 내가 원하는 형태로 재가공하고 호출하여 사용할 수 있도록 했다.
다음은 현재 내가 적용한 코드 이다.
postEventSource.js
function sseevent(message) {
let data = message.replace(/data:/gi, '');
return new MessageEvent('message', {data: JSON.parse(data)})
}
function XhrSource(url, opts) {
const eventTarget = new EventTarget();
const xhr = new XMLHttpRequest();
xhr.open(opts.method || 'GET', url, true);
for (var k in opts.headers) {
xhr.setRequestHeader(k, opts.headers[k]);
}
var ongoing = false, start = 0;
xhr.onprogress = function () {
if (!ongoing) {
// onloadstart is sync with `xhr.send`, listeners don't have a chance
ongoing = true;
eventTarget.dispatchEvent(new Event('open', {
status: xhr.status,
headers: xhr.getAllResponseHeaders(),
url: xhr.responseUrl,
}));
}
var i, chunk;
while ((i = xhr.responseText.indexOf('\n\n', start)) >= 0) {
chunk = xhr.responseText.slice(start, i);
start = i + 2;
if (chunk.length) {
eventTarget.dispatchEvent(sseevent(chunk));
}
}
}
xhr.onloadend = _ => {
eventTarget.dispatchEvent(new CloseEvent('close'))
}
xhr.timeout = opts.timeout;
xhr.ontimeout = _ => {
eventTarget.dispatchEvent(new CloseEvent('error', {reason: 'Network request timed out'}));
}
xhr.onerror = _ => {
eventTarget.dispatchEvent(new CloseEvent('error', {reason: xhr.responseText || 'Network request failed'}));
}
xhr.onabort = _ => {
eventTarget.dispatchEvent(new CloseEvent('error', {reason: 'Network request aborted'}));
}
eventTarget.close = _ => {
xhr.abort();
}
xhr.send(opts.body);
return eventTarget;
}
chat.html
// Call Event XhrSource
const eventTarget = XhrSource(streamUrl, {
method: 'POST',
headers: {
'Content-Type': 'application/json'
},
body: JSON.stringify(data)
});
// Registry event handler 'open'
eventTarget.addEventListener('open', function (event) {
console.log('Connection opened:', event);
});
// Registry event handler 'message'
eventTarget.addEventListener('message', function (event) {
console.log(event.data);
// Do something...
});
// Registry event handler 'close'
eventTarget.addEventListener('close', function (event) {
console.log('Connection closed:', event);
});
해당 방식으로 진행하면, 데이터의 잘림 현상도 사라지고, data:
가 필터링되어 데이터를 받을때 Json 구조로 받을 수 있게 된다.
Stream 'Open'
Stream 'Close'