BehaviorSubject + 정해준 버퍼 크기만큼의 next이벤트를 defaultValue로 저장후 방출
https://reactivex.io/documentation/ko/subject.html 에서 제공하는 그림입니다.(마블다이어그램)
맨 위에 사각형이 subject입니다, 그리고 아래 두 개의 선이 각각의 옵저버입니다. 서브젝트를 처음 구독한 빨간색 공을 보면 그냥 일반적인 PublishSubject처럼 동작하고 있죠. 그런데 세 번째 선이 구독하는 시점에서 보면 바로 빨간공과 녹색공이 방출됩니다. 즉, ReplaySubject에서 이전의 이벤트들을 저장하고 있다가, 방출해줬다고 볼 수 있겠죠. 이름처럼 Replay를 해주는 겁니다. 이전의 이벤트를 다시 실행해줌으로써 옵저버에게 이벤트를 전달하는 것이죠. 이후에 추가된 파란공은 일반적인 subject처럼 동작합니다. 이벤트가 방출되면 옵저버에게 전달하는 방식으로요.
그러면 언제 사용하면 좋을까요? 최근검색어를 5 개 정도를 보여줘야하는 UI와 데이터를 연결해야할 때가 생각나네요. 최근검색어 이므로 최근을 순서대로 데이터를 알려줘야합니다. 그리고 특정 갯수 제한된 것만큼 보여주면 되겠죠. 이 상황을 코드로 유사하게 작성해볼게요.
(UI를 다룰 때는 다른식으로 바인딩하지만, 이 글에서는 subject의 이해가 중점이므로 해당 부분은 다른 글에서 설명하겠습니다.)
let recentSeachKeywords = ReplaySubject<String>.create(bufferSize: 5)
이전 서브젝트와 조금 다르게 create 메소드를 통해서 서브젝트를 생성합니다.
RxSwift의 소스코드에 있는 아래 코드때문에 그렇습니다.
/// Creates new instance of `ReplaySubject` that replays at most `bufferSize` last elements of sequence.
///
/// - parameter bufferSize: Maximal number of elements to replay to observer after subscription.
/// - returns: New instance of replay subject.
public static func create(bufferSize: Int) -> ReplaySubject<Element> {
if bufferSize == 1 {
return ReplayOne()
}
else {
return ReplayMany(bufferSize: bufferSize)
}
}
이 코드를 보면, bufferSize
를 입력받고, 1이 아닌 경엔, 해당 크기만큼 버퍼를 생성하도록하는 "ReplayMany" 클래스를 호출합니다.
private final class ReplayMany<Element> : ReplayManyBase<Element> {
private let bufferSize: Int
init(bufferSize: Int) {
self.bufferSize = bufferSize
super.init(queueSize: bufferSize)
}
override func trim() {
while self.queue.count > self.bufferSize {
_ = self.queue.dequeue()
}
}
}
private class ReplayManyBase<Element>: ReplayBufferBase<Element> {
fileprivate var queue: Queue<Element>
init(queueSize: Int) {
self.queue = Queue(capacity: queueSize + 1)
}
override func addValueToBuffer(_ value: Element) {
self.queue.enqueue(value)
}
override func replayBuffer<Observer: ObserverType>(_ observer: Observer) where Observer.Element == Element {
for item in self.queue {
observer.on(.next(item))
}
}
override func synchronized_dispose() {
super.synchronized_dispose()
self.queue = Queue(capacity: 0)
}
}
위 코드에 대한 내용는 참고용으로 올려둡니다.
다시 본론으로 돌아와서
let recentSeachKeywords = ReplaySubject<String>.create(bufferSize: 5)
로 서브젝트를 구성했으면, 10 개의 이벤트를 전달해보겠습니다.
recentSeachKeywords.onNext("한라봉")
recentSeachKeywords.onNext("낑깡")
recentSeachKeywords.onNext("블랙베리")
recentSeachKeywords.onNext("아보카도")
recentSeachKeywords.onNext("표고버섯")
recentSeachKeywords.onNext("샤인머스켓")
(배열로 묶은 뒤, forEach로 구현하셔도 무방합니다.)
6개의 검색어를 이벤트로 전달했습니다. 그리고 서브젝트의 버퍼는 5로 했으니 5 개를 저장하고 있겠네요. 이제 옵저버를 추가해서 어떤결과가 이벤트로 전달되는지 확인해보겠습니다.
recentSeachKeywords
.subscribe { print($0) }
.disposed(by: disposeBag)
결과)
next(낑깡)
next(블랙베리)
next(아보카도)
next(표고버섯)
next(샤인머스켓)
6개를 입력했는데 5 개만 이벤트로 전달받았습니다. 그리고 최신순으로 5 개를 저장했습니다. 최신순을 먼저 판단하고 5개를 넣은 것은 아니구요. 한라봉이 삭제되고 샤인머스켓이 추가되었다는 표현이 좀더 정확합니다.
이후에 completed 이벤트를 전달하게되면, 5개를 전달하고 completed이벤트를 전달합니다. completed 이후에 이벤트는 더이상 전달하지 않습니다.