[Reactive] 09. Sinks

Jimin Lim·2024년 3월 31일
0

Spring

목록 보기
16/18
post-thumbnail

9.1 Sinks란?

Sinks는 Processor의 기능을 개선해 Reactor 3.5.0 버전부터 지원되었다.

  • Processor: Publisher와 Subscriber 기능을 모두 지님

Flux, Mono는 onNext와 같은 Signal을 내부적으로 전송해 주는 방식이었고, Sinks는 프로그래밍 코드를 통해 명시적으로 Signal을 전송할 수 있다.

    public static void main(String[] args) throws InterruptedException {
        int tasks = 6;
        Flux
            .create((FluxSink<String> sink) -> {
                IntStream
                        .range(1, tasks)
                        .forEach(n -> sink.next(doTask(n)));
            })
            .subscribeOn(Schedulers.boundedElastic()) //작업 처리
            .doOnNext(n -> log.info("# create(): {}", n))
            .publishOn(Schedulers.parallel()) //result 가공
            .map(result -> result + " success!")
            .doOnNext(n -> log.info("# map(): {}", n))
            .publishOn(Schedulers.parallel()) //subscriber에게 가공된 결과 전달
            .subscribe(data -> log.info("# onNext: {}", data));

        Thread.sleep(500L);
    }

    private static String doTask(int taskNumber) {

        return "task " + taskNumber + " result";
    }

스레드를 지정할 때 subscribeOn, publishOn 구분하는 이유가 뭘까..

  • subscribeOn: Flux.create 내부 동작들에 대한 스레드 설정, 다운스트림에서 해당 스케줄링으로 받겠다 설정 같음
  • publishOn: 해당 스케줄링으로 publish 하겠다 의미 같음 ,,

위 코드에서 doTask() 메서드가 여러 개의 스레드에서 각각 다른 작업들을 처리한 후, 결과를 반환받으면 문제 발생할 수 있다. 따라서 Sinks를 사용할 수 있다.

9.2 Sinks 종류 및 특징

Sinks를 이용해 프로그래밍 방식으로 signal을 전송할 수 있는 방법은 Sinks.One, Sinks.Many가 있다.

Sinks.one

  • Sinks.one: 한 건의 데이터를 프로그래밍 방식으로 emit
Sinks.One<String> sinkOne = Sinks.one();
Mono<String> mono = sinkOne.asMono();

sinkOne.emitValue("Hello Reactor", FAIL_FAST);
sinkOne.emitValue("Hi Reactor", FAIL_FAST); //Drop

mono.subscribe(data -> log.info("# Subscriber1 {}", data));
mono.subscribe(data -> log.info("# Subscriber2 {}", data));
  • FAIL_FAST: emit 도중 에러가 발생할 경우 어떻게 처리할 것인지, 재시도하지 않고 즉시 실패 처리하여 교착 상태등을 미연에 방지한다.
  • emitValue: 처음 emit한 데이터는 정상적으로 emit, 나머지는 Drop

Sinks.Many

  • Sinks.Many: 여러 건의 데이터를 여러 가지 방식으로 전송
    • 여기서 여러 가지 방식은 UnicastSpec(1 to 1), MultiCastSpec(1 to Many, Hot Sequence), MulticastReplaySpec(1 to Many, 특정 시점으로 되돌림)가 있다.

UnicastSpec 예시

Sinks.Many<Integer> unicastSink = Sinks.many().unicast().onBackpressureBuffer();
Flux<Integer> fluxView = unicastSink.asFlux();

unicastSink.emitNext(1, FAIL_FAST);
unicastSink.emitNext(2, FAIL_FAST);


fluxView.subscribe(data -> log.info("# Subscriber1: {}", data));

unicastSink.emitNext(3, FAIL_FAST);

  • Unicast 방식이므로 구독 하나를 추가한다면 IllegalStateException이 발생한다.

MulticastSpec 예시

        Sinks.Many<Integer> multicastSink = Sinks.many().multicast().onBackpressureBuffer();
        Flux<Integer> fluxView = multicastSink.asFlux();

        multicastSink.emitNext(1, FAIL_FAST);
        multicastSink.emitNext(2, FAIL_FAST);

        fluxView.subscribe(data -> log.info("# Subscriber1: {}", data));
        fluxView.subscribe(data -> log.info("# Subscriber2: {}", data));

        multicastSink.emitNext(3, FAIL_FAST);

  • hot publisher로 동작하며, 특히 onBackPressureBuffer()는 warm up 특징을 가져 첫번째 구독이 발생한 시점에 데이터가 전달된다.
    • warm up: 최초 구독이 발생해야 Publisher가 데이터를 emit
    • Hot: Subscriber 구독과 상관없이 데이터를 emit
  • multicast() 대신 replay()를 호출하면 emit된 데이터를 replay하여 구독 전에 emit된 데이터라도 Subscriber가 받을 수 있도록 할 수 있다.
profile
💻 ☕️ 🏝 🍑 🍹 🏊‍♀️

0개의 댓글