이 글은 <RxJS 반응형 프로그래밍>을 토대로 제 생각을 적은 글입니다 🙂
RxJS은 반응형 프로그래밍과 함수형 프로그래밍의 영향을 받았습니다.
우리는 데이터 플로우에 있어 스트림의 전파를 이용함을 알았는데요.
이러한 전파는 파이프라인을 통해 여러 함수를 거치며 결과적으로 결과를 만들어내죠.
그리고, 이러한 순수함수를 우리는 '마치 연산하기 위한 연산자'같다고 하여 연산자라고 불렀습니다.
이 장의 주제는 연산자에요.
우선 옵저버블을 살펴보고, 옵저버블 시퀄스를 통해 데이터 플로우에서 어떻게 원하는 결과를 도출하는지를 이해해보려 합니다. 그럼, 다들 출발해보시죠!
C나 C++에서는 메모리 제어가 간편합니다. 직접 메모리를 할당할 수도 있고, 해제할 수도 있기 때문이죠.
하지만 자바스크립트에서 이러한 제어는 전적으로 엔진의 역할에 위임됩니다. 고로, 사용자가 핸들링할 수 있는 범위는 제한적입니다.
그렇기에 메모리 누수는 항상 골치아픈 이슈였어요.
자바스크립트에서 가비지 컬렉터가 정리하는 알고리즘의 핵심은 참조 카운팅인데요. 말 그대로 참조를 하고 있는지 체크하는 것을 의미합니다. 이때,
이러한 상황 속에서 메모리는 지속적으로 일부분을 할당해야 하고, 이러한 문제가 많아질 수록 성능은 저하됩니다. 이를 메모리 누수라고 하죠.
그렇기 때문에 이를 확장해서 RxJS
차원에서 해석해보도록 하죠.
데이터 스트림을 만약 우리가 전파를 해야 할 때, 원치 않는 이슈가 있을 수 있어요.
책의 내용을 살펴보면, 폴링하며 API 요청을 하는데, API가 이제 사용할 수 없게 되는 상황이 있겠죠.
이러한 오류가 생긴다면, 결과적으로 메모리는 쓸데없는 곳에 할당이 되는 것이고, 이는 리소스의 낭비입니다.
이러한 배경에서,
RxJS
는 스트림의 평가와 취소를 좀 더 안정적으로 관리할 수 있게끔 해주었다고 합니다.
RxJS
는 옵저버블이 구독될 때, 비로소 스트림의 수명이 시작돼요.
이 말이 의미하는 것이 무엇일까요?
스트림은,
subscribe
가 호출될 때까지, 없는 것처럼 해석이 된다는 의미입니다.
이는 지연평가를 통해 가능합니다. 값을 미리 할당하지 않고, 필요할 때만 데이터를 꺼내서 쓰며 마치 lazy
하게 평가하는 것이죠.
예컨대, arr.length
가 1억이라고 합시다.
하지만, 우리가 원하는 데이터는 arr.slice(0, 2)
였다고 가정해봅시다.
겨우 2개의 값만 꺼내면 끝난 거였는데, 1억을 호출하고 참조했다니! 엄청난 손해지요.
이는 즉시평가를 통해 해결할 수 없지만, 지연평가를 통해 해결할 수 있습니다.
간단합니다. 그냥 2개까지만 꺼내서 쓴다고 선언해주고, 제너레이터 등을 활용하여 꺼내면 되기 때문입니다.
다만 지연평가의 단점은, 다소 로직을 짤 때 즉시평가에 비해 복잡하다는 것인데요. 이 역시
RxJS
를 통해 좀 더 간편하게 해결해줄 수 있습니다.
이는 크게 2가지가 존재합니다.
아까 앞서 말했던 지연평가를 RxJS
에 대입한 개념인 것 같군요.
관심 있는 이벤트가 될 때까지, 옵저버블을 휴면 상태로 유지시키는 것입니다.
결과적으로 휴면상태에 있는 옵저버블은 아직 평가를 시작하지 않습니다.
이후 subcribe
메서드가 호출되어야 활성화가 되어, 데이터를 할당하기 시작합니다.
이러한 이유가 가능한 이유가 무엇일까요?
옵저버블이라는 것이, 본질적으로 데이터를 보유하지 않는다는 것을 의미합니다.
옵저버블은 단지 시퀀스에 따라서 흐를 뿐이지, 정적이지 않습니다. 이렇게 옵저버블에 의해 추상화된 이벤트는 처리와 동시에 해제되므로 메모리 누수를 막습니다.
SPA에서 메모리 관리는 매우 중요해졌습니다.
계속해서 전역에 위치하여, 애플리케이션의 전체 수명 주기 동안 참조하며 원치 않는 메모리 누수가 발생할 수 있기 때문입니다.
따라서 지연 구독 외에도 사용자가 명시적으로 구독을 취소할 수 있는 방법이 필요했습니다.
그것이 바로 RxJS
의 unsubscribe
메서드입니다. 해당 메서드를 통해, 전체 옵저버블에서 메모리를 안정적으로 관리할 수 있어요.
RxJS
와 다른 API 간의 취소 불일치항상 RxJS
의 옵저버블과, 다른 모든 API의 디자인이 일치하지는 않습니다.
대표적으로 Promise
인데요. 책에 있는 예제를 살펴봅니다.
const promise = new Promise((resolve, reject) => {
setTimeout(() => {
resolve(42);
}, 10000);
});
promise.then((val) => {
console.log(`In then(): ${val}`);
});
const subscription$ = Rx.Observable.fromPromise(promise).subscribe(val => {
console.log(`In subscribe(): ${val}`);
});
subscription$.unsubscribe();
이 결과는, 분명히 구독을 취소하였음에도 불구하고 10초 후 42
라는 값이 나옵니다.
왜일까요? 그 이유는 Promise
객체의 한계 때문입니다.
본질적으로 Promise
는 abort
에 대한 설계가 고려되어 있지 않습니다. 따라서 구독의 여부와 관계없이 Promise
는 응답의 결과를 도출했습니다.
이처럼 항상
API
를 고려하지 않고 전적으로RxJS
만을 의존한다면, 원치 않는 결과가 나올 수 있음을 유의해야겠어요.
우리는 구독 취소 등을 통해 메모리를 직접적으로 관리할 수 있는 방법과 지연 평가와 같은 구독 매커니즘을 이해하게 됐군요.
하지만 아직 멀었어요. 결과적으로 어떤 처리를 할 것인지에 대한 이야기를 하지 않았기 때문이죠.
이것이 연산자의 역할입니다. 연산자는 쉽게 말해서 옵저버블 파이프라인에 삽입 가능한 로직이며, 선언적 함수 체인의 일부입니다. 또한 순수함수이며, 고차함수여야 합니다.
이것이 의미하는 것은 무엇일까요? 어떤 옵저버블 객체가 있다면, 그 객체는 파이프라인의 연산자를 걸쳐서, 옵저버블 인스턴스를 반환해야 합니다. 왜냐하면 this
를 통한 체이닝이 가능해야 하기 때문이죠.
연산자들은 결과적으로 이벤트가 데이터 소스를 떠난 후 소비자에게 도달하기 전에 이벤트를 검사, 변경, 생성, 지연하는 데 사용됩니다.
배열의 map
과 같이 스트림을 함수의 로직에 맞춰 mapping한다고 보시면 됩니다.
결과적으로 함수를 적용하게 되어, 데이터 스트림은 이에 맞춰 함수의 로직을 거쳐 변환됩니다.
const addFivePercent = x => x + 0.05
Rx.Observable.of(1,2,3,4)
.map(addFivePercent)
.subscribe(console.log); // 1.05, 2.05, 3.05, 4.05
map
은 원본을 변경시키지 않습니다. 즉, 소스 옵저버블을 변경시키지 않습니다.
원본 내부의 값이 바뀐다 한들, 원본인 객체 자체를 변경시킨 것은 아닙니다. 고로 불변성을 유지한다고 할 수 있습니다.
이러한 변환 연산을 통해, 결과적으로 소비자를 이해시킬 데이터 타입으로 호환할 수 있게 됩니다.
원하지 않는 요소는 filter
로 필터링이 가능합니다.
예컨대 다음 책에 기술된 예제와 같이 filter
로 숫자 범위 키에 대한 필터링이 가능합니다.
const isNumericalKeyCode = code => code >= 48 && code <= 57;
const input = document.querySelector('#input');
Rx.Observable.fromEvent(input, 'keyup')
.pluck('keyCode')
.filter(isNumericalKeyCode)
.subscribe(code => console.log(`User typed: ${String.fromCharCode(code)}`));
이러한 필터링은, 모든 데이터가 필요하지 않고, 특정 기준만을 토대로 효율적으로 처리하기 위해 사용합니다.
각 항목이 독립적으로 처리되는 것이 아닌, 전체적으로 컬렉션을 순회 형식으로 연산할 때가 있습니다. 이때 사용하는 것이 reduce
입니다.
특이한 점이 있습니다. reduce
는 배열에서는 결과적으로 반환되는 값이 원본이라고 할 수 없습니다. 즉, 단일 값을 생성하는데요.
반면 Observable
에서의 reduce
연산자는 새로운 싱글톤 옵저버블을 반환합니다.
const candidates = [{ name: 'jy' }, { name: 'jengyoung' }];
Rx.Observable.from(candidates)
.reduce((acc, obj) => [...acc, obj.name], []) // 새로운 싱글톤 옵저버블을 반환했으므로 체이닝이 가능하다.
.subscribge(console.log);
scan
은 리듀스와 비슷한 것처럼 동작하지만, scan
연산자는 리덕션 프로세스 진행 동안 각각의 중간 결과를 반환합니다.
즉, 누산되기까지의 과정을 디버깅하는 데 유용한 메서드일 수 있겠군요!
지연 함수 체인을 통해 전체를 가져오지 않고도 하위 집합이나 누계, 그리고 기본값을 가져오는 집계 함수가 존재합니다.
이때, 핵심은 시퀀스가 독립적이어야 한다는 것입니다.
함수 체인은 고차 함수를 기반으로 제공자 역할을 해요.
이때 파이프라인은 제한된 컨텍스트 내에서 부가작용이 없어야 해요.
즉, 외부의 상태로 인해, 내부의 로직에 영향을 미치지 않으며, 독립적으로 연산이 가능해야 한다는 것이죠.
또한, 연산을 그룹화하여, 로직을 선언적으로 표현하며 일련의 체인으로 만들 수 있죠. 이를 연산자 체인, 또는 플루언트 프로그래밍이라고 합니다.
또한 같은 이벤트가 전달될 때, 옵저버블은 항상 같은 결과를 만들어내야 합니다. 이를 참조 투명성이라고 하죠.
옵저버블은 항상 독립적으로 유지되어야 하며, 한 옵저버블의 결과가 다른 옵저버블에 영향을 미쳐서는 안 됩니다.
RxJS
는 중간 데이터 구조를 생성하지 않습니다.
이것이 매우 당연한 것처럼 보이지만, 실제로는 당연하지 않습니다.
예컨대, 기존의 배열에서 filter
과 map
을 체이닝하면, 이는 새로운 배열 인스턴스들을 반환합니다.
그러나 이 일련의 배열 인스턴스들은 한 번만 사용되기 때문에 체이닝이 매우 깊어질 수록 비효율적입니다.
그렇기 때문에 RxJS
는 다음과 같이 여분의 데이터 구조가 생성되지 않게 하고, 메서드의 반환 타입이 옵저버블로 고정될 수 있도록 합니다.
Rx.Observable.from(og)
.filter(x => {
console.log(`filtering ${x}`)
return x % 2
})
.map(x => {
console.log(`mapping ${x}`);
return x ** 2;
})
.subscribe();
/*
* filtering 1
* mapping 1
* filtering 2
* filtering 3
* mapping 3
*/
이를 통해, 설사 reduce
라 하더라도 옵저버블로 고정되어 있기 때문에 연속적인 체이닝이 가능해집니다.
실제로 RxJS
를 사용할 때 사용할 만한 유용한 집계 연산자들이니, 참고해도 좋을 것 같아요. 😉
take(count)
: 지정된 count
의 연속적인 요소를 의미해요.first, last
: take
함수의 기능 추가 버전이에요. 첫 번째와 마지막을 반환해요.min, max
: 최대와 최소를 반환하는 숫자를 내보내는 옵저버블에서 작업해요.do
: 시퀀스의 각 요소에 대한 동작을 호출하는 용도로 사용해요.이번 파트는 살짝 쉬어가는 내용이었던 것 같아요.
약간 기존에 배웠던 내용들과 많이 겹친다는 느낌이 있었는데요.
다시 생각해보니, 점차 많이 알게 되고 있다는 것과 같다는 생각이 들었어요.
결국, 역시 꾸준히 하다 보면 언젠가 성장하는 것 같아요.
그럼, 다들 즐거운 공부하시길 바라요. 이상! 🌈!