Reactive Streams

이상민·2023년 3월 27일
0

Reactive Streams?

  • 리액티브 스트림은 자바와 다른 언어에서 사용되는 프로그래밍 패러다임으로, 데이터 스트림을 논블로킹(non-blocking) 및 비동기(asynchronous) 방식으로 처리하는 데 중점을 둔다
  • 이 접근 방식은 대량의 데이터 또는 높은 동시성 수준을 처리할 때 리소스를 보다 효율적으로 사용하고 성능을 향상시킬 수 있다
  • 리액티브 프로그래밍은 실시간 데이터(예: 실시간 피드 또는 사용자 상호 작용)를 다룰 때 특히 유용하다.
  • 전통적인 순차 또는 명령형 프로그래밍에서는 코드가 단계별로 실행되며, 이전 작업이 완료될 때까지 각 작업이 대기한다. 이는 I/O 바운드 작업이나 오래 실행되는 작업을 처리할 때 비효율적일 수 있다.
  • 반면에 논블로킹(non-blocking) 실행은 작업이 완료될 때까지 기다리는 동안 다른 작업을 계속 처리할 수 있도록 한다. 이는 콜백(callbacks), 프로미스(promises) 또는 다른 비동기 프로그래밍 구성 요소를 사용하여 이루어진다

Java Reactive Streams 구현체

  • 자바에서는 리액티브 스트림 API가 java.util.concurrent.Flow 패키지의 일부로 구현되어 있으며, 네 가지 주요 인터페이스가 포함되어 있다:

    • Publisher: 데이터의 원천으로, 구독자에게 항목을 발행
    • Subscriber: 데이터 소비자로, 발행자에게 구독하고 발행된 항목을 처리
    • Subscription: 발행자와 구독자 간의 연결을 나타내며, 구독자가 더 많은 항목을 요청하거나 구독을 취소할 수 있게 함
    • Processor: 발행자와 구독자의 기능을 모두 수행할 수 있다. 주로 첫 발행자와 최종 구독자 사이에서 중간 로직을 처리하기 위해 사용된다
  • 자바에서 리액티브 스트림을 사용하려면 Project Reactor 또는 RxJava와 같은 인기 라이브러리를 사용할 수 있으며, 이러한 라이브러리는 리액티브 데이터 스트림을 처리하는 데 사용되는 고수준 추상화 및 유틸리티 메소드를 제공한다

  • ReactorFlux를 사용한 간단한 예시:

public class ReactiveExample {
    public static void main(String[] args) {
        Flux<Integer> numbers = Flux.range(1, 10); // 1부터 10까지의 숫자 스트림 생성
        numbers.subscribe(System.out::println); // 스트림에 구독하고 각 숫자를 출력
    }
}
  • 리액티브 프로그래밍 패러다임을 받아들이고 논블로킹 코드 실행을 사용함으로써, 보다 확장성 있고 빠르며 리소스 효율적인 애플리케이션을 구축할 수 있다

내부 동작

numbers.subscribe()는 구독자 또는 콜백 집합(예: System.out::println)을 사용하여 데이터 스트림 수명 주기의 다양한 측면을 처리하는 메서드이다.

numbers.subscribe(System.out::println)를 호출하면 System.out.println에 대한 메서드 참조를 전달한다. 이는 onNext() 메서드를 System.out.println으로 설정한 구독자를 정의하는 것과 동일하다. onNext() 메서드는 발행자가 새 항목을 내보낼 때마다 호출된다.

다음은 내부에서 발생하는 동작이다:

  1. subscribe를 호출하면 제공된 콜백(이 경우 onNext를 위한 System.out::println)을 사용하여 구독자 객체가 생성된다.
  2. 구독자는 발행자(이 경우 Flux 인스턴스)에 구독한다.
  3. 발행자는 발행자와 구독자 사이의 연결을 나타내는 구독 객체를 생성한다. 이 구독 객체는 구독자의 onSubscribe 메서드에 전달된다.
  4. 구독자는 구독 객체에서 request(n)을 호출하여 발행자로부터 항목을 요청한다. 여기서 n은 처리할 준비가 된 항목 수이다. 기본적으로 ReactorFlux는 제한 없는 요청 전략을 사용하여 Long.MAX_VALUE 항목을 요청한다.
  5. 발행자는 구독자에게 항목을 내보낸다. 각 항목에 대해 구독자의 onNext 메서드(이 경우 System.out::println)를 호출한다. 이로 인해 숫자가 콘솔에 출력된다.
  6. 발행자가 모든 항목을 내보낸 후에는 구독자의 onComplete 메서드를 호출하여 스트림이 끝났음을 알린다. 처리 중 오류가 발생하면 발행자는 관련 예외와 함께 구독자의 onError 메서드를 호출한다.
  • 리액티브 프레임워크는 이 작업이 논블로킹, 비동기 방식으로 이루어지도록 보장하여 데이터 스트림 처리를 효율적으로 수행한다.

