스프링 리액터 시작하기 (3) flux,mono

brian Byeon·2022년 4월 30일
0

0. 자료의 출처

https://projectreactor.io/docs/core/release/reference/#getting-started-introducing-reactor

오해가 없도록 어떤 text를 인용했는지 원문을 첨부합니다.🤗

1. Introduction to Reactive Programming

Reactor is an implementation of the Reactive Programming paradigm, which can be summed up as follows:

Reactive programming is an asynchronous programming paradigm concerned with data streams and the propagation of change. This means that it becomes possible to express static (e.g. arrays) or dynamic (e.g. event emitters) data streams with ease via the employed programming language(s).
— https://en.wikipedia.org/wiki/Reactive_programming

Reactor는 reactive programming을 위한 구현체인 것이다.
Data stream들 (이벤트들의 연속이라던지 static한 배열, 리스트)과 변화에 맞추어 진행되는 비동기적 프로래밍을 위함이다.

Java에서는 RxJava를 통해 JVM에서의 reactive programming이 구현되었다. 과거의 명령형 프로그래밍 패턴에서는 for문과 같이 iterator가 쓰였지만 이제는 Publisher-Subscriber 관계가 쓰일 것이다. Publisher는 구독자에게 새 값이 도착했다고 알릴 것이며, 개발자는 이 값을 어떻게 쓸 것이라고 계산 로직을 부여해 놓기만 하면된다.

1.1 publisher, subscriber

Using an iterator is an imperative programming pattern, even though the method of accessing values is solely the responsibility of the Iterable. Indeed, it is up to the developer to choose when to access the next() item in the sequence. In reactive streams, the equivalent of the above pair is Publisher-Subscriber. But it is the Publisher that notifies the Subscriber of newly available values as they come, and this push aspect is the key to being reactive. Also, operations applied to pushed values are expressed declaratively rather than imperatively: The programmer expresses the logic of the computation rather than describing its exact control flow.

In addition to pushing values, the error-handling and completion aspects are also covered in a well defined manner. A Publisher can push new values to its Subscriber (by calling onNext) but can also signal an error (by calling onError) or completion (by calling onComplete). Both errors and completion terminate the sequence. This can be summed up as follows:

Publisher는 Subscriber의 onNext 함수를 불러서 다음 값을 push해줄 수 있으며, Subscriber의 onError를 불러서 에러를, onComplete를 불러서 완료됨을 알릴 수 있다. 그래서 보통 흐름은 이렇게 된다.
onNext x 0..N [onError | onComplete]

onNext시에 각 값마다 어떤 것을 수행해야 할지, error가 나서 스트림이 멈추거나 완료되서 스트림이 끝나면 어떤 동작을 수행해야 할지를 작성한다.

1.2 명령형 프로그래밍에서 Reactive Programming으로

Reactive libraries, such as Reactor, aim to address these drawbacks of “classic” asynchronous approaches on the JVM while also focusing on a few additional aspects:

Composability and readability

Data as a flow manipulated with a rich vocabulary of operators

Nothing happens until you subscribe

Backpressure or the ability for the consumer to signal the producer that the rate of emission is too high

High level but high value abstraction that is concurrency-agnostic

확실히 reactive 프로그래밍은 기존의 명령형으로 비동기 처리를 할때보다

  • readability,

  • subscribe전까지 아무일도 일어나지 않는다는 것

  • BackPressure ( 소비자가 producer의 속도를 조절한다. 기존 동기에선 역전 )

    이런 장점들이 존재 한다.

    리액터는 컨베이어벨트를 움직여가는 것처럼 프로그래밍한다고 생각해야 한다. 처음에 Publisher에게서 나와서, 우리가 막 조립을 하고, 닦고 조이고 기름치고 내보내면 Subscriber에게 도착하는 느낌이다.

 In Reactor, when you write a Publisher chain, data does not start pumping into it by default. Instead, you create an abstract description of your asynchronous process (which can help with reusability and composition).

By the act of subscribing, you tie the Publisher to a Subscriber, which triggers the flow of data in the whole chain. This is achieved internally by a single request signal from the Subscriber that is propagated upstream, all the way back to the source Publisher.

3.3.5. Backpressure
Propagating signals upstream is also used to implement backpressure, which we described in the assembly line analogy as a feedback signal sent up the line when a workstation processes more slowly than an upstream workstation.

The real mechanism defined by the Reactive Streams specification is pretty close to the analogy: A subscriber can work in unbounded mode and let the source push all the data at its fastest achievable rate or it can use the request mechanism to signal the source that it is ready to process at most n elements.

Intermediate operators can also change the request in-transit. Imagine a buffer operator that groups elements in batches of ten. If the subscriber requests one buffer, it is acceptable for the source to produce ten elements. Some operators also implement prefetching strategies, which avoid request(1) round-trips and is beneficial if producing the elements before they are requested is not too costly.

This transforms the push model into a push-pull hybrid, where the downstream can pull n elements from upstream if they are readily available. But if the elements are not ready, they get pushed by the upstream whenever they are produced.


