리액티브 프로그래밍

김종준·2023년 3월 6일
0

모던자바

목록 보기
13/15

리액티브 프로그래밍

리액티브 매니패스토

반응성

리액티브 시스템은 빠를 뿐 아니라 더 중요한 특징으로 일정하고 예상할 수 있는 반응 시간을 제공한다는 것이다.

그렇기에 사용자가 기대치를 가질 수 있다.

회복성

장애가 발생해도 시스템은 반응해야 한다.

컴포넌트 실행 복제, 여러 컴포넌트의 시간과 공간 분리, 각 컴포넌트가 비동기적으로 작업을 다른 컴포넌트에서 위임하는 등 리액티브 매니페스토는 회복성을 달성할 수 있는 다양한 기법을 제시한다.

탄력성

애플리케이션의 생명주기 동안 다양한 작업 부하를 받게 되는데 이 다양한 작업 부하로 애플리케이션의 반응성이 위협받을 수 있다.

리액티브 시스템에서는 무서운 작업 부하가 발생하면 자동으로 관련 컴포넌트에 할당된 자원 수를 늘린다.

메시지 주도

회복성과 탄력성을 지원하려면 약한 결합, 고립, 위치 투명성 등을 지원할 수 있도록 시스템을 구성하는 컴포넌트의 경계를 명확하게 정의해야 한다.

비동기 메시지를 전달해 컴포넌트끼리의 통신이 이루어진다.

이 덕분에 회복성과 탄력성을 얻을 수 있다.

애플리케이션 수준의 리액티브

애플리케이션 수준 컴포넌트의 리액티브 프로그래밍의 주요 기능은 비동기로 작업을 수행할 수 있다는 점이다.

리액티브 프레임워크와 라이브러리는 스레드를 퓨처, 액터, 일련의 콜백을 발생시키는 이벤트 루프 등과 공유하고 처리할 이벤트를 변환하고 관리한다.

스레드를 다시 쪼개는 종류의 기술을 이용할 때메인 이벤트 루프 안에서 절대 동작을 블록 하지 않아야 한다는 중요한 전제 조건이 항상 따른다.

시스템 수준의 리액티브

리액티브 시스템은 여러 애플리케이션이 한 개의 일관적인, 회복할 수 있는 플랫폼을 구성할 수 있게 해줄 뿐 아니라 이들 애플리케이션 중 하나가 실패해도 전체 시스템은 계속 운영될 수 있도록 도와주는 소프트웨어 아키텍처다.

리액티브 애플리케이션은 비교적 짧은 시간 동안만 유지되는 데이터 스트림에 기반한 연산을 수행하며 보통 이벤트 주도로 분류된다.

반면 리엑티브 시스템은 애플리케이션을 조립하고 상호 소통을 조절한다.

리액티브 시스템은 주요 속성으로 메시지 주도를 꼽을 수 있다.

메시지는 정의된 목적지 하나를 향하지만, 이벤트는 관련 이벤트를 관찰하도록 등록한 컴포넌트가 수신한다는 점이 다르다.

리액티브 시스템에서는 수신자와 발신자가 각각 수신 메시지, 발신 메시지와 결합하지 않도록 이들 메시지를 비동기로 처리해야 한다.

각 컴포넌트를 완전히 고립하려면 이들이 결합하지 않도록 해야 하며 그래야만 시스템이 장애와 높은 부하에서도 반응성을 유지할 수 있다.

좀 더 정확히 말해 리액티브 아키텍처에서는 컴포넌트에서 발생한 장애를 고립시킴으로 문제가 주변 다른 컴포넌트로 전파되면서 전체 시스템 장애로 이어지는 것을 막음으로 회복성을 제공한다.

이런 맥락에서 회복성은 결함 허용 능력과 같은 의미를 지닌다.

시스템에 장애가 발생했을 때 서서히 성능이 저하되는 것이 아니라 문제를 격리함으로 장애에서 완전히 복구되어 건강한 상태로 시스템이 돌아온다.

이런 에러 전파를 방지하고 이들을 메시지로 바꾸어 다른 컴포넌트로 보내는 등 감독자 역할을 수행함으로 이루어진다.

이렇게 컴포넌트 자체로 문제가 한정되고 외부로는 안전성을 보장하는 방식으로 문제를 관리할 수 있다.

고립과 비결합이 회복성의 핵심이라면 탄력성의 핵심은 위치 투명성이다.

