Reactor: Execution Control 3 - Broadcasting

xellos·2022년 6월 11일
0

JAVA-Reactor

목록 보기
11/11

네트워킹에서 broadcating은 다수의 receiver에게 동시에 이벤트를 발행하는 것이다. Reactive Stream 의 관점에서 이는 다수의 구독자에게 발행자가 동시에 이벤트를 발행하는 것을 의미한다.
Hot Publisher가 broadcasting 이벤트를 발행하는 것 처럼 보일 수 있으나, 근본적인 차이점은 이벤트 실행 스트림을 생성하고 시작하는 것에 있다. Reactor는 ConnectableFlux 를 생성하고 특정 수의 구독자를 달성할 때 까지 이벤트 발행을 연기한다. 이후 모든 구독자에게 이벤트를 발행한다.

1) replay operator

Reactor는 FluxConnectableFlux로 변경하기 위하여 replay 연산자를 제공한다. ConnectableFlux는 발행되는 이벤트를 버퍼에 담아서 보관합니다. 버퍼는 특정 구독자 수를 달성하기 전까지 계속 이벤트를 보관할 수 있도록 하거나 특정 시간 동안 담을 수 있도록 설정할 수 있습니다. 버퍼에 담긴 이벤트만 replay하여 구독자에게 전달하게 됩니다.

ConnectableFlux 발행 전에 설정한 구독자 수를 달성해야 하며, 아래의 연산자를 통하여 구독자를 설정할 수 있습니다.

  • Connect: connect 연산자는 충분한 구독자를 모은 이후 실행되어야 합니다. 반드시 구독자 수를 개발자 스스로 관리해야 합니다. 구독 취소도 개발자가 직접 추적해야 합니다.
  • Auto-Connect: 구독자 수를 설정할 수 있는 연산자 입니다. 발행자에게 동적으로 이루어지는 구독을 추적할 수 있습니다. 최선의 방법은 autoConnect 연산자를 사용하고, 구독의 관리를 Reactor 에게 맡기는 것입니다.
static void print(String text) {
	System.out.println("[" + Thread.currentThread().getName() + "] " + text);
}

Flux<Long> fibonacciGenerator = Flux.generate(
	() -> Tuples.<Long, Long> of(0L, 1L),
    (state, sink) -> {
    	if(state.getT1() < 0) sink.complete();
        else sink.next(state.getT1());
        
       	print("Generating next of " + state.getT2());
        return Tuples.of(state.getT2(), state.getT1() + state.getT2());
    });
    
@Test
void replay() {
	Flux<Long> broadcastGenerator = fibonacciGenerator
    	.doFinally(x -> { System.out.println("Closing "); })
        .replay()
        .autoConnect(2);
    
    fibonacciGenerator.subscribe(x -> System.out.println("[Fib] 1st : " + x));
    fibonacciGenerator.subscribe(x -> System.out.println("[Fib] 2st : " + x));
    
    broadcastGenerator.subscribe(x -> System.out.println("1st : " + x));
    broadcastGenerator.subscribe(x -> System.out.println("2st : " + x));
}	
  • 결과
1st : 0
2nd : 0
[Test worker] Generating next of 1
1st : 1
2nd : 1
[Test worker] Generating next of 1
1st : 1
2nd : 1

위의 코드를 실행하면 fibonacciGenerator는 구독이 이루어질 때마다 값을 반환한다. 그러나 broadcastGenerator가 발행된 겂을 모아서 한 번에 구독자에게 일괄적으로 이벤트를 통지한다. 위에서 언급connectautoConnect 연산자는 구독 이벤트를 계속 추적만 한다. 그러다 특정 수의 구독을 달성하면 이벤트 프로세스를 시작한다. 이는 발행자가 종료 이벤트를 발행하기 전까지 계속 이벤트를 발생하며 구독 취소는 추적하지 않기 때문에 구독자가 취소 이벤트를 요청해도 계속 이벤트를 발행한다.


refCount 연산자

위의 상황에 대비하여 Reactor는 refCount 연산자를 제공한다. 해당 연산자는 구독을 추적하다가 모든 구독자가 구독을 취소한 경우에 이벤트 발행을 멈춘다.

@Test
void refCount() throws InerruptedException() {
	Flux<Long> broadcastGenerator = fibonacciGenerator
    	.doFinally(x -> System.out.println("Closing "))
        .replay()
        .refCount(2);
        
    broadcastGenerator.subscribe(new BaseSubscriber<Long>() {
    	@Override
        protected void hookOnSubscribe(Subscription subscription) {
        	request(1);
        }
        
        @Override
        protected void hookOnNext(Long value) {
        	System.out.println("1st: " + value);
            cancel();
        }
    });
    
    broadbaseGenerator.subscribe((value) -> {
    	System.out.println("2nd : " + value);
        cancel();
    });
    
    Thread.sleep(500);
}
  • 결과
1st: 0
2nd : 0
Closing 

2) publish operator

Reactor는 publish 연산자를 제공하여 ConnectedFlux 를 생성합니다. 첫 번째 구독자에게 발행되는 이벤트를 버퍼에 저장하는 replay와 다르게, 해당 연산자는 source stream 으로부터 이벤트를 가져옵니다.
연산자는 구독자로부터 발생하는 수요를 계속 추적합니다. 만약 어떠한 구독자라도 수요를 일으키지 않으면, 이벤트 생성을 중단하며 이는 구독자들 중 수요를 일으킬 때까지 지속됩니다.

replay 연산자와 마찬가지로, 구독자 관리가 필요하다. 여기서는 위에서 소개한 connect, autoConnect, refCount 가운데 원하는 것을 골라서 옵션을 설정할 수 있다.

@Test
void publish() throws InterruptedException {
	Flux<Long> fibonacciGenerator = Flux.generate(
    	() -> Tuples.<Long, Long> of(0L, 1L),
        (state, sink) -> {
        	if(state.getT1() < 0) sink.complete();
            else sink.next(state.getT1());
            
            System.out.println("generating next of " + state.getT2());
            return Tuples.of(state.getT2(), state.getT1() + state.getT2());
        });
        
    fibonacciGenerator = fibonacciGenerator
    	.doFinally(x -> System.out.println("Closing "))
        .publish()
        .autoConnect(2);
        
    fibonacciGenerator.subscribe(new BaseSubscriber<Long>() {
    	@Override
        protected void hookOnSubscribe(Subscription subscription) {
        	request(1);
        }
        
        @Override
        protected void hookOnNext(Long value) {
        	System.out.println("1st: " + value);
        }
    });
    
    fibonacciGenerator.subscribe(x -> System.out.println("2nd : " + x));
    Thread.sleep(500);
}
  • 결과
1st: 0
2nd : 0

0개의 댓글