Kevin의 알기 쉬운 Spring Reactive Web Applications: Reactor 1부

이진호·2023년 7월 1일
0

logs

목록 보기
4/4
post-thumbnail

Spring Reactive Web Applications 개요

리액티브 시스템과 리액티브 프로그래밍의 의미

리액티브 시스템(Reactive System)이란?

  • 리액티브 선언문
  • 리액티브 시스템의 특징
    • 높은 응답성
    • 탄력성 유연성
    • 메시지 기반

리액티브 프로그래밍(Reactive Programming)이란?

리액티브 스트림즈(Reactive Streams)란?

  • 리액티브 프로그래밍을 표준화 한 명세
  • Reactive Streams를 구현한 구현체
    • RxJava
    • Java 9 Flow API
    • Akka Streams
    • Reactor
    • 그 외 RxJs, RxScala, RxAndroid 등 리액티브 확장

Blocking I/O 방식과 Non-Blocking I/O방식 이해하기(1)

Blocking I/O 방식

  • 작업 쓰레드가 종료될때까지 요청을 한 쓰레드는 차단된다.
  • 이러한 단점을 보완하기 위해 멀티 쓰레딩 기법을 통해 추가 쓰레드를 할당 할 수 있다.
  • CPU 대비 많은 수의 쓰레드를 사용하는 애플리케이션은 비효율적이다.
    • 컨텍스트 스위칭Context Switching)으로 인한 쓰레드 전환 비용 발생
    • 메모리 사용에 있어서 오버 헤드 발생
    • 쓰레드 풀에서의 응답 지연 문제 발생

Non-Blocking I/O 방식

  • 작업 쓰레드의 종료와 상관없이 요청을 한 쓰레드는 차단되지 않는다.
  • 적은 수의 쓰레드를 사용하기 때문에 쓰레드 전환 비용이 적게 발생한다.
  • 따라서 CPU 대기 시간 및 사용량에 있어서도 효율적이다.
  • 그러나, CPU를 많이 사용하는 작업이 포함 되어 있을 경우에는 성능 향상에 악영향을 준다.
  • 또한, 사용자 요청에서 응답까지 과정에 Blocking I/O 요소가 포함되어 있을 경우 Non-Blocking의 이점을 발휘하기 힘들다.

Blocking I/O 방식과 Non-Blocking I/O방식 이해하기(2)

Spring MVC vs Spring WebFlux

  • Spring MVC
    • Blocking I/O 방식이다.
    • 요청 당 하나의 Thread를 사용한다.
    • 과도한 Thread 사용으로 인한 CPU 대기 시간이 늘어나고 메모리 사용에서 오버헤드가 발생한다.
    • 학습 시간이 상대적으로 짧다.
    • 10년 이상 주도적으로 사용된 명령형 프로그래밍 기반 기술이기 때문에 숙련된 개발자를 찾기가 쉽다.
    • 따라서 기업에서도 신규 프로젝트를 안정적으로 진행할 수 있는 가능성이 높다.
  • Spring WebFlux
    • Non-Blocking I/O 방식이다.
    • 하나의 Thread로 대량의 요청을 처리한다.
    • 적은 수 Thread를 사용하므로 CPU와 메모리를 효율적으로 사용한다.
    • CPU를 많이 사용하는 복잡한 계산을 요청할 경우 다른 요청들을 처리할 수 없다.
    • 요청에서 응답까지 Fully Non-Blocking이어야 진정한 효과를 발휘한다.
    • 학습 난이도가 비교적 높은편이다.
    • 선언형 방식의 Non-Blocking 프로그래밍 지식을 갖춘 숙련된 개발자를 찾는것이 어렵다.
    • 따라서 기업이나 개발 인력면에서나 기술적인 측면에서 위험 부담을 감수해야할 가능성이 높다.

Spring WebFlux 개요

Spring WebFlux란?

  • Spring 5부터 지원하는 리액티브 웹 프레임워크이다.
  • 비동기 Non-Blocking I/O 방식으로 적은 수의 쓰레드를 사용한다.
  • Reactive Streams의 구현체 중에 하나인 Reactor에 의존하여 비동기 로직을 구성하고 리액티브 스트림을 제공한다.
  • Reactor 기반이지만 RxJava 등 다른 리액티브 확장 라이브러리를 쉽게 적용할 수 있다.
  • Spring WebFlux 자체의 학습 비용보다 Reactor의 학습 비용이 더 많이 든다.

Spring MVC와 Spring WebFlux의 기술 스택 비교

Spring MVC와 Spring WebFlux의 밴다이어그램

Spring WebFlux의 Non-Blocking 프로세스

Spring WebFlux를 사용하기 적합한 시스템

  • Blocking I/O 방식으로 처리하는데 한계가 있는 대량의 요청 트래픽이 발생하는 시스템
  • 마이크로서비스 기반 시스템
  • 스트리밍 시스템 또는 실시간 시스템
  • 네트워크 접속이 느린 클라이언트의 요청 처리