위치 투명성은 리액티브 시스템의 모든 컴포넌트가 수신자의 위치에 상관없이 다른 모든 서비스와 통신할 수 있음을 의미한다.

위치 투명성 덕분에 시스템을 복제할 수 있으며 현재 작업 부하에 따라 애플리케이션을 확장할 수 있다.

위치를 따지지 않는 확장성은 리액티브 애플리케이션과 리액티브 시스템의 또 다른 차이를 보여준다.

리액티브 스트림과 Flow API

리액티브 프로그래밍은 리액티브 스트림을 사용하는 프로그래밍이다.

리액티브 스트림은 잠재적으로 무한의 비동기 데이터를 순서대로 그리고 볼록하지 않은 역압력을 전재해 처리하는 표준 기술이다.

역압력발행-구독 프로토콜에서 이벤트 스트림의 구독자가 발행자 이벤트를 제공하는 속도보다 느린 속도록 이벤트를 소비하면서 문제가 발생하지 않도록 보장하는 장치다.

이런 상황이 발생했을 때 부하가 발생한 컴포넌트가 완전 불능이 되거나 예기치 않는 방식으로 이벤트를 잃어버리는 등의 문제가 발생하지 않는다.

부하가 발생한 컴포넌트는 이벤트 발생 속도를 늦추라고 알리거나, 얼마나 많은 이벤트를 수신할 수 있는지 알리거나, 다른 데이터를 받기 전에 기존의 데이터를 처리하는 데 얼마나 시간이 걸리는지를 업스트림 발행자에게 알릴 수 있어야 한다.

스트림 처리의 비동기적 특성상 역압력 기능의 내장은 필수다.

실제 비동기 작업이 실행되는 동안 시스템에는 암묵적으로 블록 API로 인해 역압력이 제공되는 것이다.

하지만 비동기 작업을 실행하는 동안에는 그 작업이 완료될 때까지 다른 유용한 작업을 실행할 수 없으므로 기다리면서 많은 자원을 낭비하게 된다.

반면 비동기 API를 이용하면 하드웨어 사용률을 극대화할 수 있지만 다른 느린 다운스트림 컴포넌트에 너무 큰 부하를 줄 가능성도 생긴다.

따라서 이런 상황을 방지할 수 있도록 역 압력이나 제어 흐름 기법이 필요하다.

이들 기법은 데이터 수신자가 스레드를 볼록하지 않고도 데이터 수신자가 처리할 수 없을 만큼 많은 데이터를 받는 일을 방지하는 프로토콜을 제공한다.

Flow 클래스

이 클래스는 정적 컴포넌트 하나를 포함하고 있으며 인스턴스화 할 수 없다.

리액티브 스트림 프로젝트의 표준에 따라 프로그래밍 발행-구독 모델을 지원할 수 있도록 Flow 클래스는 중첩된 인터페이스 네 개를 포함한다.

  • Publisher
  • Subscriber
  • Subscription
  • Processor

Publisher가 항목을 발행하면 Subscriber가 한 개씩 또는 한 번에 여러 항목을 소비하는데 Subscription이 이 과정을 관리할 수 있도록 Flow 클래스는 관련 인터페이스와 정적 메서드를 제공한다.

Publisher는 수많은 일련의 이벤트를 제공할 수 있지만 Subscriber의 요구사항에 따라 역압력 기법에 의해 이벤트 제공 속도가 제한된다.

Publisher는 자바의 함수형 인터페이스로, Subscriber는 Publisher가 발행한 이벤트의 리스너로 자신을 등록할 수 있다.

Subscription은 Publisher와 Subscriber 사이의 제어 흐름, 역압력을 관리한다.

@FunctionalInterface
public interface Publisher<T> {
  void subscribe(Subscriber<? super T> s);
}

반면 Subscriber은 Publisher가 관련 이벤트를 발행할 때 호출할 수 있도록 콜백 메서드를 정의한다.

public interface Subscriber<T> {
  void onSubscribe(Subscription s);
  void onNext(T t);
  void onError(Throwable t);
  void onComplete();
}

이들 이벤트는 다음 프로토콜에서 정의한 순서로 지정된 메서드 호출을 통해 발행되어야 한다.

onSubscribe onNext* (onError || onComplete)

Subscriber가 Publisher에 자신을 등록할 때 Publisher는 처음으로 onSubscribe 메서드를 호출해 Subscription 객체를 전달한다.

Subscription은 첫 번째 메서드(request)로 Publisher에 주어진 개수의 이벤트를 처리할 준비가 되었음을 알릴 수 있다.

