[Reactive Programming] Project Reactor

DaeHoon·2023년 7월 23일
0

Project Reactor

  • Pivotal에서 개발
  • Spring Reactor에서 사용
  • Mono와 Flux Publisher 제공
  • Reactive Streams를 매우 잘 준수함

Project Reactor - Flux

  • 0..n개의 Item을 전달
  • 에러가 발생하면 error signal을 전달하고 종료
  • backpressure 지원

Flux - Example


// Subscripber
@RequiredArgsConstructor
public class SimpleSubscriber<T> implements Subscriber<T> {
    private final Integer count;
    @Override
    public void onSubscribe(Subscription s) {
        log.info("subscribe");
        s.request(count);
        log.info("request: {}", count);
    }
    @SneakyThrows
    @Override
    public void onNext(T t) {
        log.info("item: {}", t);
        Thread.sleep(100);
    }
    @Override
    public void onError(Throwable t) {
        log.error("error: {}", t.getMessage());
    }
    @Override
    public void onComplete() {
        log.info("complete");
    }
}
// Publisher
@Slf4j
public class FluxSimpleExample {
    public static void main(String[] args) {
        log.info("start main");
        getItems()
                .subscribe(new SimpleSubscriber<>(Integer.MAX_VALUE));
        log.info("end main");
    }
    
    private static Flux<Integer> getItems() {
        return Flux.fromIterable(List.of(1, 2, 3, 4, 5));
    }
}
output
09:32:48 [main] - start main
09:32:48 [main] - subscribe
09:32:48 [main] - request: 2147483647
09:32:48 [main] - item: 1
09:32:48 [main] - item: 2
09:32:48 [main] - item: 3
09:32:49 [main] - item: 4
09:32:49 [main] - item: 5
09:32:49 [main] - complete
09:32:49 [main] - end main
  • 고정된 5개의 수를 Subscripber개를 전달함

Flux - subscribeOn

  • Subscripber 다른 스레드에서 실행하고 싶을 때 사용
@Slf4j
public class FluxSimpleSubscribeOnExample {
    @SneakyThrows
    public static void main(String[] args) {
        log.info("start main");
        getItems()
                .map(i -> {
            log.info("map {}", i);
            return i;
        })
        .subscribeOn(Schedulers.single())
                .subscribe(new SimpleSubscriber<>(Integer.MAX_VALUE));
        log.info("end main");
        Thread.sleep(1000);
    }
    private static Flux<Integer> getItems() {
        return Flux.fromIterable(List.of(1, 2, 3, 4, 5));
    }
}
output
09:57:21 [main] - start main
09:57:21 [main] - subscribe
09:57:21 [main] - request: 2147483647
09:57:21 [main] - end main
09:57:21 [single-1] - map 1
09:57:21 [single-1] - item: 1
09:57:21 [single-1] - map 2
09:57:21 [single-1] - item: 2
09:57:22 [single-1] - map 3
09:57:22 [single-1] - item: 3
09:57:22 [single-1] - map 4
09:57:22 [single-1] - item: 4
09:57:22 [single-1] - map 5
09:57:22 [single-1] - item: 5
09:57:22 [single-1] - complete
  • single-1 스레드에서 Subscripbe가 실행됨.

Flux - subscribe

