Subject

지금까지 Observable, Single, Flowable 등의 데이터 스트림에 대해 알아보았다. 이번 포스팅에서 알아볼 녀석은 Subject 라는 녀석인데, 이는 관찰 가능한 데이터 스트림과 관찰자(구독자)의 성격을 모두 갖고 있는 특이한 녀석이다. 즉 Observable 과 Observer 를 모두 구현한 추상 타입으로, 하나의 소스로부터 다중의 구독자에게 멀티 캐스팅이 가능하다는 특징을 갖고 있다.

public abstract class Subject<T> extends Observable<T> implements Observer<T> {
    public Subject() {
    }

    public abstract boolean hasObservers();

    public abstract boolean hasThrowable();

    public abstract boolean hasComplete();

    @Nullable
    public abstract Throwable getThrowable();

    @NonNull
    public final Subject<T> toSerialized() {
        return (Subject)(this instanceof SerializedSubject ? this : new SerializedSubject(this));
    }
}

Observer 를 구현한다는 특징때문에, onNext(), onError(), onComplete() 등의 이벤트를 수동으로 발생하여 구독자들에게 전달해줄 수 있다.

그럼, 다양한 Subject 의 종류 중 몇 가지에 대해 하나씩 알아보도록 하자.


PublishSubject

PublishSubject가장 단순한 Subject 구현체중 하나이다. 구독자들에게 이벤트를 널리 전달할 수 있으며, Hot Observable 특성을 갖고 있어 구독한 시점부터 발생하는 데이터를 전달한다. 따라서 데이터가 모두 발행되고 난 뒤 구독을 하면 아무 데이터도 받아볼 수 없게 된다.

아래 코드를 보면 여러 구독자를 붙여줄 수 있고, 직접 Emitter 이벤트를 발생시킬 수 있는 점을 확인해볼 수 있다.

fun main() {
    val src = PublishSubject.create<Int>()
    src.subscribe {
        println("A : $it")
    }
    src.subscribe {
        println("B : $it")
    }

    src.onNext(10)
    src.onNext(20)
    src.onNext(30)
}
A : 10
B : 10
A : 20
B : 20
A : 30
B : 30

그리고 다른 데이터 스트림의 구독자로서의 행세도 할 수 있기 때문에, 구독자로서 전달받은 데이터를 발행하는 식의 동작도 구현할 수 있다. 다른 Observable 로부터 전달받은 데이터를 자신의 구독자에게 전달해주는 것이다.

fun main() {
    val src1 = Observable.interval(1, TimeUnit.SECONDS)
    val src2 = Observable.interval(500, TimeUnit.MILLISECONDS)
    val subject = PublishSubject.create<String>()

    src1.map { "A : $it" }.subscribe(subject)
    src2.map { "B : $it" }.subscribe(subject)
    subject.subscribe(System.out::println)

    Thread.sleep(3000)
}
B : 0
B : 1
A : 0
B : 2
B : 3
A : 1
B : 4
A : 2
B : 5

Observable 로 부터 데이터를 전달받는다는 점에서 Observablemerge 연산자와 비슷한 동작을 수행하는 것을 확인해볼 수 있다.


BehaviorSubject

BehaviorSubject 는 특이한 성질을 갖고 있다. PublishSubject 와 동일하게 동작하지만, 새로운 구독자가 들어온 경우 해당 구독자에게 구독 시점에 가장 마지막 데이터를 발행한다는 점이 특징이다. 따라서 가장 최신값을 가져오는 등의 동작을 구현할 때 유용하게 사용될 수 있다.

fun main() {
    val src = BehaviorSubject.create<Int>()
    src.subscribe { println("첫번째 $it") }
    src.onNext(1)
    src.subscribe { println("****두번째 $it") }
    src.onNext(2)
    src.onNext(3)
    src.subscribe { println("********세번째 $it") }
    src.onNext(4)
    src.onComplete()
}
첫번째 1
****두번째 1
첫번째 2
****두번째 2
첫번째 3
****두번째 3
********세번째 3
첫번째 4
****두번째 4
********세번째 4

구독을 한 이후에는 PublishSubject 와 동일하게 모두 수신할 수 있다.


ReplaySubject

PublishSubjectcache 연산자를 적용한 것과 유사한 동작을 수행한다. 새로운 구독자가 생겼을 경우 이전에 발행했던 데이터들을 모두 해당 구독자에게 전달해주는 특징을 갖고 있다.

