Scheduler 의 Operator

알파로그·2023년 11월 10일
0

Spring WebFlux

목록 보기
10/13

✏️ Scheduler

  • Thread 를 관리하는 관리자 역할을 수행할 수 있다.
  • 구독시점에 data 가 emit 되는 영역과 emit 된 data 를 operator 로 가공처리하는 영역을 분리해 편리하게 멀티 쓰레드를 관리할 수 있다.

📍 Reactor 의 Scheduler 의 종류

  1. Operator 체인에서 Scheduler 를 전환하는 역할을 하는 전용 Operator
  2. Scheduler 를 통해 생성되는 Thread 실행 모델을 지정하는 Operator

✏️ 1. Scheduler 를 위한 전용 Operator

📍 publishOn()

  • Downstream Operator 의 실행을 위한 Thread 를 지정한다.
    • publishOn() 의 인자에 지정하고자 하는 Scheduler 를 입력할 수 있다.
    • Scheduler 는 Thread 를 생성해 Downstream 의 실행 Thread 를 지정하게 된다.
  • Operator 체인에서 publishOn() 이 호출되면 publishOn() 호출 이후의 Operator 체인은 다음 publishOn() 을 만나기 전까지 publishOn() 에서 지정한 Thread 에서 실행이 된다.
public static void main(String[] args) throws InterruptedException {
    Flux.fromArray(new Integer[]{1, 3, 5, 7})
            .doOnNext(data -> print("fromArray : " + data))
            .publishOn(Schedulers.parallel())
            .filter(data -> data > 3)
            .doOnNext(data -> print("filter : " + data))
            .map(data -> data * 10)
            .doOnNext(data -> print("map : " + data))
            .subscribe(PublishOn::print);

    Thread.sleep(500L);
}
  • 아래와 같이 publishOn() 이후의 Thread 가 main 에서 변경된것을 확인할 수 있다.
[main] INFO -- fromArray : 1
[main] INFO -- fromArray : 3
[main] INFO -- fromArray : 5
[main] INFO -- fromArray : 7
[parallel-1] INFO -- filter : 5
[parallel-1] INFO -- map : 50
[parallel-1] INFO -- 50
[parallel-1] INFO -- filter : 7
[parallel-1] INFO -- map : 70
[parallel-1] INFO -- 70

📍 subscribeOn()

  • 최상위 Upsteam Publisher 의 실행을 위한 Thread 를 지정한다.
    • 즉, 원본 데이터 소스를 emit 하기 위한 스케줄러를 지정한다.
    • subscribeOn() 이 호출되면 원본 publisher 의 실행 Thread 는 subscribeOn() 에서 지정한 Thread 로 바뀐다.
public static void main(String[] args) throws InterruptedException {
    Flux.fromArray(new Integer[]{1, 3, 5, 7})
            .subscribeOn(Schedulers.boundedElastic())
            .doOnNext(data -> print("fromArray : " + data))
            .filter(data -> data > 3)
            .doOnNext(data -> print("filter : " + data))
            .publishOn(Schedulers.parallel())
            .map(data -> data * 10)
            .doOnNext(data -> print("map : " + data))
            .subscribe(SubscribeOn::print);

    Thread.sleep(500L);
}
  • publisher 의 Thread 가 지정된 Thread 에서 실행되고 있다.
[boundedElastic-1] INFO -- fromArray : 1
[boundedElastic-1] INFO -- fromArray : 3
[boundedElastic-1] INFO -- fromArray : 5
[boundedElastic-1] INFO -- filter : 5
[boundedElastic-1] INFO -- fromArray : 7
[boundedElastic-1] INFO -- filter : 7
[parallel-1] INFO -- map : 50
[parallel-1] INFO -- 50
[parallel-1] INFO -- map : 70
[parallel-1] INFO -- 70

📍 parallel()

  • Downstream 에 대한 데이터 처리를 병렬로 분할 처리하기 위한 Thread 를 지정한다.
  • parallel() 의 동작 방식은 아래와 같다
    1. Flux.parallel() 호출
      • parallel() 을 호출해 병렬처리를 하기위한 객체인 ParallelFlux 를 생성한다.
      • ParallelFlux 는 워크로드를 분할할 수 있는 객체이다.
        • 워크로드는 Flux 가 수행할 작업들을 뜻한다.
    2. ParallelFlux.runOn(Scheduler) 호출
      • 워크로드를 분할하는 메서드로 인자에 지정한 Scheduler 에 의해 분할된 워크로드가 작업될 Thread 가 할당된다.
    3. Rail area
      • 분할된 워크로드가 각각의 Thread 내의 Rail 이라고하는 논리적 작업 단위를 통해 병렬처리된다.
public static void main(String[] args) throws InterruptedException {
    Flux.fromArray(new Integer[]{1, 3, 5, 7, 9, 11, 13, 15, 17, 19})
            .parallel()
            .runOn(Schedulers.parallel())
            .subscribe(Parallel::print);

    Thread.sleep(100L);
}
  • 위 코드를 실행하면 각각 다른 Thread 를 통해 병렬로 실행된 것을 확인할 수 있다.
    • Thread 를 잘 보면 논리 프로세서의 수인 8개 까지만 운영되는 것을 확인할 수 있다.
  • 만약 parallel() 의 인자에 int 를 입력하면 그 숫자만큼의 Thread 를 사용할 수 있다.
[parallel-7] INFO -- onNext(): 13
[parallel-1] INFO -- onNext(): 1
[parallel-5] INFO -- onNext(): 9
[parallel-4] INFO -- onNext(): 7
[parallel-6] INFO -- onNext(): 11
[parallel-3] INFO -- onNext(): 5
[parallel-8] INFO -- onNext(): 15
[parallel-2] INFO -- onNext(): 3
[parallel-1] INFO -- onNext(): 17
[parallel-2] INFO -- onNext(): 19
profile
잘못된 내용 PR 환영

0개의 댓글