현업 프로젝트에서 실시간 로그를 받아서 보여줘야하는 케이스가 있어, 이를 따로 기록해두면 좋을거 같아 게시글을 쓴다.
프로젝트 상황상 데이터를 받는 환경이 api 외에도 실시간으로 받는 데이터 통신을 mqtt를 사용하는 상황이 생겨 이를 구현하였다.
mqtt와 흔히 알고있는 Socket.io 통신과 차이점이 어느정도 존재하는데,
이러한 차이로 MQTT에는 브로커라는 중심역할 개념이 존재하고, 지금 하는 프로젝트에는 어떤 센서장비에 통신을 받는 상황이라 더 적합한 형태였다.
mqtt Client에 구독하여, 해당 메세지를 받을때마다, 상태값에 넣어서 Grid에 보여줘야하는 경우가 있는데, 처음에는 일전에 채팅 프로젝트를 진행할때 처럼 웹소켓 통신을 생각하고 구현을 진행했다.
// MQTT Client 설정 및 연결
useEffect(() => {
const mqttClient = mqtt.connect(`${mqttHost}:${mqtt61850Port}/mqtt`, {
protocolVersion: 5,
});
// Client Connect
mqttClient.on('connect', () => {
console.log('Connected to mqtt');
setIsConnected(true);
mqttClient.subscribe('/LOG/#', function (err) {
if (!err) {
console.log('subscribe');
}
});
});
// Message Callback
mqttClient.on('message', (topic, payload) => {
const parsedPayload = JSON.parse(payload.toString());
if (parsedPayload.type !== 'extra') {
if (parsedPayload.command === 'GetLogicalNodeDirectory(URCB)') {
if (parsedPayload.type !== 'applog') {
setLogData((prevState) => {
const updatedMessages = [parsedPayload, ...prevState];
return updatedMessages.slice(0, 500); // 최대 500개의 메시지만 유지
});
}
} else {
setLogData((prevState) => {
const updatedMessages = [parsedPayload, ...prevState];
return updatedMessages.slice(0, 500); // 최대 500개의 메시지만 유지
});
}
}
});
// Error Callback
mqttClient.on('error', (error) => {
console.error('Connection failed:', error);
setIsConnected(false);
});
return () => {
if (mqttClient) {
console.log('end');
mqttClient.end();
}
};
}, []);
다음과 같은 코드를 적용했을때, 데이터를 잘 받아져서 상태가 갱신이 되었으나, 시간이 지나면서 데이터가 쌓일수록 상태를 렌더링 하는 과정에서 메모리가 쌓여 크롬창이 팅기는 현상이 발생했다.
지속적으로 데이터를 받는 상황에서 어떻게 처리할지 고민했는 과정이 필요했고,
내용을 찾아보다, Throttle과 debounce, buffer 등의 방법으로 시도를 할 수 있었다.
// 메시지 처리를 throttled 함수로 설정
const handleThrottledMessage = throttle((parsedPayload) => {
setLogData((prevState) => {
const updatedMessages = [parsedPayload, ...prevState];
return updatedMessages.slice(0, 500); // 최대 500개의 메시지만 유지
});
}, 1000); // 1초에 한 번만 처리
우선 Throttle은 이벤트를 일정한 주기마다 발생하도록 하는 기술로,
마지막 함수가 호출된 후 일정시간이 지나기전에 다시 호출되지 않도록 하는 것이다.
Throttle로 함수를 1초에 한 번만 실행 할 수 있도록 제한 해볼수 있었다.
다만 이를 통한 결과가 동일한 주기에 여러 메세지가 들어올때, 일부 메세지가 스킵되었다.
이는 중요한 메세지가 아니면 간소화 시킬 경우에 더 적합하다고 판단되었고, 주로 스크롤링을 진행했을때 더 어울릴거 같았다.
debounce는 특정 이벤트가 일정 시간 동안 반복 호출되지 않을 때만 실행되는 방식으로, 이를 테스트를 진행했을때도, 업데이트 빈도는 줄었으나, 중간에 생략되는 메세지가 많아 동시다발적으로 왔을때 적합한 부분은 아니였다.
const updateLogData = debounce((newMessage) => {
setLogData((prevState) => {
const updatedMessages = [newMessage, ...prevState];
return updatedMessages.slice(0, 500); // 최대 500개 유지
});
}, 300); // 300ms 디바운싱
buffer 방식은 Ref를 활용해서 임시 저장한후에 일정 주기를 주어 한번씩 UI에 업데이트 하는 방식으로
주기를 설정하는대로 동시에 데이터를 처리해주므로 놓치는 경우가 없었다.
Throttle과 방식이 비슷한데, 버퍼는 Ref에 임시 저장하기 때문에 놓치는 데이터가 없이 실시간성을 끌어올려 업데이트가 가능해서 적합한 방법이었다.
const messageBuffer = useRef([]); // 메시지 버퍼
useEffect(() => {
const mqttClient = mqtt.connect(`${mqttHost}:${mqtt61850Port}/mqtt`, {
protocolVersion: 5,
});
// Client Connect
mqttClient.on('connect', () => {
console.log('Connected to mqtt');
setIsConnected(true);
mqttClient.subscribe('/LOG/#', (err) => {
if (!err) {
console.log('Subscribed to /LOG/#');
}
});
});
// Message Callback
mqttClient.on('message', (topic, payload) => {
const parsedPayload = JSON.parse(payload.toString());
messageBuffer.current.push(parsedPayload); // 버퍼에 메시지 추가
});
// 주기적으로 버퍼에서 데이터를 UI로 업데이트
const intervalId = setInterval(() => {
if (messageBuffer.current.length > 0) {
setLogData((prevState) => {
const newMessages = messageBuffer.current.splice(0, messageBuffer.current.length);
return [...newMessages, ...prevState].slice(0, 500); // 최대 500개의 메시지만 유지
});
}
}, 1000); // 1초마다 업데이트
// Error Callback
mqttClient.on('error', (error) => {
console.error('Connection failed:', error);
setIsConnected(false);
});
// Cleanup
return () => {
if (mqttClient) {
mqttClient.end();
}
clearInterval(intervalId);
};
}, []);
기능 | buffer | Throttle (쓰로틀링) | Debounce (디바운싱) |
---|---|---|---|
실행 주기 | 일정 주기마다 실행 (1초) | 지정된 시간마다 실행 | 호출 멈춘 후 실행 |
데이터 처리 | 여러 개를 한 번에 처리 | 개별 호출 빈도 제어 | 마지막 호출만 처리 |
실시간성 | 높음 (1초 주기 업데이트) | 높음 (간격에 따라 다름) | 낮음 (입력이 끝나야 실행) |
사용 목적 | 배치 업데이트 | 성능 최적화 (빈도 제한) | 불필요한 연속 호출 방지 |