fun main() {
    val src = ReplaySubject.create<Int>()
    src.onNext(1)
    src.onNext(2)
    src.onNext(3)

    src.subscribe(System.out::println)

    src.onNext(4)
}
1
2
3
4

데이터 '1, 2, 3' 이 발행되고난 뒤 새로운 구독자가 들어왔을 때, 이미 발행했던 '1, 2, 3' 이 다시금 반복되는 것이다. 그러고나서는 PublishSubject 과 다름없이 '4'가 정상적으로 발행되는 것을 확인할 수 있다.

ReplaySubject 를 사용할때는 수많은 혹은 무한한 데이터를 발행하는 소스에 대해 적용하면 자칫 OOM (OutOfMemoryException) 이 발생할 가능성이 높다. 조심히 사용하도록 하자!


AsyncSubject

AsyncSubject 는 onComplete() 호출 직전에 발행된 아이템만 구독자들에게 전달한다. 즉, onComplete() 가 발생할 때까지는 아무 데이터도 전달하지 않다가 onComplete() 가 발생하면 가장 마지막에 발행된 데이터를 전달하는 것이다.

fun main() {
    val src = AsyncSubject.create<Int>()
    src.subscribe {
        println("A : $it")
    }
    src.onNext(10)
    src.subscribe {
        println("B : $it")
    }
    src.onNext(20)
    src.subscribe {
        println("C : $it")
    }
    src.onNext(30)
    src.onComplete()
}
A : 30
B : 30
C : 30

구독자는 연달아 3번에 거쳐 들어오게 되고, 그 사이 데이터는 3회 발행됐다. 끝으로 onComplete() 가 호출되자 3마리 구독자에게 가장 마지막으로 발행된 '30'이라는 데이터가 전달됐다.


UnicastSubject

다른 Subject 들과 비슷하지만, 핵심적인 차이점을 갖고있다. 어떤 구독자가 UnicastSubject 를 구독하기 전까지는 발행하는 데이터들을 계속 버퍼에 저장해뒀다가 구독을 시작할 때 버퍼의 데이터들을 싹 발행하고 버퍼를 깨끗이 비워낸다.

그렇다면 처음으로 들어온 구독자가 모든 데이터들을 소비할 것이고, 두 번째로 들어온 구독자들은 아무 데이터도 받아볼 수 없을 것이다. 따라서 구독자를 딱 하나만 둘 수 있고, 때문에 'Unicast' 라는 용어가 제격인 것이다. (만약 두 개 이상 구독자가들어올 시 IllegalStateException 을 발생시킨다)

fun main() {
    val src = UnicastSubject.create<Long>()
    Observable.interval(1, TimeUnit.SECONDS)
        .subscribe(src)

    Thread.sleep(3000)

    src.subscribe {
        println("A : $it")
    }
    Thread.sleep(2000)
}
A : 0
A : 1
A : 2
A : 3
A : 4

3초가 흘러갈 동안 Observable 은 '0, 1, 2' 이렇게 3개의 데이터를 발행했다. UnicastSubject 는 이들을 버퍼에 쌓아두고, 첫번째 구독자가 들어왔을 때 한 번에 발행하여 '0, 1, 2' 가 동시에 출력되는 것을 확인할 수 있다. 이후 발행되는 데이터들은 그대로 전달받게 된다.


참고자료

https://duzi077.tistory.com/178
https://chanhyeok.tistory.com/15
도서 '아키텍처를 알아야 앱 개발이 보인다' - 옥수환 저

profile
어려울수록 기본에 미치고 열광하라

4개의 댓글

comment-user-thumbnail
2021년 10월 27일

Thank you, many people might find this article very useful. At first, I thought it was written about talking to a person on the internet in general. Actually, I can give people some advice - there are tons of different dating apps, and you never know whether this person is real or fake, so you can use https://www.bizimmekan.xyz/ or https://www.esohbet.net/ to video chat with girls.

답글 달기
comment-user-thumbnail
2021년 12월 3일

bedava arkadaslik sitesi olan https://www.hayatarkadasim.net ve https://kafiyebul.com web sitemiz ile online arkadaş bulabilirsiniz. https://www.bizimmekanlar.com yeni açılan sohbet odalarımıza istediğiniz zaman katılabilirsiniz.

1개의 답글