Project Reactor 개요

리액터(Reactor)란?

리액터(Reactor)란?

  • 리액티브 프로그래밍을 위한 리액티브 라이브러리이다.
  • Reactive Streams 스펙을 구현한 구현체 중 하나이다.
  • Spring 에코 시스템에서 Reactive Stack의 기반이 되며 Spring WebFlux 프레임워크에 포함이 되어 있다.

리액터의 구성 요소 및 용어 정리

Reactor 용어 정의

  • Publisher: 발행자, 게시자, 생산자, 방출자(Emitter)
  • Subscriber: 구독자, 소비자
  • Emit: Publisher가 데이터를 내보내는 것(방출하다. 내보내다. 통지하다.)
  • Sequence: Publisher가 emit하는 데이터의 연속적인 흐름을 정의 해 놓은 것. Operator 체인형태로 정의 된다.
  • Subscribe: Subscriber가 Sequence를 구독하는 것
  • Dispose: Subscriber가 Sequence 구독을 해지 하는 것

마블 다이어그램으로 이해하는 Publisher

마블 다이어그램으로 Flux와 Mono 이해하기

Mono(an Asynchronous 0-1 Result)

  • 0개 또는 1개의 데이터를 emit하는 Publisher이다.(Compare with RxJava Maybe)
  • 데이터 emit 과정에서 에러가 발생하면 onError signal을 emit한다.

Flux(an Asynchronous Sequence of 0-N Items)

  • 0개 ~ N개의 데이터를 emit하는 Publisher이다.
  • 데이터 emit 과정에서 에러가 발생하면 onError signal을 emit한다.

Cold Sequence와 Hot Sequence

Cold Sequence & Hot Sequence 이해하기

Cold Sequence

Hot Sequence

Backpressure

Backpressure 이해하기

Publisher와 Subscriber간의 프로세스

Backpressure란?

  • Publisher에서 emit되는 데이터를 Subscriber쪽에서 안정적으로 처리하기 위한 제어 기능

Reactor에서의 Backpressure 처리 방법

  • 요청 데이터의 개수를 제어하는 방법
    • Subscriber가 적절히 처리할 수 있는 수준의 데이터 개수를 Publisher에게 요청.
  • Backpressure 전략을 사용하는 방법
    • Reactor에서 제공하는 Backpressure 전략을 사용

Backpressure 전략

  • IGNORE 전략: Backpressure를 적용하지 않는다.

  • ERROR 전략: Downstream으로 전달할 데이터가 버퍼에 가득 찰 경우, Exception을 발생시키는 전략

  • DROP 전략: Downstream으로 전달할 데이터가 버퍼에 가득 찰 경우, 버퍼 밖에서 대기하는 먼저 emit 된 데이터부터 Drop 시키는 전략

  • LATEST 전략: Downstream으로 전달할 데이터가 버퍼에 가득 찰 경우, 버퍼 밖에서 대기하는 가장 최근에(나중에) emit된 데이터부터 버퍼에 채우는 전략.

  • BUFFER 전략: Downstream으로 전달할 데이터가 버퍼에 가득 찰 경우, 버퍼 안에 있는 데이터를 Drop 시키는 전략


Sinks

Sinks 이해하기

Sinks란?

  • React Streams에서 발생하는 signal을 프로그래밍적으로 push할 수 있는 기능을 가지고 있는 Publisher의 일종이다.
  • Thread-Safe 하지 않을 수 있는 Processor보다 더 나은 대안이 된다.
  • Sinks는 Thread-Safe하게 signal을 발생 시킨다.
  • Sinks는 Sinks.Many 또는 Sinks.One interface를 사용해서 Thread-Safe하게 signal을 발생 시킨다.

Schedulers

Scheduler를 위한 전용 Operator

  • publishOn(): Operator 체인에서 Downstream Operator의 실행을 위한 쓰레드를 지정한다.
  • subscribeOn(): 최상위 Upstream Publisher의 실행을 위한 쓰레드를 지정한다. 즉, 원본 데이터 소스를 emit 하기 위한 스케줄러를 지정한다.
  • parallel(): Downstream에 대한 데이터 처리를 병렬로 분할 처리하기 위한 쓰레드를 지정한다.

ParallelFlux의 동작 방식

