스프링 리액터 시작하기 (4) flux Subscribe

brian Byeon·2022년 5월 1일
1

0. 자료의 출처

https://docs.spring.io/spring-framework/docs/current/reference/html/web-reactive.html#webflux-new-framework
오해가 없도록 어떤 text를 인용했는지 원문을 첨부합니다.🤗

1. BaseSubscriber

전편에서 우리는 lambda식으로 onNext(), onError(), onComplete(), onSubscribe()를 처리해보았다.

Flux<Integer> ints = Flux.range(1, 4);
ints.subscribe(i -> System.out.println(i),
    error -> System.err.println("Error " + error),
    () -> System.out.println("Done"),
    sub -> sub.request(10));

람다식 대신 클래스를 집어 넣을 수는 없을까?

Instances of BaseSubscriber (or subclasses of it) are single-use, meaning that a BaseSubscriber cancels its subscription to the first Publisher if it is subscribed to a second Publisher. That is because using an instance twice would violate the Reactive Streams rule that the onNext method of a Subscriber must not be called in parallel. As a result, anonymous implementations are fine only if they are declared directly within the call to Publisher#subscribe(Subscriber).

BaseSubscriber라는 extendable class가 있다. 이를 상속받아서 쓰면 된다. BaseSubscriber 인스턴스는 하나당 한군데에만 사용이 가능하다. 즉, 첫번째 publisher에 넣었다가 두번째 publisher에 넣는 순간 Reactive Streams의 규칙중 하나인 onNext 시에 subscriber가 parallel하게 불려서는 안된다는 규칙을 어기게 된다. 이 경우 두번째 publisher에게 넣을때 첫번째 publisher을 cancel시킨다.

SampleSubscriber<Integer> ss = new SampleSubscriber<Integer>();
Flux<Integer> ints = Flux.range(1, 4);
ints.subscribe(ss);

이런식으로 subscribe가 가능하다. 이제 BaseSubscriber는 onNext, OnsubScribe, 등에 대해 메서드를 오버라이드 할 수 있게 해놓은 것이다.

package io.projectreactor.samples;

import org.reactivestreams.Subscription;

import reactor.core.publisher.BaseSubscriber;

public class SampleSubscriber<T> extends BaseSubscriber<T> {

	public void hookOnSubscribe(Subscription subscription) {
		System.out.println("Subscribed");
		request(1);
	}

	public void hookOnNext(T value) {
		System.out.println(value);
		request(1);
	}
}

이 경우에는 하나씩 request를 내보내게 했는데,

Subscribed
1
2
3
4

와 같은 결과가 나온다.

BaseSubscriber also offers a requestUnbounded() method to switch to unbounded mode (equivalent to request(Long.MAX_VALUE)), as well as a cancel() method.

requestUnbounded를 이용해 request(Long.MAX_VALUE)) 이런식으로 받을 수 있다.

It also has additional hooks: hookOnComplete, hookOnError, hookOnCancel, and hookFinally (which is always called when the sequence terminates, with the type of termination passed in as a SignalType parameter)

You almost certainly want to implement the hookOnError, hookOnCancel, and hookOnComplete methods. You may also want to implement the hookFinally method. SampleSubscriber is the absolute minimum implementation of a Subscriber that performs bounded requests.

hookOnComplete, hookOnError, hookOnCancel, and hookFinally
들이 있다.

2. BackPressure과 Reshape Requests

When implementing backpressure in Reactor, the way consumer pressure is propagated back to the source is by sending a request to the upstream operator. The sum of current requests is sometimes referenced to as the current “demand”, or “pending request”. Demand is capped at Long.MAX_VALUE, representing an unbounded request (meaning “produce as fast as you can” — basically disabling backpressure).

reactor에서 소비자가 원하는 만큼만 publisher가 emit하게 하는 것은 request(number)을 전달하는 것이다. 현재의 request 개수의 합은 현재의 "demand" 수요로 여겨지는데, Long.MAX_VALUE로 이를 설정하면 as fast as you can 이라는 의미가 된다. (disabling backpressure)