@Slf4j
public class FluxNoSubscribeExample {
    public static void main(String[] args) {
        log.info("start main");
        getItems();
        log.info("end main");
    }
    private static Flux<Integer> getItems() {
        return Flux.create(fluxSink -> {
            log.info("start getItems");
            for (int i = 0; i < 5; i++) {
            fluxSink.next(i);
        }
            fluxSink.complete();
            log.info("end getItems");
        });
    }
output
09:21:03 [main] - start main
09:21:03 [main] - end main
  • CompletionFuture의 문제점 중 하나는 반환하는 함수를 실행하는 순간 지연 로딩을 할 수 없는 채로 함수가 실행된다는 것.
  • 반면 Flux는 subscribe하지 않으면 아무 일도 일어나지 않는다
  • getItems()를 호출헤서 Flux를 반환받긴 했지만, subscribe 하지 않아 아무일도 일어나지 않았음.

Flux - backPressure

@Slf4j
@RequiredArgsConstructor
public class SimpleSubscriber<T> implements Subscriber<T> {
    private final Integer count;
    @Override
    public void onSubscribe(Subscription s) {
        log.info("subscribe");
        s.request(count);
        log.info("request: {}", count);
    }
}
@Slf4j
public class FluxSimpleRequestThreeExample {
    public static void main(String[] args) {
        getItems().subscribe(new SimpleSubscriber<>(3));
    }
    private static Flux<Integer> getItems() {
        return Flux.fromIterable(List.of(1, 2, 3, 4, 5));
    }
}
output
09:02:41 [main] - subscribe
09:02:41 [main] - request: 3
09:02:41 [main] - item: 1
09:02:41 [main] - item: 2
09:02:41 [main] - item: 3
  • backpressure 개수인 3개만 요청 받고 종료됨

Flux - backPressure (Continuous Request)

// Subscriber
@Slf4j
public class ContinuousRequestSubscriber<T>
implements Subscriber<T> {
    private final Integer count = 1;
    private Subscription subscription = null;
    @Override
    public void onSubscribe(Subscription s) {
        this.subscription = s;
        log.info("subscribe");
        s.request(count);
        log.info("request: {}", count);
    }
    @SneakyThrows
    @Override
    public void onNext(T t) {
        log.info("item: {}", t);
        Thread.sleep(1000);
        subscription.request(1);
        log.info("request: {}", count);
    }
}
// Publisher
@Slf4j
public class FluxContinuousRequestSubscriberExample {
    public static void main(String[] args) {
        getItems().subscribe(new ContinuousRequestSubscriber<>());
    }
    private static Flux<Integer> getItems() {
        return Flux.fromIterable(List.of(1, 2, 3, 4, 5));
    }
}
output
09:27:29 [main] - subscribe
09:27:29 [main] - request: 1
09:27:29 [main] - item: 1
09:27:30 [main] - request: 1
09:27:30 [main] - item: 2
09:27:31 [main] - request: 1
09:27:31 [main] - item: 3
09:27:32 [main] - request: 1
09:27:32 [main] - item: 4
09:27:33 [main] - request: 1
09:27:33 [main] - item: 5
09:27:34 [main] - request: 1
09:27:34 [main] - complete
  • onNext에서 1개씩 요청함

Flux - error

@Slf4j
public class FluxErrorExample {
    public static void main(String[] args) {
        log.info("start main");
        getItems().subscribe(new SimpleSubscriber<>(Integer.MAX_VALUE));
        log.info("end main");
    }
    private static Flux<Integer> getItems() {
        return Flux.create(fluxSink -> {
            fluxSink.next(0);
            fluxSink.next(1);
            var error = new RuntimeException("error in flux");
            fluxSink.error(error);
        });
    }
}
output
10:08:09 [main] - start main
10:08:09 [main] - subscribe
10:08:09 [main] - request: 2147483647
10:08:09 [main] - item: 0
10:08:09 [main] - item: 1
10:08:10 [main] - error: error in flux
10:08:10 [main] - end main

Flux - complete

@Slf4j
public class FluxCompleteExample {
    public static void main(String[] args) {
        log.info("start main");
        getItems().subscribe(new SimpleSubscriber<>(Integer.MAX_VALUE));
        log.info("end main");
    }
    private static Flux<Integer> getItems() {
        return Flux.create(fluxSink -> {
            fluxSink.complete();
        });
    }
}
output
10:08:54 [main] - start main
10:08:54 [main] - subscribe
10:08:54 [main] - request: 2147483647
10:08:54 [main] - complete
10:08:54 [main] - end main

Project reactor - Mono

  • 0..1개의 item을 전달
  • 에러가 발생하면 error signal 전달하고 종료
  • 모든 item을 전달했다면 complete signal 전달하고 종료
  • onNext를 호출 이후 자동으로 complete를 호출

Flux에서 하나의 값만 넘겨주면 되는데 왜 Mono를 사용할까?

Mono

@Slf4j
public class MonoSimpleExample {
    @SneakyThrows
    public static void main(String[] args) {
        log.info("start main");
        getItems()
                .subscribe(new SimpleSubscriber<>(Integer.MAX_VALUE));
        log.info("end main");
        Thread.sleep(1000);
    }
    private static Mono<Integer> getItems() {
        return Mono.create(monoSink -> {
            monoSink.success(1);
        });
    }
}
  • 1개의 item만 전달하기 때문에 next 하나만 실행하면 complete가 보장된다.
  • 혹은 전달하지 않고 complete를 하면 값이 없다는 것을 의미함
  • 하나의 값이 있거나 없다. (Optinal)

Mono와 Flux

