이 글은 <RxJS 반응형 프로그래밍>을 읽고 제 나름대로 생각을 정리한 글입니다!
이벤트에서 시간은 매우 중요합니다.
기본적으로 이벤트 루프를 통해 전달되는 비동기 로직은, 대기 시간 등의 변수에 따라 결과적으로 나타내는 값이 달라지기 때문인데요.
오늘은 이러한 시간의 개념을 이해하고, 시간을 어떻게 프로그래밍에 반영할지, 나아가 버퍼링으로 이벤트 데이터를 분석하는 방법까지 알아보겠습니다.
이 파트를 끝내면, 기본적으로 저자가 '기초'라고 판단했던 부분들을 어느정도 해결했다고 볼 수 있습니다. 즉, 본격적인 출발이 기다리고 있겠네요!
사용자가 애플리케이션에 응답할 때까지 기다리는 시간에는 일종의 법칙이 있다고 합니다.
이를 정리하면 다음과 같습니다.
개인적으로 생각해보건대, 여기서 주목할 것은 1초~10초입니다.
사용자는 어느 순간 임계치 이상의 상황이 주어지면, 불안함을 느끼는데요.
이때의 불안함은 결코 단일적인 것이 아닙니다. 복합적인 상황을 고려해야 한다는 것에서 혼란을 느끼는 것입니다. 즉, 사용자는
즉, 사용자도 알고 있습니다.
이 문제는 단지 애플리케이션만의 문제가 아닐 수도 있음을 말이죠.
우리는 그렇기 때문에 시간 기반 함수를 잘 다뤄야 합니다.
사용자가 컴퓨터가 이상해서라는 답을 내리고 떠나기 전에, 어느 정도 제어함으로써 애플리케이션이 당신의 제어를 들어줄 수 있음을 알려줘야 합니다.
그렇기 때문에 RxJS
는 적합하다고 할 수 있습니다.
시간에 따른 함수의 비순수성에 불구하고, 연산자 체인을 통한 시퀀스는 순차적인 동기 처리를 통해 사이드 이펙트를 최소화하기 때문입니다.
비동기 애플리케이션은 다음과 같은 요소에 의존하고 있습니다.
그리고 이러한 요소들은 CPU에서 실행되기 때문에 병목 현상이 발생할 수 있어요.
그런 상황 속에서 비동기 로직은 어떻게 작동할까요?
실행이 되지 않을 수도 있고, 반대로 실행이 되더라도 처리 순서가 잘못될 수 있습니다.
RxJS
는 오케스트레이션 계층을 통해 일련의 작업을 직렬화할 수 있어요.
따라서 사이드 이펙트를 최소화하며 개발할 수 있도록 도와요!
대표적으로 콜백패턴이 있어요. 언제 가져올지를 정확히 명시하지 않습니다.
다만, 컨텍스트 상에서 "이 로직이 끝날 때 이것을 처리해줘"라는 말만 있을 뿐이지요.
그렇기 때문에 그 결과에 있어서 신뢰할 수 없습니다. 왜냐하면 프로세서의 가용성에 의존성을 갖고 있기 때문입니다.
명시적 타이밍은 반대로 시점에 대해 구체적이고 명시적입니다.
그리고 에러 및 스트림 취소가 없다면 무조건적으로 실행되지요.
이는 크게 사용자와 리소스를 기준으로 진행됩니다.
쉽게 말하면
setTimeout
과setInterval
을 생각하면 될 것 같습니다.
물론 이 친구들 역시 이벤트 루프 상에서 시간을 완전히 보장해주지 않지만 말이죠.
그런 의미에서
setTimeout
과setInterval
을 설명하고 있습니다.
이에 대한 내용은 너무나 저명한 예시들이 많으므로 생략하겠습니다.
RxJS
에서의 타이머RxJS
는 setTimeout
역시 일회성 옵저버블로 바라봅니다.
그리고 이러한 타이머를 더 쉽게 구현할 수 있게 연산자를 이미 갖고 있는데요.
timer
연산자를 이용하면 이러한 setTimeout
과 동일한 로직을 간단히 처리할 수 있습니다.
Rx.Observable
.timer(1000)
.subscribe(() => document.querySelecotr('#app').style.backgroundColor = 'black')
마찬가지로 RxJS
는 interval
이라는 연산자가 존재합니다.
공식문서의 예제를 갖고 와보겠습니다.
const numbers = interval(1000);
const takeFourNumbers = numbers.pipe(take(4));
takeFourNumbers.subscribe(x => console.log('Next: ', x));
옵저버블은 타이밍 연산자를 옵저버블 스트림과 결합합니다.
이것의 장점은 무엇일까요?
이전에는 상태를 타이머를 통해 조작하였다면, 이제는 스트림을 타이머와 결합했기 때문에 더욱 안정적으로 관리할 수 있게 되었습니다.
결과적으로 사이드 이펙트를 최소화했기에, 이제는 데이터를 얼마 주기로 어떤 로직을 처리해 보내주느냐에 집중할 수 있게 되는 거죠.
또한, do
와 timeInterval
을 통해 더욱 정확한 경과시간을 출력할 수 있습니다.
Rx.Observable.interval(2000)
.timeInterval()
.skip(1)
.take(5)
.do(int => console.log(`Checking every ${int.interval} milliseconds...`))
.map(int => new USDMoney(newRandomNumber()))
.map(usd => usd.toString())
.forEach(console.log)
// Checking every 2000 milliseconds...
// Checking every 2002 milliseconds...
// Checking every 1999 milliseconds...
// Checking every 2001 milliseconds...
// Checking every 2000 milliseconds...
또한, delay
라는 연산자 역시 존재합니다.
이 친구는 전체적인 시퀀스의 동작 타이머를 지연시키는 역할을 하죠.
Rx.Observable.timer(1000)
.delay(2000)
.timeInterval()
.map(int => Math.floor(int.interval / 1000))
.subscribe(sec => console.log(sec))
이는 timer
의 1초에서, 이후 2초를 지연시키며 3초를 나타냅니다.
결과적으로 타이머 연산자는 마치 동기적으로 동작한다는 느낌을 주는군요!
시간의 지연은 전파에 영향을 줍니다.
예컨대, delay
전체의 시퀀스를 일정 시간만큼 이동시키죠.
따라서 우리는 버퍼링을 고려해야 합니다.
이 버퍼링은 경계 상한(bounded upper limit)이라는 고정된 상수값을 토대로 유지가 되는데요.
이는 다음과 같습니다.
경계상한 = 전달 받은 이벤트 수 / 시간
버퍼는 항상 상한을 초과하지 않습니다.
또한, delay 연산자는 순차적으로 작동한다는 점을 항상 염두해야 합니다.
비시간 연산자는 이를 이벤트마다 파이프라인을 통해 전파해나가지만, 이 친구는 항상 순차적으로 지연시킨다는 점을 유념해야겠어요! 🤯
디바운싱은 사실 저명한 최적화 방법이죠! 설명은 생략합니다.
RxJS
에서 디바운싱은 debounceTime
이라는 연산자로 처리가 가능합니다.
이에 대한 상대시간만 지정해준다면, 어떤 다음 연산자 로직에 대한 디바운싱을 지원해줘요. 이렇게 RxJS
로 처리된 디바운싱은, 결과적으로 결과처리가 가능해지기 때문에 애플리케이션 개발에 확장성을 더해줍니다.
쓰로틀링 역시 사실상 꽤나 많은 설명이 있기에 설명을 생략할게요!
RxJS
에서 쓰로틀은 throttleTime
이라는 연산자로 처리해요.
사용법은 디바운싱과 동일합니다!
RxJS
는 이벤트를 캐싱해서, 이를 한 번에 방출할 수 있어요.
결과적으로 버퍼링 연산자는 과거 데이터를 일시적으로 저장해주는 역할을 합니다.
옵저버블의 인스턴스는 이러한 버퍼링을 지원해주는 메서드들을 갖고 있어요.
이름 | 설명 |
---|---|
buffer(observable) | 들어오는 옵저버블의 값을 버퍼링합니다. 이때 버퍼를 방출하고 다음 방출을 기다리며 새로운 버퍼를 시작합니다. |
bufferCount(number) | 소스 옵저버블로부터 값을 버퍼링한 후, 전체를 방출하고 지웁니다. 새로운 버퍼가 내부적으로 초기화됩니다. |
bufferWhen(selector) | 버퍼를 열고 selector가 호출하여 반환된 옵저버블이 값을 방출했을 때 버퍼를 닫습니다. 이때 새 버퍼가 즉시 열리며 이러한 프로세스가 반복됩니다. |
bufferTime(time) | 일정기간 소스의 이벤트를 버퍼링합니다. 시간이 지나면 데이터를 방출하고 내부적으로 초기화합니다. |
예제로 주어진 코드를 통해 알아보죠!
Rx.Observable.timer(0, 50)
.buffer(Rx.Observable.timer(500))
.subscribe((v) => console.log(`buffer: ${v}`))
// [0,1,2,3,4,5,6,7,8,9]
먼저, 버퍼는 옵저버블이 이벤트를 방출할 때까지 옵저버블로부터 받은 이벤트를 수집해요. 이때, buffer 연산자는 버퍼링된 데이터를 지우고 내부적으로 새 버퍼를 시작합니다.
이후에는 타이머 50ms마다 후속값들을 방출하게 해요. 이때, 시간은 500ms를 주었기 때문에 총 10개의 이벤트가 나오겠군요. 결국 500ms 후에는 10개의 이벤트가 나오는 로직입니다.
이 친구는 카운트를 받습니다.
즉, 얼마나 데이터를 받을 것인지를 명시적으로 알 수 있죠!
만약 데이터 크기가 주어진 카운트과 일치한다면, 이를 바로 방출하고, 다시 새 버퍼를 시작하게 되는 것입니다.
이 역시 예제로 살펴보죠!
const amountTextBox = document.querySelector('#amout');
const warningMsg = document.querySelector('#amount-warning');
Rx.Observable.fromEvent(amountTextBox, 'keyup')
.bufferCount(5)
.map(e => e[0].target.value)
.map(v => parseInt(val, 10))
.filter(v => !Number.isNaN(v))
.subscribe(amount => warningMsg.setAttribute('style', 'display: inline;'))
일단 먼저 keyup
이벤트에 대한 이벤트를 방출하여, 버퍼에 담습니다.
5개가 담겨지면 이제 이 각 value에 대해 계산을 하며, 결과적으로 5자리의 금액을 썼다면 경고를 날릴 수 있도록 하는 것이죠.
뭔가 예제를 보고 나니, 어떤 느낌인지 이제 버퍼링이라는 개념에 대해 이해가 되기 시작합니다!
값을 방출할 때까지 이벤트를 캐싱해놓습니다.
그리고 만약 방출시기가 정해진다면, 이를 알리기 위한 옵저버블을 생성하는 셀렉터 함수를 사용합니다.
즉 완전히 시기를 정한다기보다는, 사용자가 입맛에 맞게 방출할 수 있도록 암시적으로 타이밍을 지정해준다고 생각하면 편할 것 같군요!
const field = document.querySelector('.form-field');
const showHistoryButton = document.querySelector('#show-history');
const historyPanel = document.querySelector('#history');
const showHistory = Rx.Observable.fromEvent(showHistoryButton, 'click');
Rx.Observable.fromEvent(field, 'keyup')
.debounceTime(700)
.pluck('target', 'value')
.filter(R.compose(R.not, R.isEmpty)) // R: Ramda.js
.bufferWhen(() => showHistory$)
.do(history => history.pop())
.subscribe(history => {
let contents = '';
if (history.length) {
for (const item of history) {
contents += `<li>${item}</li>`
}
historyPanel1.innerHTML = contents;
}
})
bufferTime
은 일정기간 옵저버블 시퀀스의 데이터를 유지했다가 방출합니다.
이 예제에서는 combineLatest
라는 연산자를 사용하는데요. 이 친구는 스트림들을 하나로 결합하는 데 사용합니다.
const password = document.getElementById('password-field');
const submit = document.getElementById('submit');
const outputField = document.getElementById('output');
const password$ = Rx.Observable.fromEvent(password, 'keyup')
.map(({ keyCode }) => keyCode - 48);
const submit$ = Rx.Observable.fromEvent(submit, 'click');
Rx.Observable.combineLatest(
password$.bufferTime(7000).filter(R.compose(R.not, R.isEmpty)),
submit$
)
.take(3)
.do(([maybePassword]) => console.log(`Password is: ` + maybePassword.join('-')))
.subscribe(([maybePassword]) => {
if (maybePassword.join('') === '1337') {
outputField.innerHTML = 'Correct Password!'
} else {
outputField.innerHTML = 'Wrong Password!'
}
},
null,
() => outputField.innerHTML += '\n No more tries accepted!');
예제는 1337이라는 비밀번호가 맞는지 총 3번의 기회를 주는 로직입니다.
이때, bufferTime
은 keyup
에 걸려 있군요. 즉, 7초간의 입력에 대한 시간이 끝나면 버퍼를 지워버리는 것임을 알 수 있죠!
이렇게 버퍼에 관한 연산자까지 모두 살펴보았는데요.
버퍼에 관해서는 주의사항이 있습니다.
오랜 시간 버퍼를 유지시킨다는 것은 곧 메모리를 많이 사용한다는 것입니다. 따라서 메모리 사용에 유념하며, 버퍼를 얼마나 유지해야 할지 고민하며 핸들링해야 한다는 것이 중요해요. 😉
사실, 이번 파트 꽤나 이해가 가지 않는 부분이 많았어요.
버퍼는 정말 많이 애를 먹었기도 했고, 특히 시간 연산자의 전파 부분이 많이 이해가 가질 않았었습니다. (애증의 delay
...)
그렇지만, 결국 그 힘든 시간을 견디고 이 파트 역시 완주할 수 있었네요.
아무래도 이제 RxJS
를 토이프로젝트와 엮으며 시작해보려 합니다.
책을 읽으며 느꼈는데, 이 책은 실무적인 것 보다는 흐름을 이해하는 데 더 적합하다고 느꼈거든요.
결국 모든 것은 실제로 사용할 수 있어야 하기에, 아무래도 조만간 간단한 프로젝트로 RxJS를 실제로 느껴보고자 합니다.
역시 결국 모든 것은 실제로 써볼 수 있어야 하는 것 같아요.
글이 아닌, 코드로도 찾아볼 수 있도록 또 공부하러 가요. 이상! 🌈