Chatper09. Sinks

김신영·2023년 7월 30일
0

Spring WebFlux

목록 보기
9/13
post-thumbnail

Sinks란?

리액티브 스트림즈의 Signal을 프로그래밍 방식으로 푸시할 수 있는 구조

  • Publisher와 Subscriber의 기능을 모두 지닌 Processor의 향상된 기능을 제공
  • Multi Thread 기반
  • 스레드 안정성 보장

Reactor에서 프로그래밍 방식으로 Singal을 전송하는 가장 일반적인 방법

  • generate() Operator
  • create() Operator
  • Single Thread 기반
  • 따라서 스레드 안정성 보장 ❌

코드 예시

  • create() Operator를 통한 Signal 전송
    public static void main(String[] args) throws InterruptedException {
        int tasks = 6;
        Flux
            .create((FluxSink<String> sink) -> {
                IntStream
                        .range(1, tasks)
                        .forEach(n -> sink.next(doTask(n)));
            })
            .subscribeOn(Schedulers.boundedElastic())
            .doOnNext(n -> log.info("# create(): {}", n))
            .publishOn(Schedulers.parallel())
            .map(result -> result + " success!")
            .doOnNext(n -> log.info("# map(): {}", n))
            .publishOn(Schedulers.parallel())
            .subscribe(data -> log.info("# onNext: {}", data));
    
        Thread.sleep(500L);
    }
    
    private static String doTask(int taskNumber) {
        // now tasking.
        // complete to task.
        return "task " + taskNumber + " result";
    }
    
    /*
    15:59:53.213 [boundedElastic-1] INFO - # create(): task 1 result
    15:59:53.215 [boundedElastic-1] INFO - # create(): task 2 result
    15:59:53.215 [boundedElastic-1] INFO - # create(): task 3 result
    15:59:53.215 [boundedElastic-1] INFO - # create(): task 4 result
    15:59:53.216 [boundedElastic-1] INFO - # create(): task 5 result
    15:59:53.219 [parallel-2] INFO - # map(): task 1 result success!
    15:59:53.220 [parallel-2] INFO - # map(): task 2 result success!
    15:59:53.220 [parallel-1] INFO - # onNext: task 1 result success!
    15:59:53.220 [parallel-2] INFO - # map(): task 3 result success!
    15:59:53.220 [parallel-1] INFO - # onNext: task 2 result success!
    15:59:53.220 [parallel-2] INFO - # map(): task 4 result success!
    15:59:53.220 [parallel-1] INFO - # onNext: task 3 result success!
    15:59:53.220 [parallel-2] INFO - # map(): task 5 result success!
    15:59:53.220 [parallel-1] INFO - # onNext: task 4 result success!
    15:59:53.220 [parallel-1] INFO - # onNext: task 5 result success!
    */
  • Sinks를 사용하는 코드 예제
    public static void main(String[] args) throws InterruptedException {
        int tasks = 6;
    
        Sinks.Many<String> unicastSink = Sinks.many().unicast().onBackpressureBuffer();
        Flux<String> fluxView = unicastSink.asFlux();
        IntStream
                .range(1, tasks)
                .forEach(n -> {
                    try {
                        new Thread(() -> {
                            unicastSink.emitNext(doTask(n), Sinks.EmitFailureHandler.FAIL_FAST);
                            log.info("# emitted: {}", n);
                        }).start();
                        Thread.sleep(100L);
                    } catch (InterruptedException e) {
                        log.error(e.getMessage());
                    }
                });
    
        fluxView
                .publishOn(Schedulers.parallel())
                .map(result -> result + " success!")
                .doOnNext(n -> log.info("# map(): {}", n))
                .publishOn(Schedulers.parallel())
                .subscribe(data -> log.info("# onNext: {}", data));
    
        Thread.sleep(200L);
    }
    
    private static String doTask(int taskNumber) {
        // now tasking.
        // complete to task.
        return "task " + taskNumber + " result";
    }
    
    /*
    18:17:58.332 [Thread-0] INFO - # emitted: 1
    18:17:58.431 [Thread-1] INFO - # emitted: 2
    18:17:58.536 [Thread-2] INFO - # emitted: 3
    18:17:58.641 [Thread-3] INFO - # emitted: 4
    18:17:58.743 [Thread-4] INFO - # emitted: 5
    18:17:58.909 [parallel-2] INFO - # map(): task 1 result success!
    18:17:58.909 [parallel-2] INFO - # map(): task 2 result success!
    18:17:58.909 [parallel-1] INFO - # onNext: task 1 result success!
    18:17:58.909 [parallel-2] INFO - # map(): task 3 result success!
    18:17:58.910 [parallel-2] INFO - # map(): task 4 result success!
    18:17:58.910 [parallel-1] INFO - # onNext: task 2 result success!
    18:17:58.910 [parallel-2] INFO - # map(): task 5 result success!
    18:17:58.910 [parallel-1] INFO - # onNext: task 3 result success!
    18:17:58.910 [parallel-1] INFO - # onNext: task 4 result success!
    18:17:58.910 [parallel-1] INFO - # onNext: task 5 result success!
    */