두 번째 메서드(cancel)로 Subscription을 취소, 즉 Publisher에 더 이상 이벤트를 받지 않음을 통지한다.

public interface Subscription {
  void request(long n);
  void cancel();
}

자바 9 Flow 명세서에는 이들 인터페이스 구현이 어떻게 서로 협력해야 하는지 설명하는 규칙 집합을 정의한다.

  • Publisehr는 반드시 Subscription의 request 메서드에 정의된 개수 이하의 요소만 Subscriber에 전달해야 한다.
    하지만 Publisher는 지정된 개수보다 적은 수의 요소를 onNext로 전달 할 수 있으며
    동작이 성공적으로 끝났으면 onComplete를 호출하고
    문제가 발생하면 onError를 호출해 Subscription을 종료할 수 있다.
  • Subscriber는 요소를 받아 처리할 수 있음을 Publisher에 알려야 한다.
    이런 방식으로 Subscriber는 Publisher에 압력을 행사할 수 있고 Subscriber가 관리할 수 없이 너무 많은 요소를 받는 일을 피할 수 있다.
    더욱이 onCompleteonError 신호를 처리하는 상황에서 Subscriber는 Publisher나 Subscription의 어떤 메서드도 호출할 수 없으며 Subscription이 취소되었다고 가정해야 한다.
    마지막으로 Subscriber는 Subscription.request() 메서드 호출이 없이도 언제든 종료 시그널을 받을 준비가 돼야 했으며 Subscription.cancel()이 호출된 이후에라도 한 개 이상의 onNext를 받을 준비가 되어있어야 한다.
  • Publisher와 Subscriber는 정확하게 Subscription을 공유해야 하며 각각이 고유한 역할을 수행해야 한다.
    그러면 onSubscribeonNext 메서드에서 Subscriber는 request 메서드를 동기적으로 호출할 수 있어야 한다.

자바 9 플로 API/리액티브 스트림 API에서는 Subscriber 인터페이스의 모든 메서드 구현이 Publisher를 볼록하지 않도록 강제하지만, 이들 메서드가 이들 메서드가 이벤트를 동기적으로 처리해야 하는지 아니면 비동기적으로 처리해야 하는지 지정하지 않는다.

하지만 이들 인터페이스에 정의된 모든 메서드 void를 반환하므로 온전히 비동기 방식으로 이들 메서드를 구현할 수 있다.

첫 번째 리액티브 애플리케이션 만들기

Flow 클래스에 정의된 인터페이스는 대부분 직접 구현하도록 의도된 것이 아니다.

그럼에도 자바 9 라이브러리는 이들 인터페이스를 구현하는 클래스를 제공하지 않는다.

자바 9 Flow 명세는 이들 라이브러리가 준수해야 할 규칙과 다양한 리액티브 라이브러리를 이용해 개발된 리액티브 애플리케이션이 서로 협동하고 소통할 수 있는 공용어를 제시한다.

우선 예시 애플리케이션을 살펴보자.

위에서부터 Domain, Subscription, Subscriber, Publisher를 구현한 코드들이다.

@Getter
public class TempInfo {

  public static final Random random = new Random();

  private final String town;
  private final int temp;

  public TempInfo(String town, int temp) {
    this.town = town;
    this.temp = temp;
  }

  public static TempInfo fetch(String town) {
    if (random.nextInt(10) == 0) {
      throw new RuntimeException("Error!");
    }
    return new TempInfo(town, random.nextInt(100));
  }

  @Override
  public String toString() {
    return town + " : " + temp;
  }

Subscriber가 요청할 때마다 Subscription이 특정한 값을 Subscriber에게 전달.

Subscriber의 요청 : subscription.request(1)

Subscription의 전달 : subscriber.onNext(TempInfo.fetch(town))

public class TempSubscription implements Subscription {

  private static final ExecutorService executor = Executors.newSingleThreadExecutor();

  private final Subscriber<? super TempInfo> subscriber;
  private final String town;

  public TempSubscription(Subscriber<? super TempInfo> subscriber, String town) {
    this.subscriber = subscriber;
    this.town = town;
  }

  @Override
  public void request(long n) {
    executor.submit(() -> {
      for (long i = 0L; i < n; i++) {
        try {
          subscriber.onNext(TempInfo.fetch(town));
        } catch (Exception e) {
          subscriber.onError(e);
          break;
        }
      }
    });
  }

  @Override
  public void cancel() {
    subscriber.onComplete();
  }
}
public class TempSubscriber implements Subscriber<TempInfo> {

