Reactor Java 3. Mono와 Flux는 어떻게 동작하는가?

xellos·2022년 4월 23일
0

JAVA-Reactor

목록 보기
3/11

Reactor는 Reactive Streams 명세를 기반으로한 JVM에서 Reactive Non-Blocking 애플리케이션을 생성하기 위한 Java 라이브러리 입니다.
앞에서는 Mono와 Flux를 생성하는 방법과 그들이 가지고 있는 데이터를 변환하는 방법에 대하여 알아보았습니다. 이번에는 Mono와 Flux 두 클래스가 어떻게 동작하는지 살펴보겠습니다.

지연

정의상, 모든 스트림은 Lazy 합니다. 이 뜻은 사용자가 스트림을 소비하기 전까지 스트림은 어떠한 동작도 실행하지 않는다는 것을 말합니다. Mono와 Flux는 subscribe() 라는 메서드를 통하여 스트림을 소비할 수 있습니다.

이러한 접근방식의 최대 강점은 명령형 방식으로 변환을 수행하는 것보다 메모리를 적게 사용하고 모든 작업이 Thread-safe 하다는 것입니다.

public void fluxAreLazy() {
	Instant beginning = Instant.now();
    
    Flux<Integer> flux = Flux.range(1, 5)
    	.flatMap(n -> {
        	try { Thread.sleep(100); return Mono.just(n * n); }
            catch (InterruptedException e) { return Mono.error(e); }
        });
    System.out.println("After step1, program runs for: " +
    	Utils.timeDifference(beginning, Instant.now()));
        
    flux.subscribe(System.out::println);
    
    System.out.println("The whole test last: " + Utils.timePastFrom(beginning));
}
  • 결과
After step1, program runs for: 516ms
1
4
9
16
25
The whole test last: 1059ms

불변성

Flux와 Mono는 변경할 수 없습니다. 이는 그들중 어떤 것의 인스턴스도 전혀 수정할 수 없다는것을 의미합니다. 어떤 메서드를 호출하면 Flux 또는 Mono의 새 인스턴스가 반환됩니다.

Flux<Integer> flux = Flux.range(1, 5);
flux.flatMap(n -> {
	try { Thread.sleep(100); return Mono.just(n * n); }
    catch(InterruptedException e) { return Mono.error(e); }
});

flux.subscribe(System.out::println);

  • 결과: 아래의 결과는 최초로 선언된 flux가 변경되지 않았다는 것을 의미한다.
1
2
3
4
5

수정한 Flux를 사용하기 위해서는 아래와 같이 데이터를 가공한 Flux를 받는 변수를 선언해서 사용해야 한다.

Flux<Integer> flux2 = Flux.range(1, 5);
Flux<Integer> flux3 = flux2.flatMap(n -> {
	try { Thread.sleep(100); return Mono.just(n * n); }
    catch (InterruptedException e) { return Mono.error(e); }
});
flux3.subscribe(System.out::println);

  • 결과:
1
4
9
16
26

Flux는 무한할 수 있다.

여기서 무한하다는 것은 종료되지 않는것을 의미합니다.

//이를 증명하기 위해서는 시계와 같이 flux ticking을 생성합니다.
Flux.interval(Duration.ofMillis(100))
	.map(i -> "Tick : " + i)
    .subscribe(System.out::println);
//여기서 Flux는 절대로 값을 사출하지 않으며 종료되지 않습니다.

Infinite Flux는 멈출 수 있다.

위의 무한한 Flux는 종료될 수 있는데 그 사용방법은 아래와 같다.

Disposable disposable = Flux.interval(Duration.ofMillis(100))
	.map(i -> "Tick : " + i)
    .subscribe(System.out::println);
    
try { Thread.sleep(1000); }
catch (InterruptedException e) { e.printStackTrace(); }

disposable.dispose();
System.out.println("Stopped flux");

Flux는 순차적으로 실행된다.

Flux.range(1, 3)
	.flatMap(n -> {
    	System.out.println("In flatMap n = " + n + " --- Thread is : " + 
        	Thread.currentThread.getName());
            
        try {
        	Thread.sleep(100);
            System.out.println("After Thread.sleep n = " + n);
           	return Mono.just(n):
        } catch (InterruptedException e) {
        	return Mono.error(e);
        }
    })
    .map(n -> { System.out.println("In map n = " + n); return n; })
    .subscribe(System.out::println);
    

0개의 댓글