참고: Thread Safety

What Is Thread-Safety and How to Achieve It? | Baeldung

Sinks 종류 및 특징

Sinks.One

  • 한 건의 데이터를 전송하는 방법을 정의해 둔 기능 명세
  • Sinks.one()
  • emitValue()
    • data
      • EmitFailureHandler
      • EmitFailureHandler.FAIL_FAST
        • 에러가 발생했을 때 재시도를 하지 않고 즉시 실패 처리를 한다.
public interface EmitFailureHandler {
	EmitFailureHandler FAIL_FAST = (signalType, emission) -> false;

	boolean onEmitFailure(SignalType signalType, EmitResult emitResult);
}
  • asMono()
    • Mono 객체로 변환
      • Mono의 의미 체계를 가진다.
      • with Mono semantics
Sinks.One<String> sinkOne = Sinks.one();
Mono<String> mono = sinkOne.asMono();

sinkOne.emitValue("Hello Reactor", FAIL_FAST);
sinkOne.emitValue("Hi Reactor", FAIL_FAST);
sinkOne.emitValue(null, FAIL_FAST);

mono.subscribe(data -> log.info("# Subscriber1 {}", data));
mono.subscribe(data -> log.info("# Subscriber2 {}", data));

/*
22:07:57.893 [main] DEBUG- onNextDropped: Hi Reactor
22:07:57.895 [main] INFO - # Subscriber1 Hello Reactor
22:07:57.896 [main] INFO - # Subscriber2 Hello Reactor
*/

Sinks.Many

  • 여러 건의 데이터를 여러 가지 방식으로 전송하는 기능을 정의해 둔 기능 명세
  • Sinks.many()
    • ManySpec 을 리턴
      public interface ManySpec {
      		UnicastSpec unicast();
      
      		MulticastSpec multicast();
      
      		MulticastReplaySpec replay();
      	}
  • asFlux()
    • Flux 객체로 변환

UnicastSpec

  • 단 하나의 Subscriber에게 데이터를 emit 한다.
Sinks.Many<Integer> unicastSink = Sinks.many().unicast().onBackpressureBuffer();
Flux<Integer> fluxView = unicastSink.asFlux();

unicastSink.emitNext(1, FAIL_FAST);
unicastSink.emitNext(2, FAIL_FAST);

fluxView.subscribe(data -> log.info("# Subscriber1: {}", data));

unicastSink.emitNext(3, FAIL_FAST);

/* 주석 제거 시, IllegalStateException 발생
Caused by: java.lang.IllegalStateException: UnicastProcessor allows only a single Subscriber */
// fluxView.subscribe(data -> log.info("# Subscriber2: {}", data)); 

/*
22:20:11.465 [main] INFO - # Subscriber1: 1
22:20:11.466 [main] INFO - # Subscriber1: 2
22:20:11.466 [main] INFO - # Subscriber1: 3
*/

MulticastSpec

  • 하나 이상의 Subscriber에게 데이터를 emit한다.
Sinks.Many<Integer> multicastSink =
        Sinks.many().multicast().onBackpressureBuffer();
Flux<Integer> fluxView = multicastSink.asFlux();

multicastSink.emitNext(1, FAIL_FAST);
multicastSink.emitNext(2, FAIL_FAST);

fluxView.subscribe(data -> log.info("# Subscriber1: {}", data));
fluxView.subscribe(data -> log.info("# Subscriber2: {}", data));

multicastSink.emitNext(3, FAIL_FAST);

/*
22:34:19.836 [main] INFO - # Subscriber1: 1
22:34:19.837 [main] INFO - # Subscriber1: 2
22:34:19.838 [main] INFO - # Subscriber1: 3
22:34:19.838 [main] INFO - # Subscriber2: 3
*/

MulticastReplaySpec

  • 하나 이상의 Subscriber에게 데이터를 emit한다.
  • emit된 데이터 중에서 특정 시점으로 되돌린 데이터부터 emit한다. (replay)
Sinks.Many<Integer> replaySink = Sinks.many().replay().limit(2);
Flux<Integer> fluxView = replaySink.asFlux();

replaySink.emitNext(1, FAIL_FAST);
replaySink.emitNext(2, FAIL_FAST);
replaySink.emitNext(3, FAIL_FAST);

fluxView.subscribe(data -> log.info("# Subscriber1: {}", data));

replaySink.emitNext(4, FAIL_FAST);

fluxView.subscribe(data -> log.info("# Subscriber2: {}", data));

/*
22:37:18.104 [main] INFO - # Subscriber1: 2
22:37:18.105 [main] INFO - # Subscriber1: 3
22:37:18.105 [main] INFO - # Subscriber1: 4
22:37:18.105 [main] INFO - # Subscriber2: 3
22:37:18.105 [main] INFO - # Subscriber2: 4
*/
profile
Hello velog!

0개의 댓글