Reactive Streams #1

soon world·2021년 12월 31일
0

Reactive

목록 보기
1/3

준비물 : JAVA 9 이상 or reactive library

Publisher 와 Subscriber

Publisher 와 Subscriber 의 관계는 이미 메시지큐 미들웨어 등에서 많이 접할 수 있는 개념이긴 합니다. (저의 경우는 RabbitMQ 를 접하면서 접했었쥬..ㅎㅎ)

이를 옵저버 패턴과 비교 해 보자면,
Publisher 는 Observable 에 해당되고 Subscriber 는 Observer 에 해당 됩니다.

더 간단하게 얘기하면 Publisher 는 정보를 제공 해 주는 역할을 하며, Subscriber 는 정보를 받는 역할을 합니다. 아주 쉬운 예로 신문의 구독을 예로 들 수 있는데

신문사(Publisher) -------------------> 구독자(나) Subscriber

로 예를 들 수 있습니다.

Publisher 는 Subscriber 에게 반드시 전해줘야 하는 메서드가 있습니다.

근데 여기서 한가지 개념이 더 나오는데 그 개념이 바로 Subscription 입니다.
Subscription 이란 Publisher 와 Subscriber 간 중개 역할을 하는 인터페이스 인데,
여기서 말하는 중개 역할은 Publisher 와 Subscriber 간 데이터 처리 속도가 다를 경우 이를 맞춰주기 위한 중개를 말합니다.

예를 들어 Publisher 의 데이터 제공 속도가 건당 1초가 걸리는데 Subscriber 는 데이터를 받아 처리 하는시간이 건당 10초가 걸린다고 가정 해 봤을 때 Publisher 와 Subscriber 간 처리 속도가 달라 Subscriber 입장에서 굉장한 부담이 됩니다. 이를 맞춰주기 위해 중개 역할을 해 주는 건데 이러한 개념을 BackPressure 라고 합니다.

Subscriber 는 Subscription 에게 중개 요청을 request 라는 메서드 에서 진행 하게 됩니다.

그럼 이제 간단한 Publisher 구현을 먼저 보겠습니다.
1~5까지 들어있는 iterable 을 순회하며 제공하는 publisher 입니다.

 Iterable<Integer> itr = Arrays.asList(1,2,3,4,5);

        Publisher p = new Publisher() {
            @Override
            public void subscribe(Subscriber subscriber) {
                Iterator<Integer> it = itr.iterator();

                subscriber.onSubscribe(new Subscription() {
                 //Subscriber 가 Subscription 에 있는 request 에 데이터 전송 시 여기서 받게 됩니다. 
                    @Override
                    public void request(long n) {
                        while(n-- >0) {
                            if (it.hasNext()) {
                                subscriber.onNext(it.next());
                            } else {
                                subscriber.onComplete();
                                break;
                            }
                        }
                    }

                    @Override
                    public void cancel() {

                    }
                });
            }
        };

다음으로 Subscriber 의 형태를 보도록 하겠습니다.

Subscriber<Integer> s = new Subscriber<Integer>() {
            Subscription subscription;
            @Override
            public void onSubscribe(Subscription subscription) {
                System.out.println("onSubscribe");
                this.subscription = subscription;
                this.subscription.request(1); //1개씩 처리하겠다고  publisher 에게 요청하는 의미
// Long.MAX_VALUE 로 요청 시 한번에 Publisher 가 제공하는 모든 데이터를 받겠다는 의미                
            }

            @Override
            public void onNext(Integer item) { // 옵저버의 update 와 같은 메서드
                System.out.println("onNext " + item);
                this.subscription.request(1); // 응답 받으면 다시 publisher 에게 1개 달라고 요청 한다는 의미
            }

            @Override
            public void onError(Throwable throwable) {
                System.out.println("onError");
            }

            @Override
            public void onComplete() { // 데이터를 모두 처리 했을 경우
                System.out.println("oonComplete");
            }
        };
        
p.subscribe(s);

실행 시 onSubscribe() 메서드를 먼저 호출 한 뒤 onNext 를 호출 하고 데이터를 모두 꺼냈으면 onComplete() 메서드를 호출 하며 subscribe 가 종료 됩니다.

[실행결과]
onSubscribe
onNext 1
onNext 2
onNext 3
onNext 4
onNext 5
oonComplete

이상으로 Publisher 와 Subscriber 의 기본 형태에 대해 알아봤습니다!

참고 : https://www.youtube.com/watch?v=8fenTR3KOJo&list=PLv-xDnFD-nnmof-yoZQN8Fs2kVljIuFyC&index=10

 
profile
Hello SoonWorld

0개의 댓글