일반적인 모든 subscribing은 unbounded request를 보내게 된다. 해당 메서드들은 이렇다

  • subscribe()와 lambda식 포함하고 있는 애들 (Consumer 있는 애는 빼고)
  • block(), blockFirst() and blockLast()
  • toIterable()이나 toStream()으로 iterating 한다면

그런데 이제 만약 BaseSbuscriber를 활용하면 request 수를 지정해야 한다. 아니면(request(1); 하지 않으면) emit하지 않는다.
이제 람다식이 아니라 BaseSubscriber를 활용해보면,

  Flux.range(1, 10)
    .doOnRequest(r -> System.out.println("request of " + r))
    .subscribe(new BaseSubscriber<Integer>() {

      @Override
      public void hookOnSubscribe(Subscription subscription) {
        request(1);
      }

      @Override
      public void hookOnNext(Integer integer) {
        System.out.println("Cancelling after having received " + integer);
        cancel();
      }
    });

가 된다.

 request of 1
Cancelling after having received 1

하나의 request가 불리고, request 개수는 1개이고, onNext시에 cancel 때문에 끝났다.

3. Cold Sequence 와 Hot Sequence

구독 시점부터 데이터를 새로 생성하는 cold sequence가 있고, 구독자 수에 상관없이 데이터를 생성하는 hot sequence가 있다.

Flux<Integer> seq = Flux.just(1,2,3);
seq.subscribe(val->Sys.out.println(val));
seq.subscribe(val->Sys.out.println("2"+val));

하면

1
2
3
21
22
23

이 나오는데, 각 구독마다 새로 데이터가 생성.

4. spring flux와 mono 기초

https://javacan.tistory.com/entry/spring-reactor-intro-list
블로그를 참조합니다. 정리가 잘 되어 있어 보기가 편리합니다.

Flux<T> generate(Callable<S> stateSupplier, BiFunction<S, SynchronousSink<T>, S> generator)
Flux<T> generate(Callable<S> stateSupplier, BiFunction<S, SynchronousSink<T>, S> generator,
                           Consumer<? super S> stateConsumer)

Generate 메서드는 첫 인자를 initial state를 제공받을 수 있는 함수를 받는다.
generator는 SynchronousSink 클래스를 입력받는데, SynchronousSink는 해당 인스턴스의 .next가 불릴때마다 하나의 state를 리턴해주는 메서드를 가지고 있다. ( SynchronousSink.next( { return시킬값} ) )

 Flux<String> flux = Flux.generate(
       () -> { // Callable<S> stateSupplier
           return 0;
       },
       (state, sink) -> { // BiFunction<S, SynchronousSink<T>, S> generator
           sink.next("3 x " + state + " = " + 3 * state);
           if (state == 10) {
               sink.complete();
           }
           return state + 1;
       });

그래서 이런식으로 Flux를 만들면, flux가 subscribe 되어서 request(1)요청을 보낼 때마다 synchronous의 sink.next("값") 이 불려서 "값"이 emit된다. 그러고 저 람다식 (화살표식)에서 return된 state값은 다음 request(1)일때 state로 쓰인다.

근데 이 경우(generate로 만들어진 SynchronousSink를 사용하는 Flux의 경우), sink.next()를 두 번 이상 부를 수 없다. 우리는 이를 pull 방식이라고 한다. 우리가 요청할 때마다 값을 가져오기 때문이다. publisher가 요청에 관계없이 값을 emit해대는 것을 push 방식이라고 할 수 있다.

Flux.create는 FluxSink를 사용한다.

Flux<Integer> flux = Flux.create( (FluxSink<Integer> sink) -> {

    sink.onRequest(request -> { //request가 request(10)하면 10, request(5) 하면 5

        for (int i = 1; i <= request; i++) {

            sink.next(i); // Flux.generate()의 경우와 달리 한 번에 한 개 이상의 next() 신호 발생 가능

        }

    });

});

