스프링 리액터 시작하기 (5) Scheduler

brian Byeon·2022년 5월 1일
0

0. 자료의 출처

https://docs.spring.io/spring-framework/docs/current/reference/html/web-reactive.html#webflux-new-framework
https://javacan.tistory.com/entry/spring-reactor-intro-list
오해가 없도록 어떤 text를 인용했는지 원문을 첨부합니다.🤗
https://javacan.tistory.com/entry/Reactor-Start-6-Thread-Scheduling

1. 쓰레드 스케줄링

Flux와 Mono는 기본적으로 main 쓰레드 하나에서 실행된다. 단, Scheduler를 사용해서 next, complete, error 신호를 별도 쓰레드로 처리할 수 있다.

1.1 PublishOn()

publishOn()을 통해서 map(), filter, flatMap() 변환 등을 각각의 쓰레드에서 실행시킬 수 있다.

CountDownLatch latch = new CountDownLatch(1);
Flux.just(1,2,3,4,5,6)
    .map(i -> i+10)
    .publishOn(Schedulers.newElastic("PUB"),2)
    .map(i -> i + 1) //publishOn에서 지정한 PUB 스케주러가 실행
    .subscribe(new BaseSubscriber<Integer>() {
            @Override
            protected void hookOnSubscribe(Subscription subscription) {
                logger.info("hookOnSubscribe");
                requestUnbounded();
            }

            @Override
            protected void hookOnNext(Integer value) {
                logger.info("hookOnNext: " + value); // publishOn에서 지정한 스케줄러가 실행

            }



            @Override
            protected void hookOnComplete() {
                logger.info("hookOnComplete"); // publishOn에서 지정한 스케줄러가 실행
                latch.countDown();

            }

        });

latch.await();
//https://javacan.tistory.com/entry/Reactor-Start-6-Thread-Scheduling

publishOn()은 두개의 인자를 받는다.
첫 인자로 지금 Schedulers.newElastic("PUB")가 들어가있다.
이는 Elastic 스케줄러를 생성해서 PUB라는 이름을 쓰레드에 부여한 것이고
두번째 인자인 2는 스케줄러가 신호를 처리하기전 (prefetch) 미리 가져올 데이터의 개수이다. 버퍼처럼 몇개를 쌓아두겠다는 이야기.

처음 publishOn이 실행된 이후부터는 (2개를 버퍼에 넣을 수 있게 map(i->i+10)이 두번 실행되어 1, 2가 넘어온 뒤부터) 계속 PUB 쓰레드가 실행을 한다.
즉 publishOn에 지정한 스케쥴러는 다음 publishOn()을 설정할때까지 적용된다.

13:01:03.026 [main] INFO schedule.ScheduleTest - hookOnSubscribe
13:01:03.029 [main] INFO schedule.ScheduleTest - map 1: 1 + 10
13:01:03.030 [main] INFO schedule.ScheduleTest - map 1: 2 + 10
13:01:03.031 [PUB-2] INFO schedule.ScheduleTest - map 2: 11 + 10
13:01:03.037 [PUB-2] INFO schedule.ScheduleTest - hookOnNext: 21
13:01:03.037 [PUB-2] INFO schedule.ScheduleTest - map 2: 12 + 10
13:01:03.037 [PUB-2] INFO schedule.ScheduleTest - hookOnNext: 22
13:01:03.037 [PUB-2] INFO schedule.ScheduleTest - map 1: 3 + 10
13:01:03.038 [PUB-2] INFO schedule.ScheduleTest - map 1: 4 + 10
13:01:03.038 [PUB-2] INFO schedule.ScheduleTest - map 2: 13 + 10
13:01:03.038 [PUB-2] INFO schedule.ScheduleTest - hookOnNext: 23
13:01:03.038 [PUB-2] INFO schedule.ScheduleTest - map 2: 14 + 10
13:01:03.038 [PUB-2] INFO schedule.ScheduleTest - hookOnNext: 24
(중략)//https://javacan.tistory.com/entry/Reactor-Start-6-Thread-Scheduling
Flux.range(1,6)
.publishOn(Schedulers.newElastic("PUB1"),2)
.map(i-> {
   logger.info("map 1:{}+10",i);
   return i+10;
   })
.publishOn(Schedulers.newElastic("PUB2"))
.map(i-> {
     logger.info("map 1:{}+10",i);
      return i+10;
      })  
      //https://javacan.tistory.com/entry/Reactor-Start-6-Thread-Scheduling

이렇게 두면
첫 map 1 부분은 모두 PUB1이 처리하고, map2는 PUB2가 처리한다. 또 비동기식으로 작동해서 map1중 먼저 끝난 것들은(1이나 2처럼 앞의것) map2에 먼저 들어가고, map1중 늦게 끝나는 것들(5나 6)의 뒤에 들어간다.

OnNext()같은것은 map1, map2가 끝난 뒤에 수행되는 메서드인데, 그렇기 때문에 PUB2가 실행시킨다. OnError(), OnComplete도 PUB2가 실행시키게 된다.

1.2 SubscribeOn()

시퀀스의 request(1)이나 subscription의 onNext(),onError,onComplete 같은 신호처리를 실행시킨다. 그리고 publishOn()이 나타나기 전까지의 map 함수등의 실행을 이 쓰레드에서 진행한다.
주로 원본 시퀀스의 신호 발생을 처리하는 스케쥴러를 지정한다.

기본 스케줄러로는 다음 4가지를 제공한다