publishOn()과 subscribeOn()의 동작 이해

  • Operator 체인에서 최초의 쓰레드는 subscribe()가 호출되는 scope에 있는 쓰레드이다.

  • Operator 체인에서 publishOn()이 호출되면 publishOn() 호출 이후의 Operator 체인은 다음 publishOn()을 만나기전까지 publishOn()에서 지정한 Thread에서 실행이 된다.

  • subscribeOn()은 최상위 Upstream Publisher의 실행 쓰레드를 subscribe() 호출 scope의 쓰레드에서 subscribeOn()dㅔ서 지정한 쓰레드로 바꾼다.

  • subscribeOn( )과 publishOn( )이 같이 있다면, publishOn( )을 만나기 전 까지의 Upstream Operator 체인은 subscribeOn( )에서 지정한 쓰레드에서 실행되고, publishOn( )을 만날때마다 publishOn( ) 아래의 Operator 체인 downstream은 publishOn( )에서 지정한 쓰레드에서 실행된다.

  • subscribeOn( )이 publishOn() 뒤에 위치하든 상관없이 publishOn( )을 만나기 전 까지의 Upstream Operator 체인은 subscribeOn( )에서 지정한 쓰레드에서 실행된다

Scheduler의 종류

  • Schedulers 클래스의 정적 메서드로 제공 됨

Scheulers.immeediate()

  • 별도의 쓰레드를 추가 할당하지 않고, 현재 쓰레드에서 실행된다.

Schedulers.single()

  • 하나의 쓰레드를 재사용한다.
  • 저지연(low latency) 일회성 실행에 최적화 되어 있다.

Schedulers.boundedElastic()

  • 쓰레드 풀을 생성하여 생성된 쓰레드를 재사용한다.
  • 생성할 수 있는 쓰레드 수에는 제한이 있다.(Default, CPU 코어 수 X 10)
  • 긴 실행 시간을 가질 수 있는 Blocking I/O 작업에 최적화 되어 있따.

Schedulers.parallel()

  • 여러 개의 쓰레드를 할당해서 동시에 작업을 수행할 수 있다.
  • Non-Blocking I/O 작업에 최적화 되어 있다.

Schedulers.fromExecutorService()

  • 기존의 ExecutorService를 사용하여 쓰레드를 생성한다.
  • 의미있는 식별자를 제공하기 때문에 Metric에서 주로 사용된다.

Schedulers.newXXXX()

  • 다양한 유형의 새로운 Scheduler를 생성할 수 있다.(newSingle(), newParallel(), newBoundedElastic())
  • Scheduler의 이름을 직접 지정할 수 있다.

Context

Context란?

  • Reactor Sequence 상에서 상태를 저장할 수 있고, 저장된 상태 값을 Operator 체인에서 공유해서 사용할 수 있는 인터페이스이다.
  • Context에 저장할 상태 값은 key, value 형태로 저장이 된다.
  • Context에 값을 저장하기 위해서는 contextWrite()을 사용한다.
  • Context에서 값을 읽어오기 위해서는 읽기 전용 뷰인 ContextView를 사용한다.
  • ContextView는 Reactor Sequence에서 deferContextual() 또는 transfromDeferredContextual()을 통해서 제공된다.

자주 사용되는 Context API

자주 사용되는 Context API

  • put(key, value) : key/value 형태로 Context에 값을 쓴다.
  • Context.of(key1, value1, key2, value2, ...) : key/value 형태로 Context에 여러개의 값을 쓴다.
  • putAll(ContextView) : 파라미터로 입력된 ContextView를 merge 한다.
  • delete(key) : Context에서 key에 해당하는 value를 삭제한다.

자주 사용되는 ContextView API

  • get(key) : ContextView에서 key에 해당하는 value를 반환한다.
  • getOrEmpty(key) : ContextView에서 key에 해당하는 value를 Optional로 래핑해서 반환한다.
  • getOrDefault(key, default value) : ContextView에서 key에 해당하는 value를 가져온다. key에 해당하는 value가 없으면 default value를 가져온다.
  • hasKey(key) : ContextView에서 특정 key가 존재하는지를 확인한다.
  • isEmpty() : Context가 비어있는지 확인한다.
  • size() : Context내에 있는 key/value의 개수를 반환한다.

Context의 특징

  • Context는 각각의 Subscriber를 통해 Reactor sequence에 연결 되며 체인에서 각각의 Operator들이 실행 쓰레드가 달라도 연결된 Context에 접근할 수 있다.
  • Context는 체인의 맨 아래에서부터 위로 전파된다.
    • Context는 Downstream에서 Upstream으로 전파 된다.
    • Operator 체인에서 Context read 메서드가 Context write 메서드 밑에 있을 경우에는 write된 값을 read 할 수 없다.
    • 따라서 일반적으로 Context에 write 할 때에는 Operator 체인의 마지막에 둔다.
  • 동일한 키에 대해서 write 할 경우, 값을 덮어쓴다.
  • 메인 Operator 내부에서 Sequence를 생성하는 flatMap() 같은 Operator내에서 write 된 Context의 값은 Inner Sequence 내부에서만 유효하고, 외부 Operator 체인에서는 보이지 않는다.

Debugging