  private Subscription subscription;

  @Override
  public void onSubscribe(Subscription subscription) {
    this.subscription = subscription;
    subscription.request(1);
  }

  @Override
  public void onNext(TempInfo tempInfo) {
    System.out.println(tempInfo);
    subscription.request(1);
  }

  @Override
  public void onError(Throwable t) {
    System.err.println(t.getMessage());
  }

  @Override
  public void onComplete() {
    System.out.println("Done!");
  }

}

우선 특정한 Subscription을 가지고 있는 Publisher을 구현한다.

subscriber -> subscriber.onSubscribe(new TempSubscription(subscriber, town))

이후 Publisher에 Subscriber를 구독한다.

.subscribe(new TempSubscriber())


public class Main {

  public static void main(String[] args) {
    getTemperatures("New York").subscribe(new TempSubscriber());
  }

  private static Publisher<TempInfo> getTemperatures(String town) {
    return subscriber -> subscriber.onSubscribe(new TempSubscription(subscriber, town));
  }
}
@FunctionalInterface
public interface Publisher<T> {  
  void subscribe(Subscriber<? super T> s);
}

Subscriber를 인수로 받는 함수형 인테페이스이다.

그렇기에 위의 getTemperatures 에서 처럼 subscriber를 인자로 받는 람다로 구현하여 Publisher를 생성할 수 있는 것이다.

이렇게 생성한 Publisher는 subscribe 메서드를 통해 TempSubscription를 구독하고 있는 TempSubscriber를 구독한다.

이때 우리는 TempSubscription의 request에 의해 실행되는 결과를 받는다.

New York : 70
Error!

이때 위의 결과는 TempInfo에서 제공하는 화씨 온도 데이터인데 이를 섭씨 온도로 변화하고 싶으면 어떻게 해야할까?

이럴때 Processor을 이용하여 데이터를 변환할 수 있다.


public class TempProcessor implements Processor<TempInfo, TempInfo> {

  private Subscriber<? super TempInfo> subscriber;

  @Override
  public void subscribe(Subscriber<? super TempInfo> subscriber) {
    this.subscriber = subscriber;
  }

  @Override
  public void onNext(TempInfo temp) {
    subscriber.onNext(new TempInfo(temp.getTown(), (temp.getTemp() - 32) * 5 / 9));
  }

  @Override
  public void onSubscribe(Subscription subscription) {
    subscriber.onSubscribe(subscription);
  }

  @Override
  public void onError(Throwable throwable) {
    subscriber.onError(throwable);
  }

  @Override
  public void onComplete() {
    subscriber.onComplete();
  }
}
public class MainCelsius {

  public static void main(String[] args) {
    getCelsiusTemperatures("New York").subscribe(new TempSubscriber());
  }

