Scheduler (스케줄러)

RxJava 는 다양한 문제를 해결할 수 있는 범용적인 솔루션이다. 특히 멀티 쓰레딩과 같은 비동기 작업을 효율적으로 구현할 수 있는 환경을 제공해준다. 이는 스케줄러라는 녀석을 활용하게 된다. 스케줄러는 데이터 스트림이 어떤 쓰레드에서 데이터를 발행하는지, 구독자는 어떤 쓰레드에서 이벤트 발생을 통보받는지에 대해 지정해줄 수 있다.

RxJava 에서는 Schedulers 클래스에서 제공하는 정적 패토리 메소드를 통해 스케줄러를 설정해줄 수 있다.

val singleSchedulers = Schedulers.single()
val ioSchedulers = Schedulers.io()
val newThreadSchedulers = Schedulers.newThread()
val computationSchedulers = Schedulers.computation()
val trampolineSchedulers = Schedulers.trampoline()

// 안드로이드에서만 제공하는 특별한 스케줄러
// val mainThread = AndroidSchedulers.mainThread()

Scheduler 종류

Single 스케줄러

val singleSchedulers = Schedulers.single()

Single 스케줄러는 단일 쓰레드를 생성하여 이를 계속 재사용하는 방식을 활용한다. RxJava 내부에서 쓰레드를 별도로 생성하여, 한 번 생성된 쓰레드를 활용하며 여러 작업을 처리하게 된다.

IO 스케줄러

val ioSchedulers = Schedulers.io()

이 녀석은 네트워킹 작업이나 DB 트랜잭션, 파일 시스템 환경 등 블로킹이 발생할 수 있는 곳에서 비동기적으로 작업을 처리하기 위해 사용되는 스케줄러이다. 쓰레드 풀을 사용하여 새로운 쓰레드가 필요할 때마다 쓰레드를 계속 생성하되, 이전에 생성했던 쓰레드가 존재한다면 이를 재사용한다. 내부적으로 CachedThreadPool 을 채택했다.

newThread 스케줄러

val newThreadSchedulers = Schedulers.newThread()

newThread 스케줄러는 매번 새로운 쓰레드를 생성하여 작업을 처리하도록 지정해주는 녀석이다.

Computation 스케줄러

val computationSchedulers = Schedulers.computation()

해당 스케줄러는 단순한 반복 작업, 콜백 처리 등등 컴퓨팅 및 계산적인 작업에 사용한다. CPU 에 대응하는 계산용 스케줄러이고, 내부적으로 쓰레드 풀을 활용한다. 기본적으로 쓰레드 개수는 프로세서 개수와 같다.

Trampoline 스케줄러

val trampolineSchedulers = Schedulers.trampoline()

새로운 쓰레드를 생성하지 않고, 현재 쓰레드에 무한한 크기의 큐를 생성한다. 큐의 특성인 FIFO 에 따라, 모든 작업을 들어온 순서대로 (순차적으로) 실행하는 것을 보장하게 된다.

mainThread 스케줄러 (RxAndroid 에만 포함)

val mainThread = AndroidSchedulers.mainThread()

RxAndroid 에서는 안드로이드 메인 쓰레드를 지정하는 스케줄러를 제공한다.


Scheduler 연산자

RxJava 에서 스케줄러를 활용하기 위해선, subscribeOn 메소드와 observeOn 메소드를 활용해볼 수 있다. 이것들만 있다면 정말 간단하게 멀티 쓰레딩을 구현해볼 수 있다.

우선, 0 부터 3까지 데이터를 발행하는 Observable 을 생성해보자. 아래와 같이 구현할 수 있을 것이다.

fun main() {
    Observable.create<Int> {
        for (i in 0..3){
            val threadName = Thread.currentThread().name
            println("#발행 [$threadName] : $i")
            it.onNext(i)
            Thread.sleep(100)
        }
    }.subscribe {
        val threadName = Thread.currentThread().name
        println("#구독 [$threadName] : $it")
    }
}

위 코드는 스케줄러 연산자를 사용하지 않았다. 그랬더니 결과는 다음과 같이 나온다.

#발행 [main] : 0
#구독 [main] : 0
#발행 [main] : 1
#구독 [main] : 1
#발행 [main] : 2
#구독 [main] : 2
#발행 [main] : 3
#구독 [main] : 3

스케줄러를 지정해주지 않는다면, 데이터 발행과 구독이 모두 메인 쓰레드에서 진행된다.