Debug 모드를 사용한 Debugging

Reactor에서의 Debugging 방법

  • Debug 모드를 활성화 하는 방법(Globally).
  • checkpoint() Operator를 사용하는 방법(Locally).
  • log() operator를 사용해서 Reactor Sequence에서 발생하는 signal을 확인하는 방법

Debugging시 알아두면 좋은 몇가지 용어

  • stacktrace: 호출된 메서드에 대한 Strac Frame에 대한 리포트
  • assembly: 새로운 Flux가 선언된 지점
  • traceback: 실패한 operator의 stacktrace를 캡처한 정보

Debug 모드를 사용한 Debugging

  • Debug 모드 시, Operator의 stactrace capturing을 통해 디버깅에 필요한 정보를 측정한다.
  • Hooks.onOperatorDebug()를 통해서 Debug 모드를 활성화 할 수 있다.
  • Hooks.onOperatorDebug()는 Operator 체인이 선언되기 전에 수행되어야 한다.
  • Debug 모드를 활성화 하면 Operator 체인에서 에러 발생 시, 에러가 발생한 Operator의 위치를 알려준다.
  • 사용이 쉽지만 애플리케이션 내 모든 Operator의 assembly(New Flux or Mono)를 캡처하기 때문에 비용이 많이 든다.

checkpoint를 사용한 Debugging

checkpoint() Operator를 사용하는 방법

  • 특정 Operator 체인 내에서만 assembly stacktrace를 캡쳐한다.
  • checkpoint(description)를 사용하면 에러 발생 시, checkpoint(description)를 추가한 지점의 assembly stacktrace를 생략하고 description을 통해 에러 발생 지점을 예상할 수 있다.
  • checkpoint(description, true) = checkpoint() + checkpoint("description")
    • 에러 발생 시, 고유한 식별자 등의 description과 assembly stacktrace(traceback)를 모두 출력한다.

log Operator를 사용한 Debugging

log() operator 사용

  • Flux 또는 Mono에서 발생하는 signal event를 출력해준다(onNext, onError, onComplete, subscriptions, cancellations, requets)
  • 여러개의 log()를 사용할 수 있으며, Operator 마다 전파되는 signal event를 확인할 수 있다.
  • Custom Category를 입력해서 Operator 마다 출력되는 signal event를 구분할 수 있다.
  • 에러 발생 시, Stacktrace도 출력해준다.
  • debug mode 라면 traceback도 출력해준다.

Testing

StepVerifier를 이용한 Testing

  • Flux나 Mono로 선언된 Operator 체인을 구독 시, 동작 방식을 테스트하기 위한 가장 일반적인 테스트 방식
  • expectXXXX()를 이용해서 Sequence 상에서 예상되는 Signal의 기대값을 assertion할 수 있다.
  • verify() Operator를 호출함으로써 전체 Operator 체인의 테스트를 트리거 한다.
  • verifyXXXX()를 이용해서 전체 Operator 체인의 테스트를 트리거 + 종료 또는 에러 이벤트 검증을 수행할 수 있다.
  • 실제 수행되는 시간과 관련된 테스트를 수행할 수 있다.
  • Backpressure 테스트
    • hasDropped(), hasDiscarded() 등을 이용해서 backpressure 테스트를 수행할 수 있다.
  • Context 테스트
    • expectAccesibleContext()를 이용해서 접근 가능한 Context가 있는지 테스트 할 수 있다.
    • hasKey()를 사용하여 Context의 key가 존재하는지 검증할 수 있따.
  • 기록된 데이터를 이용한 테스트
    • recordWith()를 사용하여 emit 된 데이터를 기록할 수 있다.
    • consumeRecordedWith()를 사용하여 기록된 데이터들을 소비하며 검증할 수 있다.
    • expectRecordedMatches()를 사용하여 기록된 데이터의 컬렉션을 검증할 수 있다.

TestPublisher와 PublisherProbe를 이용한 Testing

TestPublisher를 이용한 Testing

  • Testing 목적에 사용하기 위한 Publisher이다.
  • 개발자가 직접 프로그래밍을 통해 signal을 발생시킬 수 있다.
  • 주로 특정한 상황을 재현하여 테스트하고 싶은 경우 사용할 수 있다.
  • 리액티브 스트림즈 사양을 준수하는지의 여부를 테스트할 수 있다.

PublisherProbe를 이용한 Testing

  • Operator 체인의 실행 경로를 검증할 수 있다.
  • 주로 조건에 따른 분기로 인해서 Sequence가 분기 되는 경우, 실행 경로를 추적해서 정상적으로 실행이 되었는지 확인할 수 있다.
  • 해당 실행 경로대로 정상적으로 실행되었는지의 여부는 assertWasSubscribed(), assertWasRequested(), assertWasCancelled()를 통해 검증할 수 있다.

출처

0개의 댓글