reactor operator 참조
: https://projectreactor.io/docs/core/release/reference/#which-operator
- 흐름
=>Publisher → [Data 1] → OP1 → [Data 2] → OP2 → [Data 3] → Subscriber
- WebFlux의 기반인 Reactor에서는
Flux와Mono라는Publisher를 사용- Publisher에서 데이터를 변경하며, 새로운 Publisher / Subscriber로 생성하는 도구
- Flux 와 Mono의 많은 operator를 통해 데이터를 조작할 수 있다
ex)just()/range()/flatMap()/groupBy()등
시퀀스 생성 시 (Creating a New Sequence)
(
#앞에Mono혹은Flux가 없다면둘다 가능한 것)
- just
just(T data)- null이 존재할 경우 T :
Mono#justOrEmpty(T data)
- iterate 관련
- array :
Flux#fromArray(T[] array)- collection 또는 iterable :
Flux#fromIterable(Iterable it)- 정수 범위 :
Flux#range(int start, int count)- 각 Subscription에 제공된 Stream :
Flux#fromStream(Supplier<Stream>)
기존 시퀀스를 변경하는 경우(Transforming an Existing Sequence)
- 기존 데이터 변경
- 1 to 1 :
Flux#map()- 1 to N
- 순서 보장 X :
Flux#flatMap()- 순서 보장 O :
Flux#flatMapSequential()/Flux#concatMap()
- Flux를 합치고 싶은 경우
- list로 :
Flux#collectList()/Flux#collectSortedList()
(Mono<List>로 반환)- map으로 :
Flux#collectMap()/Flux#collectMultiMap()- 각 element 사이에 function 적용 :
reduce()- sequence 수 :
count()- 방출된 순서대로 결합 (기존 순서 X)
- 같은 타입 :
Flux#merge()- 다른 타입으로 변환 :
Flux#zip()/Flux#zipWith()
시퀀스 엿보기(Peeking into a Sequence)
- 최종 sequence를 수정하지 않고 다음을 수행
- 방출 시 :
doOnNext()- 완료 시 :
Flux#doOnComplete(),Mono#doOnSuccess()- 에러 종료 시 :
doOnError()- 취소시 :
doOnCancel()- subscription 이후 :
doOnSubscribe()- 요청 시 :
doOnRequest()- 로그를 보고 싶은 경우 :
log()
시퀀스 필터링 (Filtering a Sequence)
- 중복을 무시
- 전체 sequence에 대해 :
Flux#distinct()- 이후 방출된 item에 대해 :
Flux#distinctUntillChanged()
- 주어진 Predicate에 대해
- 동기 실행 :
filter()- 비동기 실행 :
filterWhen()
비동기 실행
- 개요
WebFlux도 아무런 동작을 하지 않으면 동기적으로 스트림을 처리 ->하나의 스레드Scheduler를 통해서 동시성(병렬처리)과 실행 순서를 적절하게 관리할 수 있다
subscribeOn(Scheduler scheduler)
- 스트림을 구독할 때 동작을 지정한 스레드 풀에서 수행할 수 있도록 설정하는 operator
- 즉, publisher가 subscribe() 하는 과정을 별도의 스레드에서 비동기적으로 처리
Flux.subscribeOn(Schedulers.single()).subscribe()
=> 지정한 하나의 스레드에서 구독을 수행Flux.subscribeOn(Schedulers.parallel()).subscribe()
=> 최대 CPU Core 개수 만큼의 worker를 만들어서 병렬로 처리
publishOn(Scheduler Scheduler)
subscriber가 처리하는 부분인onNext()/onComplete()/onError()를 지정한 스데르 풀에서 수행onSubscribe()/request()는 별도의 설정이 없으면기존 스레드에서 수행이후 더 많은 operator는 Mono / Flux docs에서 확인
Cold Publisher
- 기본적으로 특별하게 Hot을 취급하는 연산자(
operator)가 아닌 이상Flux,Mono는Cold로 동작- 각 구독(
subscribe)에 대해 항상데이터를 새로 생성- (
중요) 구독하지 않으면 데이터가 생성되지 X
Hot Publisher
- 구독(
subscribe)하기 전에데이터 스트림을동작하게 할 수 있다- 구독자 수에 의존하지 않고,
즉시 데이터 게시를 시작할 수 있다반복적으로 발생하는 데이터 스트림을미리 동작해서 공유하면,중복을 방지할 수 있다
=>Flux.subscribeOn(Schedulers.single())을 통해서하나의 스레드에서 수행하게 가능
=> 그리고,Flux.publishOn()을 통해서 각구독자 동작을별도의 스레드에서 수행하면,효율적인 병렬 처리가 가능!
Cold -> Hot 변환
Hot Publisher로 변환하려면,ConnectableFlux로 변환하면 된다
Flux#publish()
- Flux -> ConnectableFlux 로 변환하여 반환
ConnectableFlux#autoConnect()
- 최소 구독 개수를 만족하면 자동으로 connect()를 호출하는 역할
ConnectableFlux#refCount()
- 구독자 수를 세서, 하나도 없으면 기존 소스의 스트림 구독을 해제하는 역할
- 소스(source)가 무한으로 값을 생성할 때 구독을 막기 위한 용도로 사용
Flux.publish().refCount()를 추상화한 operator가 =>Flux.share()
@GetMapping("/fruit") Flux<FruitInfo> getFruit(){ /* 3개의 List<String> 데이터 준비 */ final List<String> basket1 = Arrays.asList(new String[]{"kiwi", "orange", "lemon", "orange", "lemon", "kiwi"}); final List<String> basket2 = Arrays.asList(new String[]{"banana", "lemon", "lemon", "kiwi"}); final List<String> basket3 = Arrays.asList(new String[]{"strawberry", "orange", "lemon", "grape", "strawberry"}); final List<List<String>> baskets = Arrays.asList(basket1, basket2, basket3); /* List -> Flux */ final Flux<List<String>> basketFlux = Flux.fromIterable(baskets); /* concatMap() 을 통해 하나의 stream씩 순차적으로 실행 */ Flux<FruitInfo> result = basketFlux.concatMap(basket -> { /* Hot Publisher인 source를 미리 만들어서 반복 작업을 최소화하고, 하나의 스레드에서 통일되게 사용되도고 Schedulers.single() 적용 */ final Flux<String> source = Flux.fromIterable(basket).publish().autoConnect(2).subscribeOn(Schedulers.single()); /* publishOn() 을 통해서 subscribe의 부분인 onNext() ~ onComplete() 까지를 별도의 스레드에서 수행*/ final Mono<List<String>> distinctFruits = source.publishOn(Schedulers.parallel()).distinct().collectList().log(); Mono<Map<String, Long>> countFruitsMono = source.publishOn(Schedulers.parallel()) .groupBy(fruit -> fruit) .concatMap(groupFlux -> groupFlux.count() .map(count -> { final Map<String, Long> fruitCount = new LinkedHashMap<>(); fruitCount.put(groupFlux.key(), count); return fruitCount; }) ) .reduce((accumulatedMap, currentMap) -> new LinkedHashMap<String, Long>() {{ putAll(accumulatedMap); putAll(currentMap); }}).log(); /* Flux.zip() 을 통해서 새로운 타입을 가지는 하나의 객체로 합친다 */ return Flux.zip(distinctFruits, countFruitsMono, (distinct, count) -> new FruitInfo(distinct, count)); }).log(); return result; }