파라미터가 Consumer<? super FluxSink >로, FluxSink를 받기 때문에 해당 메서드의 onRequest() 메서드로 subscriber가 데이터를 요청했을 때 처리해줄 수 있다.

사실 push처럼 처리하려면, publisher와 관계없이 미리 sink가 sink.next()를 찍어내고 있으면 된다.

sink.onRequest( request -> for (int i=0;i<100;i++) sink.next(i));

라고 해보자, 그럼 그냥 request 하나에도 100개씩 데이터가 쌓여있는 셈인데, Flux.create()로 생성한 Flux는 초과로 발생한 데이터를 버퍼에 저장한다.
사실 옵션으로 IGNORE, ERROR, DROP , LATEST, BUFFER 방식으로 처리할 수 있다. 두번째 인자로 이들을 전달하면 된다.

IGNORE : Subscriber의 요청 무시하고 발생(Subscriber의 큐가 다 차면 IllegalStateException 발생)
ERROR : 익셉션(IllegalStateException) 발생
DROP : Subscriber가 데이터를 받을 준비가 안 되어 있으면 데이터 발생 누락
LATEST : 마지막 신호만 Subscriber에 전달
BUFFER : 버퍼에 저장했다가 Subscriber 요청시 전달. 버퍼 제한이 없으므로 OutOfMemoryError 발생 가능
//https://javacan.tistory.com/entry/Reactor-Start-3-RS-create-stream

Flux seq= Flux.fromStream(java8 Stream)
이나,
Fluxseq = Flux.fromIterable(java Iterable)
을 통해 Flux를 생성할 수 있다.

5.Flux와 Mono 활용

flux.
map: 각 항목에 대해 어떤 람다함수 적용
flatMap: 각 항목에 대해 Flux 생성 ex.)
Flux.just(1,2,3).flatMap(i->Flux.range(1,i))
1 12 123 생성
filter: true인 item만 다음으로 전달
defaultIfEmpty: 빈 시퀀스인경우 특정값을 기본값으로 설정. (Mono와 Flux 둘다)
switchIfEmpty: 시퀀스가 끝나서 다른 시퀀스를 사용하고 싶다면
startWith: 시퀀스가 특정 값부터 시작하도록한다.
concatWithValues: 시퀀스를 특정 값으로 끝나게 만든다. (가장 마지막 값을 특정한다)

concatWith: 여러 flux를 이어서 하나의 flux로
mergeWith: 시퀀스가 발생하는 순서대로 나왔으면 좋겠다면?

Flux<String> tick1 = Flux.interval(Duration.ofSeconds(1)).map(tick -> tick + "초틱");
Flux<String> tick2 = Flux.interval(Duration.ofMillis(700)).map(tick -> tick + "밀리초틱");
tick1.mergeWith(tick2).subscribe(System.out::println);
//https://javacan.tistory.com/entry/Reactor-Start-4-tbasic-ransformation

zipWith: 두 시퀀스 값이 묶여서 나온다. 즉 신호 느리게 나오는 애에 맞춰진다.
combineLatest: 가장 최근의 두쌍을 묶어서 데이터가 무조건 두개씩 나오게 한다. 즉 10초마다 한번씩 값을 낸다면 1초짜리 flux는 해당 값을 10번은 재사용하게 된다.

take,takelast: 처음 몇개의 data만 유지한다.

Flux<Integer> seq2 = someSeq.take(Duration.ofSeconds(10)); // 최초 10초 동안 데이터 유지 //https://javacan.tistory.com/entry/Reactor-Start-4-tbasic-ransformation
takeWhile()과 Predicate(참/거짓 람다)을 인자로 받아서 true를 리턴하는 동안 데이터를 포함한다.
takeUntil()은 처음 true를 리턴할 때까지 데이터를 내보낸다.

skip, skipLast: 처음 n개 데이터, 마지막 n개 데이터를 거른다.
skipWhile(), skipUntil()이 있다.

0개의 댓글