  • Mono<T>: Optional<T>
    • 없거나 혹은 하나의 값
    • Mono<Void>로 특정 사건이 완료되는 시점을 가리킬 수도 있다
  • Flux<T>: List<T>
    • 무한하거나 유한한 여러 개의 값

Flux를 Mono로

@Slf4j
public class FluxToMonoExample {
    public static void main(String[] args) {
        log.info("start main");
        Mono.from(getItems()).subscribe(
                new SimpleSubscriber<>(Integer.MAX_VALUE)
        );
        log.info("end main");
    }
    private static Flux<Integer> getItems() {
        return Flux.fromIterable(List.of(1, 2, 3, 4, 5));
    }
}
output
10:46:05 [main] - start main
10:46:06 [main] - subscribe
10:46:06 [main] - request: 2147483647
10:46:06 [main] - item: 1
10:46:06 [main] - complete
10:46:06 [main] - end main
  • Mono.from으로 Flux를 Mono로 변경하면 첫 번쨰 값만 전달된다.
  • 그러면 저 리스트를 통째로 받는 방법이 있을까?

Flux를 Mono로 (collectList)

@Slf4j
public class FluxToListMonoExample {
    public static void main(String[] args) {
        log.info("start main");
        getItems()
                .collectList()
                .subscribe(
                        new SimpleSubscriber<>(Integer.MAX_VALUE)
                );
        log.info("end main");
    }
    private static Flux<Integer> getItems() {
        return Flux.fromIterable(List.of(1, 2, 3, 4, 5));
    }
}
output
10:41:41 [main] - start main
10:41:42 [main] - subscribe
10:41:42 [main] - request: 2147483647
10:41:42 [main] - item: [1, 2, 3, 4, 5]
10:41:42 [main] - complete
10:41:42 [main] - end main
  • Flux의 값들을 collect 하고 complete 이벤트가 발생하는 시점에 모은 값들을 전달

Mono를 Flux로 (flux())


@Slf4j
public class MonoToFluxExample {
    public static void main(String[] args) {
        log.info("start main");
        getItems().flux()
                .subscribe(new SimpleSubscriber<>(Integer.MAX_VALUE));
        log.info("end main");
    }
    private static Mono<List<Integer>> getItems() {
        return Mono.just(List.of(1, 2, 3, 4, 5));
    }
}
output
10:52:25 [main] - start main
10:52:25 [main] - subscribe
10:52:25 [main] - request: 2147483647
10:52:25 [main] - item: [1, 2, 3, 4, 5]
10:52:25 [main] - complete
10:52:25 [main] - end main
  • flux() 함수로 Mono를 next 한 번 호출하고 onComplete를 호출하는 Flux로 변환했다.

Mono를 Flux로 (flatMapMany())

@Slf4j
public class ListMonoToFluxExample {
    public static void main(String[] args) {
        log.info("start main");
        getItems()
                .flatMapMany(value -> Flux.fromIterable(value))
        .subscribe(new SimpleSubscriber<>(Integer.MAX_VALUE));
        log.info("end main");
    }
    private static Mono<List<Integer>> getItems() {
        return Mono.just(List.of(1, 2, 3, 4, 5));
    }
}
output
10:54:52 [main] - start main
10:54:52 [main] - Using Slf4j logging framework
10:54:52 [main] - subscribe
10:54:52 [main] - request: 2147483647
10:54:52 [main] - item: 1
10:54:52 [main] - item: 2
10:54:52 [main] - item: 3
10:54:52 [main] - item: 4
10:54:53 [main] - item: 5
10:54:53 [main] - complete
10:54:53 [main] - end main
  • flatMapMany() 함수로 Mono의 값으로 여러 개의 값을 전달하는 Flux를 만들고 연결함.
profile
평범한 백엔드 개발자

1개의 댓글

comment-user-thumbnail
2023년 7월 23일

많은 도움이 되었습니다, 감사합니다.

답글 달기