Publisher 와 Subscriber 의 관계는 이미 메시지큐 미들웨어 등에서 많이 접할 수 있는 개념이긴 합니다. (저의 경우는 RabbitMQ 를 접하면서 접했었쥬..ㅎㅎ)
이를 옵저버 패턴과 비교 해 보자면,
Publisher 는 Observable 에 해당되고 Subscriber 는 Observer 에 해당 됩니다.
더 간단하게 얘기하면 Publisher 는 정보를 제공 해 주는 역할을 하며, Subscriber 는 정보를 받는 역할을 합니다. 아주 쉬운 예로 신문의 구독을 예로 들 수 있는데
로 예를 들 수 있습니다.
Publisher 는 Subscriber 에게 반드시 전해줘야 하는 메서드가 있습니다.
근데 여기서 한가지 개념이 더 나오는데 그 개념이 바로 Subscription 입니다.
Subscription 이란 Publisher 와 Subscriber 간 중개 역할을 하는 인터페이스 인데,
여기서 말하는 중개 역할은 Publisher 와 Subscriber 간 데이터 처리 속도가 다를 경우 이를 맞춰주기 위한 중개를 말합니다.
예를 들어 Publisher 의 데이터 제공 속도가 건당 1초가 걸리는데 Subscriber 는 데이터를 받아 처리 하는시간이 건당 10초가 걸린다고 가정 해 봤을 때 Publisher 와 Subscriber 간 처리 속도가 달라 Subscriber 입장에서 굉장한 부담이 됩니다. 이를 맞춰주기 위해 중개 역할을 해 주는 건데 이러한 개념을 BackPressure 라고 합니다.
Subscriber 는 Subscription 에게 중개 요청을 request 라는 메서드 에서 진행 하게 됩니다.
그럼 이제 간단한 Publisher 구현을 먼저 보겠습니다.
1~5까지 들어있는 iterable 을 순회하며 제공하는 publisher 입니다.
Iterable<Integer> itr = Arrays.asList(1,2,3,4,5);
Publisher p = new Publisher() {
@Override
public void subscribe(Subscriber subscriber) {
Iterator<Integer> it = itr.iterator();
subscriber.onSubscribe(new Subscription() {
//Subscriber 가 Subscription 에 있는 request 에 데이터 전송 시 여기서 받게 됩니다.
@Override
public void request(long n) {
while(n-- >0) {
if (it.hasNext()) {
subscriber.onNext(it.next());
} else {
subscriber.onComplete();
break;
}
}
}
@Override
public void cancel() {
}
});
}
};
다음으로 Subscriber 의 형태를 보도록 하겠습니다.
Subscriber<Integer> s = new Subscriber<Integer>() {
Subscription subscription;
@Override
public void onSubscribe(Subscription subscription) {
System.out.println("onSubscribe");
this.subscription = subscription;
this.subscription.request(1); //1개씩 처리하겠다고 publisher 에게 요청하는 의미
// Long.MAX_VALUE 로 요청 시 한번에 Publisher 가 제공하는 모든 데이터를 받겠다는 의미
}
@Override
public void onNext(Integer item) { // 옵저버의 update 와 같은 메서드
System.out.println("onNext " + item);
this.subscription.request(1); // 응답 받으면 다시 publisher 에게 1개 달라고 요청 한다는 의미
}
@Override
public void onError(Throwable throwable) {
System.out.println("onError");
}
@Override
public void onComplete() { // 데이터를 모두 처리 했을 경우
System.out.println("oonComplete");
}
};
p.subscribe(s);
실행 시 onSubscribe() 메서드를 먼저 호출 한 뒤 onNext 를 호출 하고 데이터를 모두 꺼냈으면 onComplete() 메서드를 호출 하며 subscribe 가 종료 됩니다.
[실행결과]
onSubscribe
onNext 1
onNext 2
onNext 3
onNext 4
onNext 5
oonComplete
이상으로 Publisher 와 Subscriber 의 기본 형태에 대해 알아봤습니다!
참고 : https://www.youtube.com/watch?v=8fenTR3KOJo&list=PLv-xDnFD-nnmof-yoZQN8Fs2kVljIuFyC&index=10