Subscriber이 없으면 그냥 이런 로직을 탄다 라는 함수를 만들어 놓은것에 불과. 함수 호출은 subscribing이 이뤄지는 것. Publisher를 SUbscriber에 묶는 것이다.
BackPressure를 제어할 수 있도록 Publisher가 너무 빨리 데이터를 만들지 않게 몇개의 데이터만 만들어서 buffer에 저장해 놓도록 할 수 있다.

2. Reactor Core Features

Reactor project에서는 reactor-core 라이브러리를 활용한다. JAVA 8 부터 사용가능하다.Publisher를 우리는 Flux와 Mono를 이용해 구현한다.

Flux object는 0~N개의 아이템이 만들어내는 reactive sequence를 제공하고, Mono object는 한개나 없는 (single-value-or-empty) reactive sequence를 제공한다.

왜 이런식으로 만들었을까?

This distinction carries a bit of semantic information into the type, indicating the rough cardinality of the asynchronous processing. For instance, an HTTP request produces only one response, so there is not much sense in doing a count operation. Expressing the result of such an HTTP call as a Mono<HttpResponse> thus makes more sense than expressing it as a Flux<HttpResponse>, as it offers only operators that are relevant to a context of zero items or one item.

Operators that change the maximum cardinality of the processing also switch to the relevant type. For instance, the count operator exists in Flux, but it returns a Mono<Long>.

예를들면, 하나의 HTTP 요청을 보냈을 때, 하나의 응답이 오는 것이 당연하다. 그럼 이런 응답을 받아서 count 요청을 처리하는 건 그냥 당연히 1이 나오니까 의미가 없다. 그런 하나의 결과만 나오는 결과들은 Mono 와 같이 처리해서 Flux 타입으로 처리하는 경우보다 직관적이게 만들 수 있다.
Flux 요청에 대해 .count() 하면 Mono 을 주는 이런느낌.

3. Flux: 0~N 개 아이템에 대한 비동기 sequence

Flux 는 표준 Publisher 형태로, 0~N개의 item을 내뱉는 비동기적 sequence이고, 3개의 시그널을 변환해서 subscriber의 3개 함수에 매핑시킨다.

subscriber의

  • onNext() : 다음 sequence가 존재할 때 계속 하나씩 받으라는 시그널

    • onNext( value -> Sys.out.println(value) ) 이런식
  • onComplete() : 이 Sequence가 끝났다고 Flux가 알려주는 시그널

  • onError() : 이 sequence를 처리하다 error가 났다는 것을 알린다.

    모든 event는 처리해도 되고 처리 안해도 된다. onComplete는 empty finite sequence를 뜻하지만 (종료되서 더이상 할게 없는) onComplete를 처리 안하면 infinite empty sequence가 된다.

    4. Mono: Asynchronous 0-1 Result


    Mono도 Publisher인데 최대 하나의 item만 onNext 에 내보내는 거다. 하나의 onComplete, onError 시그널을 보내게 된다.
    일반적으로는 Mono.onNext() 부르자마자 onComplete부르는게 맞다. (Mono.never() 은 아무 시그널도 내보내지 않는 Mono이다)

    Mono는 Flux에서 제공되는 operator들의 일부만 사용할 수 있는데 일부 operator는 flux 형태로 리턴이 되기도 한다.
    Mono#concatWith(Publisher) 하면 Publisher(flux일수도 있고, Mono일 수도 있는)와 결합되어서 Flux를 리턴한다.
    Mono#then(Mono)는 Mono를 리턴한다.

    Mono를 이용하면 그냥 아무 값도 안내보내지만 async 프로세스가 끝났다는 것을 알 수 있는 시그널을 보내는 용도로 쓸 수 있다.

    5. Mono와 Flux 만들고 subscribe하기

    Sequence of String을 만드려면 collection에 넣어서 만들거나, enumerate하면 된다.

    Flux<String> seq1 = Flux.just("foo", "bar", "foobar");
    List<String> iterable = Arrays.asList("foo", "bar", "foobar");
    Flux<String> seq2 = Flux.fromIterable(iterable);

    just는 임의로 초기값을 만들어서 Flux나 mono를 만들때 사용한다.

    Mono<String> noData = Mono.empty(); 
    Mono<String> data = Mono.just("foo");
    Flux<Integer> numbersFromFiveToSeven = Flux.range(5, 3); 

    Mono.empty();를 했는데 Mono에 담을 수 있다. factory method는 generic type (여기서 < > 안의 부분)을 따라간다.
    range함수는 첫 인자가 시작점이고, 3은 몇개의 아이템을 produce할 것인지 이다.

    5.1 Subscribe

    lambda를 사용해 subscribe를 진행한다.

  1.  subscribe(); 
  2. subscribe(Consumer<? super T> consumer);
3. subscribe(Consumer<? super T> consumer, Consumer<? super Throwable> errorConsumer); 

4. subscribe(Consumer<? super T> consumer,
          Consumer<? super Throwable> errorConsumer,
          Runnable completeConsumer); 
5. subscribe(Consumer<? super T> consumer,
          Consumer<? super Throwable> errorConsumer,
          Runnable completeConsumer,
          Consumer<? super Subscription> subscriptionConsumer); 