구현 예제

  • 아래와 같이 Publisher와 Subscriber의 간단한 예시 구현을 해볼 수 있다
import java.util.concurrent.Flow.*;
import java.util.stream.IntStream;

public class ReactiveExample {
    public static void main(String[] args) {
        // 발행자 생성
        CustomPublisher publisher = new CustomPublisher(IntStream.range(1, 11));

        // 구독자 생성
        CustomSubscriber subscriber = new CustomSubscriber();

        // 구독자를 발행자에 구독
        publisher.subscribe(subscriber);
    }
}

class CustomPublisher implements Publisher<Integer> {
    private final IntStream data;

    CustomPublisher(IntStream data) {
        this.data = data;
    }

    @Override
    public void subscribe(Subscriber<? super Integer> subscriber) {
        // 구독을 생성해서 구독자에 전달
        CustomSubscription subscription = new CustomSubscription(subscriber, data.iterator());
        subscriber.onSubscribe(subscription);
    }
}

class CustomSubscription implements Subscription {
    private final Subscriber<? super Integer> subscriber;
    private final IntStream.IntIterator dataIterator;

    CustomSubscription(Subscriber<? super Integer> subscriber, IntStream.IntIterator dataIterator) {
        this.subscriber = subscriber;
        this.dataIterator = dataIterator;
    }

    @Override
    public void request(long n) {
        try {
            while (n-- > 0 && dataIterator.hasNext()) {
                subscriber.onNext(dataIterator.nextInt());
            }

            if (!dataIterator.hasNext()) {
                subscriber.onComplete();
            }
        } catch (Exception e) {
            subscriber.onError(e);
        }
    }

    @Override
    public void cancel() {
        // 취소 로직 
    }
}

class CustomSubscriber implements Subscriber<Integer> {
    private Subscription subscription;

    @Override
    public void onSubscribe(Subscription subscription) {
        this.subscription = subscription;
        subscription.request(1); // 첫 아이템 요청
    }

    @Override
    public void onNext(Integer item) {
        System.out.println(item);
        subscription.request(1); // 다음 아이템 요청 
    }

    @Override
    public void onError(Throwable throwable) {
        System.err.println("Error occurred: " + throwable.getMessage());
    }

    @Override
    public void onComplete() {
        System.out.println("Stream completed");
    }
}

이 예제에서는:

  1. IntStream을 데이터 소스로 사용하는 CustomPublisher라는 사용자 지정 발행자를 생성했다.
  2. 발행자와 구독자를 연결하는 CustomSubscription이라는 사용자 지정 구독을 생성했다. 이 구독은 구독자의 요청을 처리하고 데이터 항목을 적절하게 내보낸다.
  3. 발행자에게 구독하는 CustomSubscriber라는 사용자 지정 구독자를 생성했다. onSubscribe() 메서드에서는 구독을 통해 발행자로부터 첫 번째 항목을 요청한다. onNext() 메서드에서는 각 받은 항목을 출력하고 다음 항목을 요청한다.
  4. 메인 메서드에서 1부터 10까지의 정수 범위를 가진 CustomPublisher를 인스턴스화하고, CustomSubscriber를 생성한 후 발행자에 구독시켰다.
  • 이 예제는 발행자, 구독, 구독자의 사용자 지정 구현을 사용하여 리액티브 스트림 API가 저수준에서 어떻게 작동하는지 보여준다. 실제로는 Reactor 또는 RxJava와 같은 라이브러리를 사용하여 더 높은 수준의 추상화와 유틸리티 메서드를 제공하므로, 리액티브 스트림을 사용하기가 더 쉬워진다.
profile
편하게 읽기 좋은 단위의 포스트를 추구하는 개발자입니다

0개의 댓글