[RxSwift][TIL] ReplaySubject

Uno·2022년 1월 22일
0

RxSwift

목록 보기
6/9

ReplaySubject


  • 결론

BehaviorSubject + 정해준 버퍼 크기만큼의 next이벤트를 defaultValue로 저장후 방출

https://reactivex.io/documentation/ko/subject.html 에서 제공하는 그림입니다.(마블다이어그램)

맨 위에 사각형이 subject입니다, 그리고 아래 두 개의 선이 각각의 옵저버입니다. 서브젝트를 처음 구독한 빨간색 공을 보면 그냥 일반적인 PublishSubject처럼 동작하고 있죠. 그런데 세 번째 선이 구독하는 시점에서 보면 바로 빨간공과 녹색공이 방출됩니다. 즉, ReplaySubject에서 이전의 이벤트들을 저장하고 있다가, 방출해줬다고 볼 수 있겠죠. 이름처럼 Replay를 해주는 겁니다. 이전의 이벤트를 다시 실행해줌으로써 옵저버에게 이벤트를 전달하는 것이죠. 이후에 추가된 파란공은 일반적인 subject처럼 동작합니다. 이벤트가 방출되면 옵저버에게 전달하는 방식으로요.

그러면 언제 사용하면 좋을까요? 최근검색어를 5 개 정도를 보여줘야하는 UI와 데이터를 연결해야할 때가 생각나네요. 최근검색어 이므로 최근을 순서대로 데이터를 알려줘야합니다. 그리고 특정 갯수 제한된 것만큼 보여주면 되겠죠. 이 상황을 코드로 유사하게 작성해볼게요.

(UI를 다룰 때는 다른식으로 바인딩하지만, 이 글에서는 subject의 이해가 중점이므로 해당 부분은 다른 글에서 설명하겠습니다.)

let recentSeachKeywords = ReplaySubject<String>.create(bufferSize: 5)

이전 서브젝트와 조금 다르게 create 메소드를 통해서 서브젝트를 생성합니다.

RxSwift의 소스코드에 있는 아래 코드때문에 그렇습니다.

    /// Creates new instance of `ReplaySubject` that replays at most `bufferSize` last elements of sequence.
    ///
    /// - parameter bufferSize: Maximal number of elements to replay to observer after subscription.
    /// - returns: New instance of replay subject.
    public static func create(bufferSize: Int) -> ReplaySubject<Element> {
        if bufferSize == 1 {
            return ReplayOne()
        }
        else {
            return ReplayMany(bufferSize: bufferSize)
        }
    }

이 코드를 보면, bufferSize 를 입력받고, 1이 아닌 경엔, 해당 크기만큼 버퍼를 생성하도록하는 "ReplayMany" 클래스를 호출합니다.

private final class ReplayMany<Element> : ReplayManyBase<Element> {
    private let bufferSize: Int
    
    init(bufferSize: Int) {
        self.bufferSize = bufferSize
        
        super.init(queueSize: bufferSize)
    }
    
    override func trim() {
        while self.queue.count > self.bufferSize {
            _ = self.queue.dequeue()
        }
    }
}
private class ReplayManyBase<Element>: ReplayBufferBase<Element> {
    fileprivate var queue: Queue<Element>
    
    init(queueSize: Int) {
        self.queue = Queue(capacity: queueSize + 1)
    }
    
    override func addValueToBuffer(_ value: Element) {
        self.queue.enqueue(value)
    }

    override func replayBuffer<Observer: ObserverType>(_ observer: Observer) where Observer.Element == Element {
        for item in self.queue {
            observer.on(.next(item))
        }
    }

    override func synchronized_dispose() {
        super.synchronized_dispose()
        self.queue = Queue(capacity: 0)
    }
}

위 코드에 대한 내용는 참고용으로 올려둡니다.

다시 본론으로 돌아와서

let recentSeachKeywords = ReplaySubject<String>.create(bufferSize: 5)

로 서브젝트를 구성했으면, 10 개의 이벤트를 전달해보겠습니다.

recentSeachKeywords.onNext("한라봉")
recentSeachKeywords.onNext("낑깡")
recentSeachKeywords.onNext("블랙베리")
recentSeachKeywords.onNext("아보카도")
recentSeachKeywords.onNext("표고버섯")
recentSeachKeywords.onNext("샤인머스켓")

(배열로 묶은 뒤, forEach로 구현하셔도 무방합니다.)

6개의 검색어를 이벤트로 전달했습니다. 그리고 서브젝트의 버퍼는 5로 했으니 5 개를 저장하고 있겠네요. 이제 옵저버를 추가해서 어떤결과가 이벤트로 전달되는지 확인해보겠습니다.

recentSeachKeywords
	.subscribe { print($0) }
 	.disposed(by: disposeBag)

결과)

next(낑깡)
next(블랙베리)
next(아보카도)
next(표고버섯)
next(샤인머스켓)

6개를 입력했는데 5 개만 이벤트로 전달받았습니다. 그리고 최신순으로 5 개를 저장했습니다. 최신순을 먼저 판단하고 5개를 넣은 것은 아니구요. 한라봉이 삭제되고 샤인머스켓이 추가되었다는 표현이 좀더 정확합니다.

이후에 completed 이벤트를 전달하게되면, 5개를 전달하고 completed이벤트를 전달합니다. completed 이후에 이벤트는 더이상 전달하지 않습니다.

profile
iOS & Flutter

0개의 댓글