1: Subscribe and trigger the Sequence : 퍼블리셔에게 등록을 해준지점. 이제 publisher가 작동할 것이다. (컨베이어 벨트 시작)
2. 각 produced value에 대해서 어떤 행동을 한다
3. 각 item별 행동 + error시 어떻게 한다
4. 각 item별 행동+ error시 행동 + 완료되었을때 행동
5. 각 item별 행동+ error시 행동 + 완료되었을때 행동 + 이 subscription 자체에 대해서도 뭔가 한다.

이런 함수들은 subscription에 대한 reference를 주어서, 도중에 subscription을 취소할 수 있다. cancel하면 publisher는 stop producing values하고 만들어진 resource들을 clean up 한다. Disposable interface 에 의해 cancel-clean-up이 구현되어 있다.

5.2.1 Subscribe Example

간단한 subscribe()를 해보자

Flux<Integer> ints = Flux.range(1,3);
ints.subscribe(); 

publisher가 작동하는 그냥 가장 간단한 방법. subscribe()가 붙어서 1, 2, 3을 flux가 내보낸다. (produce 3 values) Lambda함수를 제공하면 value가 visible하다.

Flux<Integer> ints = Flux.range(1,3);
ints.subscribe(i->System.out.println(i));

결과값:

1
2
3

이제 error시 어떻게 한다를 추가해보자.

Flux<Integer> ints = Flux.range(1,4)
	.map(i -> {
       if ( i<=3 ) return i;
       throw new RuntimeException("error on 4"); 
       });
ints.subscribe(i -> Sys.out.println(i), 
       error -> Sys.err.println("Error: " + error));
1
2
3
Error: java.lang.RuntimeException: error on 4

이제 완료되었을 때의 행동을 추가해보자.

Flux<Integer> ints = Flux.range(1,4)
	.map(i -> {
       if ( i<=3 ) return i;
       throw new RuntimeException("error on 4"); 
       });
ints.subscribe(i -> Sys.out.println(i), 
       error -> Sys.err.println("Error: " + error),
       () -> System.oput.println("DONE") );

3번째 인자는 completion signal이다. Error와 completion signal은 terminal (종료하는) event로, 하나와 다른 하나는 exclusive하다. (서로 동시에 발생할 수 없다)

completion callback은 input이 없다. Runnable interface의 run method를 실행시킨다 (내부 구현이 이렇게 구성되어 있다).

1
2
3
4
DONE

이렇게 결과가 나온다

이제 5. 각 item별 행동+ error시 행동 + 완료되었을때 행동 + 이 subscription 자체에 대해서도 뭔가 한다.

Consumer <subscription>

이 형태의 인자를 받는다. 이 subscription에 대해 request(long) 이나 cancel()을 하는 형태로 인자를 주지 않으면 Flux는 hang한다. (멈춰버린다)

Flux<Integer> ints = Flux.range(1, 4);
ints.subscribe(i -> System.out.println(i),
    error -> System.err.println("Error " + error),
    () -> System.out.println("Done"),
    sub -> sub.request(10)); 

이렇게 하면 subscribe하는 동시에 Subscription이 생기는 셈인데, sub.request로 publisher에게 10개의 element씩만 받겠다고 말하는 셈이다. 하지만 실제로는 4개만 emit하고 끝나게 된다. (When we subscribe we receive a Subscription. Signal that we want up to 10 elements from the source (which will actually emit 4 elements and complete).

5.2 subscribe()를 취소하기

ints.subscribe( ) 의 return value는 "Disposable" 타입이다. 이 return type의 dispose() 메서드를 사용해서 subscription이 취소되었음을 알릴 수 있다.

Flux나 Mono에게는 publisher가 그만 element를 produce (emit) 하도록 한다. 근데 immediate하다고 할 수는 없다. 우리가 cancel을 보낸 시점이 이미 publisher의 모든 element를 emit한 뒤일 수도 있다.

Some utilities around Disposable are available in the Disposables class. Among these, Disposables.swap() creates a Disposable wrapper that lets you atomically cancel and replace a concrete Disposable. This can be useful, for instance, in a UI scenario where you want to cancel a request and replace it with a new one whenever the user clicks on a button. Disposing the wrapper itself closes it. Doing so disposes the current concrete value and all future attempted replacements.

disposable.swap과 disposable.composite가 Disposable 타입중에는 잘 쓰이는 메서드이다.

swap은 현재의 disposable을 취소시키고 새로운 disposable로 대체시킨다. UI 중에 버튼을 눌렀을때 현재의 요청을 취소하고 지금의 새 값으로 새로운 요청을 보내야 한다면, 기존 subscription을 취소하고 새 subscription을 등록해 disposable을 받아와야 하는 것이 아니라, swap을 통해 disposable을 다른 것으로만 바꿔주면 항상 disposable 값은 최신의 요청을 대상으로 할 것이다.

Disposable.composite는 여러개의 disposable을 한번에 cancle(dispose) 할 수 있게 한다. 여러개를 묶어낸다는 느낌이다.

0개의 댓글