subscribeOn()

그럼 이제 한번, subscribeOn() 메소드를 활용하여 스케줄러를 지정해보자.

fun main() {
    Observable.create<Int> {
        for (i in 0..3) {
            val threadName = Thread.currentThread().name
            println("#발행 [$threadName] : $i")
            it.onNext(i)
            Thread.sleep(100)
        }
    }.subscribeOn(Schedulers.io())
        .subscribe {
            val threadName = Thread.currentThread().name
            println("#구독 [$threadName] : $it")
        }
    Thread.sleep(500)
}

실행해보면 아래와 같이 결과를 출력한다.

#발행 [RxCachedThreadScheduler-1] : 0
#구독 [RxCachedThreadScheduler-1] : 0
#발행 [RxCachedThreadScheduler-1] : 1
#구독 [RxCachedThreadScheduler-1] : 1
#발행 [RxCachedThreadScheduler-1] : 2
#구독 [RxCachedThreadScheduler-1] : 2
#발행 [RxCachedThreadScheduler-1] : 3
#구독 [RxCachedThreadScheduler-1] : 3

위 결과에서 알 수 있듯, 해당 연산자는 Observable 데이터 스트림에 어떤 스케줄러를 사용하여 데이터를 발행할지 지정해주는 메소드이다. 만약 subscribeOn 이 메소드 체이닝되어 있는데 observeOn 이 체이닝되지 않은 경우 발행되는 데이터를 구독하는 쓰레드도 동일한 쓰레드에서 동작하도록 한다.


observeOn()

그렇다면 observeOn 은 발행되는 데이터를 구독하는 쓰레드를 지정해주는 연산자로 유추할 수 있다. 일단 코드에 적용해보자. 위 예제 코드에 observeOn() 을 덧붙여 아래와 같이 스케줄러를 지정해준 뒤 결과를 확인해보자.

fun main() {
    Observable.create<Int> {
        for (i in 0..3) {
            val threadName = Thread.currentThread().name
            println("#발행 [$threadName] : $i")
            it.onNext(i)
            Thread.sleep(100)
        }
    }.subscribeOn(Schedulers.io())
        .observeOn(Schedulers.computation())
        .subscribe {
            val threadName = Thread.currentThread().name
            println("#구독 [$threadName] : $it")
        }
    Thread.sleep(500)
}
#발행 [RxCachedThreadScheduler-1] : 0
#구독 [RxComputationThreadPool-1] : 0
#발행 [RxCachedThreadScheduler-1] : 1
#구독 [RxComputationThreadPool-1] : 1
#발행 [RxCachedThreadScheduler-1] : 2
#구독 [RxComputationThreadPool-1] : 2
#발행 [RxCachedThreadScheduler-1] : 3
#구독 [RxComputationThreadPool-1] : 3

결과를 보니 발행과 구독이 각기 다른 쓰레드에서 실행되고 있음을 확인할 수 있다. 비로소 멀티 쓰레딩을 구현하게 된 것이다.

observeOn 연산자를 활용하여 스케줄러를 지정해준다면, Observable 데이터 스트림에서 발행한 데이터를 가로채서 지정한 스케줄러에서 이를 구독한다. 따라서 위와 같은 결과가 나오게 된다.


Android 에서는

일반적으로 안드로이드에서 RxJava 를 사용 목적에 맞게 가장 많이 사용하는 부분은 백엔드 서버와 네트워킹 동작을 하거나, DB 쿼리 동작을 수행하는 부분이다. 따라서 이에 가장 적합한 IO 스케줄러subscribeOn() 을 통해 지정해줌으로써 IO 스케줄러 상으로 결과 데이터를 발행할 수 있도록 한다.

그리고 안드로이드에선 보통 위와 같은 비동기 동작의 결과물을 메인 쓰레드 (UI 쓰레드) 에서 UI 를 갱신하는 등 활용하기 때문에 이를 AndroidSchedulers.mainThread() 로 지정한다.

repository.getData()
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())

거의 국룰이다시피 활용되는 스케줄러들이기 때문에, 아래와 같이 Observable 혹은 Single 데이터 스트림에 확장함수 형태로 스케줄러 지정 코드를 정의해두기도 한다. (필자도 애용한다)

fun <T> Single<T>.applySchedulers() =
    subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread())
fun <T> Observable<T>.applySchedulers() =
    subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread())
profile
어려울수록 기본에 미치고 열광하라

0개의 댓글