  public static Publisher<TempInfo> getCelsiusTemperatures(String town) {
    return subscriber -> {
      TempProcessor processor = new TempProcessor();
      processor.subscribe(subscriber);
      processor.onSubscribe(new TempSubscription(processor, town));
    };
  }
}

바뀐 MainCelsius를 확인하면 TempProcessor가 TempSubscription를 구독하여 이를 onNext에서 원하는 값(화씨 -> 섭씨)로 변환한다.

리액티브 라이브러리 RxJava 사용하기

RxJava는 Flow, Publisher를 구현하는 두 클래스를 제공한다.

RxJava 문서를 읽다 보면 자바 9에서 리액티브 당김 기반 역압력 기능이 있는 Flow를 포함하는 Flowable 클래스를 확인할 수 있다.

역압력은 Publisher가 너무 빠른 속도록 데이터를 발행하면서 Subscriber가 이를 감당할 수 없는 상황에 이르는 것을 방지하는 기능이다.

나머지 클래스는 역압력을 지원하지 않는 기존 버전의 RxJava에서 제공하던 Observable 클래스다.

이 클래스는 단순한 프로그램, 마우스 움직임 같은 사용자 인터페이스 이벤트에 더 적합하다.

Observable 만들고 사용하기

Observable, Flowable 클래스는 다양한 종류의 리액티브 스트림을 편리하게 만들 수 있도록 여러 팩토리 메서드를 제공한다.

(Observable과 Flowable은 Publisher를 구현하므로 팩토리 메서드는 리액티브 스트림을 만든다.)

just() 팩토리 메서드는 한 개 이상의 요소를 이용해 이를 방출하는 Observable로 반환한다.

사용자와 실시간으로 상호작용하면서 지정된 속도로 이벤트를 방출하는 상황에서 유용하게 사용할 수 있는 다른 Observable 팩토리 메서드도 있다.

RxJava에서 Observable이 Flow API의 Publisher 역할을 하며 Observer는 Flow의 Subscriber 인터페이스 역할을 한다.

RxJava Observer 인터페이스는 자바 9 Subscriber와 같은 메서드를 정의하면서 onSubscribe 메서드가 Subscription 대신 Disposable 인수를 갖는다는 점만 다르다.

그리고 Observable은 역압력을 지원하지 않으므로 Subscription의 request 메서드를 포함하지 않는다.

public interface Observer<T> {
  void onSubscribe(Disposable d);
  void onNext(T t);
  void onError(Throwable t);
  void onComplete();
}

RxJava의 API는 자바 9 네이티브 Flow API 보다 유연하다.

예를 들어 다른 세 메서드는 생략하고 onNext 메서드의 시그니처에 해당하는 람다 표현식을 전달해 Observable을 구독할 수 있다.

즉 이벤트를 수신하는 onNext 메서드만 구현하고 나머지 완료, 에러 처리 메서드는 아무것도 하지 않는 기본 동작을 가진 Observer를 만들어 Observable에 가입할 수 있다.

이제 본격적으로 Observable을 만들어 보자.

아래는 1초마다 한 개의 온도를 방출하는 Observable이다.

public static Observable<TempInfo> getTemperature(String town) {
  return Observable.create(emitter- > // Observable 만들기
                          Observable.interval(1, TimeUnit.SECONDS)
                          .subscribe(i -> {
                            if(!emitter.isDisposed()) { // Observer가 아직 폐기되지 않았으면 어떤 작업을 수행
                              if(i>=5) {
                                emitter.onComplete();
                              } else {
                                try {
                                  emitter.onNext(TempInfo.fetch(town)); // Observer로 보고
                                } catch (Exception e) {
                                  emitter.onError(e); // 에러가 발생하면 Observer에 알림
                                }
                              }
                            }
                          }));
}

필요한 이벤트를 전송하는 ObservableEmitter를 소비하는 함수로 Observable을 만들어 반환했다.

RxJava의 ObservableEmitter 인터페이스는 RxJava의 기본 Emitter를 상속한다.

public interface Emitter<T> {
  void onNext(T t);
  void onError(Throwable t);
  void onComplete();
}

Emitter는 새 Disposable을 설정하는 메서드와 시퀀스가 이미 다운스트림을 폐기했는지 확인하는 메서드 등을 제공한다.

아래는 위의 Observable에 가입시킬 Observer다.

public class TempObserver implements Observer<TempInfo> {

  @Override
  public void onComplete() {
    System.out.println("Done!");
  }

  @Override
  public void onError(Throwable throwable) {
    System.out.println("Got problem: " + throwable.getMessage());
  }

  @Override
  public void onSubscribe(Disposable disposable) {}

  @Override
  public void onNext(TempInfo tempInfo) {
    System.out.println(tempInfo);
  }

}

RxJava의 Observable은 역압력을 지원하지 않으므로 전달된 요소를 처리한 다음 추가 요소를 요청하는 request() 메서드가 필요없어 더 간단하다.

이는 Main에서 아래와 같이 구독한다.

public class Main {

  public static void main(String[] args) {
    Observable<TempInfo> observable = getTemperature("New York");
    observable.subscribe(new TempObserver());

    try {
      Thread.sleep(10000L);
    }
    catch (InterruptedException e) {
      throw new RuntimeException(e);
    }
  }
}

Observable을 변환하고 합치기

RxJava나 기타 리액티브 라이브러리는 자바 9 Flow API에 비해 스트림을 합치고, 거르는 등 풍부한 도구상자를 제공하는 것이 장점이다.

한 스트림을 다른 스트림의 입력으로 사용할 수도 있다.

또 스트림에서 관심 있는 요소만 거른 다른 스트림을 만들거나 매핑 함수로 요소를 변환하거나 두 스트림을 다양한 방법으로 합치는 등의 작업을 할 수 있다.

이를 활용하면 위에서 Processor를 활용하여 변환하였던 것을 아래처럼 간단히 변환할 수 있다.

public static Observable<TempInfo> getCelsiusTemperature(String town) {
  return getTemperature(town).map(temp -> new TempInfo(temp.getTown(), (temp.getTemp() - 32) * 5 / 9));
}

0개의 댓글