네트워킹에서 broadcating은 다수의 receiver에게 동시에 이벤트를 발행하는 것이다. Reactive Stream 의 관점에서 이는 다수의 구독자에게 발행자가 동시에 이벤트를 발행하는 것을 의미한다.
Hot Publisher가 broadcasting 이벤트를 발행하는 것 처럼 보일 수 있으나, 근본적인 차이점은 이벤트 실행 스트림을 생성하고 시작하는 것에 있다. Reactor는 ConnectableFlux
를 생성하고 특정 수의 구독자를 달성할 때 까지 이벤트 발행을 연기한다. 이후 모든 구독자에게 이벤트를 발행한다.
Reactor는 Flux
를 ConnectableFlux
로 변경하기 위하여 replay 연산자를 제공한다. ConnectableFlux
는 발행되는 이벤트를 버퍼에 담아서 보관합니다. 버퍼는 특정 구독자 수를 달성하기 전까지 계속 이벤트를 보관할 수 있도록 하거나 특정 시간 동안 담을 수 있도록 설정할 수 있습니다. 버퍼에 담긴 이벤트만 replay하여 구독자에게 전달하게 됩니다.
ConnectableFlux
발행 전에 설정한 구독자 수를 달성해야 하며, 아래의 연산자를 통하여 구독자를 설정할 수 있습니다.
autoConnect
연산자를 사용하고, 구독의 관리를 Reactor 에게 맡기는 것입니다.static void print(String text) {
System.out.println("[" + Thread.currentThread().getName() + "] " + text);
}
Flux<Long> fibonacciGenerator = Flux.generate(
() -> Tuples.<Long, Long> of(0L, 1L),
(state, sink) -> {
if(state.getT1() < 0) sink.complete();
else sink.next(state.getT1());
print("Generating next of " + state.getT2());
return Tuples.of(state.getT2(), state.getT1() + state.getT2());
});
@Test
void replay() {
Flux<Long> broadcastGenerator = fibonacciGenerator
.doFinally(x -> { System.out.println("Closing "); })
.replay()
.autoConnect(2);
fibonacciGenerator.subscribe(x -> System.out.println("[Fib] 1st : " + x));
fibonacciGenerator.subscribe(x -> System.out.println("[Fib] 2st : " + x));
broadcastGenerator.subscribe(x -> System.out.println("1st : " + x));
broadcastGenerator.subscribe(x -> System.out.println("2st : " + x));
}
1st : 0
2nd : 0
[Test worker] Generating next of 1
1st : 1
2nd : 1
[Test worker] Generating next of 1
1st : 1
2nd : 1
위의 코드를 실행하면 fibonacciGenerator는 구독이 이루어질 때마다 값을 반환한다. 그러나 broadcastGenerator가 발행된 겂을 모아서 한 번에 구독자에게 일괄적으로 이벤트를 통지한다. 위에서 언급connect
와 autoConnect
연산자는 구독 이벤트를 계속 추적만 한다. 그러다 특정 수의 구독을 달성하면 이벤트 프로세스를 시작한다. 이는 발행자가 종료 이벤트를 발행하기 전까지 계속 이벤트를 발생하며 구독 취소는 추적하지 않기 때문에 구독자가 취소 이벤트를 요청해도 계속 이벤트를 발행한다.
위의 상황에 대비하여 Reactor는 refCount
연산자를 제공한다. 해당 연산자는 구독을 추적하다가 모든 구독자가 구독을 취소한 경우에 이벤트 발행을 멈춘다.
@Test
void refCount() throws InerruptedException() {
Flux<Long> broadcastGenerator = fibonacciGenerator
.doFinally(x -> System.out.println("Closing "))
.replay()
.refCount(2);
broadcastGenerator.subscribe(new BaseSubscriber<Long>() {
@Override
protected void hookOnSubscribe(Subscription subscription) {
request(1);
}
@Override
protected void hookOnNext(Long value) {
System.out.println("1st: " + value);
cancel();
}
});
broadbaseGenerator.subscribe((value) -> {
System.out.println("2nd : " + value);
cancel();
});
Thread.sleep(500);
}
1st: 0
2nd : 0
Closing
Reactor는 publish
연산자를 제공하여 ConnectedFlux
를 생성합니다. 첫 번째 구독자에게 발행되는 이벤트를 버퍼에 저장하는 replay
와 다르게, 해당 연산자는 source stream 으로부터 이벤트를 가져옵니다.
연산자는 구독자로부터 발생하는 수요를 계속 추적합니다. 만약 어떠한 구독자라도 수요를 일으키지 않으면, 이벤트 생성을 중단하며 이는 구독자들 중 수요를 일으킬 때까지 지속됩니다.
replay
연산자와 마찬가지로, 구독자 관리가 필요하다. 여기서는 위에서 소개한 connect
, autoConnect
, refCount
가운데 원하는 것을 골라서 옵션을 설정할 수 있다.
@Test
void publish() throws InterruptedException {
Flux<Long> fibonacciGenerator = Flux.generate(
() -> Tuples.<Long, Long> of(0L, 1L),
(state, sink) -> {
if(state.getT1() < 0) sink.complete();
else sink.next(state.getT1());
System.out.println("generating next of " + state.getT2());
return Tuples.of(state.getT2(), state.getT1() + state.getT2());
});
fibonacciGenerator = fibonacciGenerator
.doFinally(x -> System.out.println("Closing "))
.publish()
.autoConnect(2);
fibonacciGenerator.subscribe(new BaseSubscriber<Long>() {
@Override
protected void hookOnSubscribe(Subscription subscription) {
request(1);
}
@Override
protected void hookOnNext(Long value) {
System.out.println("1st: " + value);
}
});
fibonacciGenerator.subscribe(x -> System.out.println("2nd : " + x));
Thread.sleep(500);
}
1st: 0
2nd : 0