[RxJava] 스레드와 스케줄러

이영한·2022년 1월 2일
1

RxJava

목록 보기
3/3
post-thumbnail

1. subscribeOn, observeOn

RxJava에서는 데이터를 발행할 스레드와 구독할 스레드를 지정할 수 있다.

subscribeOn은 데이터를 발행하는 스레드를 지정하는 함수이다. 만약 subscribeOn을 따로 지정하지 않으면 subscribe함수를 호출한 스레드가 데이터 발행 스레드가 된다.

observeOn은 데이터를 소비하는 스레드를 지정하는 함수이다. 만약 ovserveOn에서 스레드를 따로 지정하지 않으면 subscribeOn에서 지정한 스레드, 즉 데이터 발행 스레드가 기본 스레드로 사용된다.

subscribeOn과 observeOn는 동작에 차이가 있다.

subscribeOn은 처음 호출했을 때만 작동하며 더 불러도 첫 호출시에 설정한 스레드에서 데이터가 발행된다.

반면에 observeOn은 호출할 때마다 매번 작동하여 진행되는 스레드가 바뀐다.

아래 코드를 살펴보자

Observable.just(1,2,3)
                .subscribeOn(Schedulers.computation())
                .subscribeOn(Schedulers.io())
                .subscribeOn(Schedulers.newThread())
                .subscribe(Log::i); 
        Thread.sleep(100);
        
        
        
public class Log {
    public static void i(Object logMessage) {
        System.out.println(Thread.currentThread().getName() + "| " + logMessage.toString());
    }
}

실행결과
RxComputationThreadPool-4| 1
RxComputationThreadPool-4| 2
RxComputationThreadPool-4| 3

subscribeOn을 세 번 호출하여 스레드를 바꿨음에도 여전히 처음에 지정한 Computation 스레드에서 실행되고 있음을 알 수 있다.

반면에 observeOn과 관련된 코드를 살펴보자

Observable.just(1,2,3)
                .observeOn(Schedulers.io())
                .observeOn(Schedulers.newThread())
                .observeOn(Schedulers.computation())
                .subscribe(Log::i);

실행결과
RxComputationThreadPool-1| 1
RxComputationThreadPool-1| 2
RxComputationThreadPool-1| 3

실행결과 가장 마지막에 호출한 computationThread에서 로그가 출력됨을 알 수 있다.


2. 스케줄러의 종류

1) IO 스케줄러

데이터 입출력, 파일 읽기 쓰기, 네트워크 동작 등 시간이 오래걸리는 작업에 적합한 스케줄러이다.

아래는 루트 폴더에 있는 파일이름을 io 스케줄러에서 읽어오는 코드이다.

String root = "/";
File[] files = new File(root).listFiles();
Observable.fromArray(files)
          .filter(it -> it.canRead())
          .map(it -> it.getName())
          .subscribeOn(Schedulers.io())
          .subscribe(Log::i);

실행결과
RxCachedThreadScheduler-1| home
RxCachedThreadScheduler-1| usr
RxCachedThreadScheduler-1| bin
...

2) Computation 스케줄러

빠른 계산이 필요한 경우 사용하는 스케줄러다. cpu를 많이 사용하는 작업에 사용하기 적합하다.

3) Trampoline 스케줄러

subscribeOn을 호출하는 스레드에 대기 큐를 만들어 데이터를 처리하는 스케줄러이다. 즉 스레드가 바뀌지 않지만 데이터의 처리가 FIFO로 이루어질 수 있도록 보장해준다.

아래 예시를 살펴보자

Observable trampolineSource = Observable.just(1,2,3);

trampolineSource.subscribeOn(Schedulers.trampoline()).map(it -> "First Observer " + it).subscribe(s -> {
            Thread.sleep(1000);
            Log.i(s);
        });
trampolineSource.subscribeOn(Schedulers.trampoline()).map(it -> "Second Observer " + it).subscribe(Log::i);

실행결과
main| First Observer 1
main| First Observer 2
main| First Observer 3
main| Second Observer 1
main| Second Observer 2
main| Second Observer 3

먼저 데이터의 subscribe가 subscribe을 호출한 스레드인 main스레드에서 이루어 짐을 알 수 있다.

그리고 FIFO가 보장되기 때문에 첫번째 subscribe에서 1초간 스레드를 멈춰도 두 번째 subscribe가 실행되지 않고 대기 상태에 있다.

4) SingleThead 스케줄러

하나의 스레드에서 subscribe가 진행되도록 하는 스케줄러이다.

Observable singleThreadSource = Observable.just(1,2,3);
        singleThreadSource.subscribeOn(Schedulers.single()).map(it -> "First Observer " + it).subscribe(Log::i);
        singleThreadSource.subscribeOn(Schedulers.single()).map(it -> "Second Observer " + it).subscribe(Log::i);

실행결과
RxSingleScheduler-1| First Observer 1
RxSingleScheduler-1| First Observer 2
RxSingleScheduler-1| First Observer 3
RxSingleScheduler-1| Second Observer 1
RxSingleScheduler-1| Second Observer 2
RxSingleScheduler-1| Second Observer 3

profile
간단하게 개발하고 싶습니다

0개의 댓글