Schedulers.immediate(): 현재 쓰레드에서 실행
Schedulers.single(): 쓰레드가 한개인 쓰레드 풀에서 실행. 한 쓰레드 공유
Schedulers.elastic(): 블로킹IO를 리액터로 처리할 때 사용. 쓰레드가 필요하면 새로 생성. 일정 시간 이상 풀에 머무는 쓰레드는 제거
Schedulers.parallel(): 병렬작업에 적합하게 고정 크기 쓰레드 풀 이용해서 실행.

single(), elastic(), parallel()은 매번 새로운 쓰레드 풀을 만들지 않고 동일한 쓰레드 풀을 리턴한다. 예를 들어 아래 코드에서 두 publishOn()은 같은 쓰레드 풀을 공유한다.



someFlux.publishOn(Schedulers.parallel())

            .map(...)

            .publishOn(Schedulers.parallel())

            .subscribe(...);
            //https://javacan.tistory.com/entry/Reactor-Start-6-Thread-Scheduling

newSingle(String name, boolean daemon)
newElastic(String name, int ttlSeconds, boolean daemon)
newParallel(String name, int parallelism, boolean daemon)

name:은 쓰레드 접두사
daemon: default는 false. 데몬 쓰레드가 아니면 job끝났을때 dispose() 호출해서 풀에 있는 쓰레드 종료 해야.
ttlSeconds: elastic 쓰레드풀 유휴시간
parallelism: 작업 쓰레드 개수- default는 Runtime.getRuntime().availableProcessors()가 리턴한 값. 즉 코어 개수.

// 비데몬 스케줄러 초기화

Scheduler scheduler = Schedulers.newElastic("SUB", 60, false);



// 비데몬 스케줄러 사용

someFlux.publishOn(scheduler)

            .map(...)

            .subscribe(...)

// 어플리케이션 종료시에 스케줄러 종료 처리

scheduler.dispose();
//https://javacan.tistory.com/entry/Reactor-Start-6-Thread-Scheduling

이렇게 해줘야 모든 쓰레드가 종료되어 JVM 이 종료된다. daemon쓰레드로 다 만들던가 dispose()를 해주어라.

1.3 Flux.interval일정 주기로 tick 발생

interval()은 Schedulers.parallel()을 사용해 신호를 주기적으로 발생. 다른 스케줄러는 interval(Duration,Scheduler) 메서드 사용

Flux.interval(Duration.ofSeconds(1)) 이런식으로.

2.Flux 병렬 처리

단순히 실행 순서별로 publishOn으로 병렬처리를 하는게 아니라, 우리는 elementwise 병렬 처리가 해보고 싶다. 우리는 flux.parallel 메서드와 flux.runOn 메서드로 이를 구현할 수 있다.

Flux.range(1, 20)

        .parallel(2) // 작업을 레일로 나누기만 함

        .runOn(Schedulers.newParallel("PAR", 2))  // 각 레일을 병렬로 실행

        .map(x -> {

            int sleepTime = nextSleepTime(x % 2 == 0 ? 50 : 100, x % 2 == 0 ? 150 : 300);

            logger.info("map1 {}, sleepTime {}", x, sleepTime);

            sleep(sleepTime);

            return String.format("%02d", x);

        })

        .subscribe(i -> logger.info("next {}", i) );



// nextSleepTime은 인자로 받은 두 정수 값 범위에 해당하는 임의의 값을 생성한다고 가정
//https://javacan.tistory.com/entry/Reactor-Start-7-Parallel

Flux.parallel(int parallelism)의 인자만큼 라운드 로빈으로 신호를 나눈다. 분리한 신호는 일종의 신호를 전달할 레일을 구성한다. 레일이란 작업 queue라고 보면 된다. [1,3,5,7...][2,4,6,8...] 이런식의 queue가 2개 생겨서 runOn()메서드에 다중 쓰레드를 사용하는 스케줄러를 전달해 각 레일에서의 신호를 처리할 수 있게 한다.

실제 쓰레드 개수보다 많이 queue를 나눠버리면 (parallelism 값을 크게 주면) 그냥 먼저 끝난 쓰레드가 맡아서 처리하게 된다.

한 레일당 몇개의 데이터?

Queues.SMALL_BUFFER_SIZE만큼 데이터를 저장해두는데, reactor.bufferSize.small이라는 system property를 사용해서, 이 값이 미지정되어 있다면 256, 16보다 작으면 16사용.

만약 채워놓을 개수를 변경하려면


Flux.range(1, 20)

        .parallel(4)

        .runOn(Schedulers.newParallel("PAR", 2), 2) // 레일에 미리 채울 값으로 2 사용

        .subscribe(x -> logger.info("next {}", x));
        

레일0: 1, 5
레일1: 2, 6
레일2: 3, 7
레일3: 4, 8

mono각각을 병렬로 처리할수 없나?

Mono m3 = Mono.just(3).map(x -> {

    logger.info("3 sleep");

    sleep(2000);

    return x;

}).subscribeOn(Schedulers.parallel());

Mono.zip(m1, m2, m3)

        .subscribe(tup -> logger.info("next: {}", tup);
        //https://javacan.tistory.com/entry/Reactor-Start-7-Parallel

zip으로 묶어서 처리하면 병렬처리가 된다.
onNext()값으로 [1,2,3]이 결과가 된다.

결과를 List 콜렉션으로 받을 수 없나?

List 콜렉션으로 모으기: collectList()
Flux는 데이터를 콜렉션으로 모을 수 있는 기능을 제공한다. 이 중에서 List로 모아주는 collectList()는 다음과 같이 사용한다.



Mono<List<Integer>> mono = someFlux.collectList();

mono.subscribe(lst -> System.out.println(lst));



collectList()의 리턴 타입은 Mono<List<T>>이므로 Mono를 구독해서 값을 사용하면 된다.

0개의 댓글