observable이 item을 방출한 이후에 구독하였더라도 모든 subscriber가 동일한 방출된 item의 sequence를 볼 수 있게 함
let hi = PublishSubject<String>()
let parrot = hi.replay(1)
parrot.connect()
hi.onNext("hi")
hi.onNext("hello")
parrot
.subscribe(onNext: {
print($0)
})
.disposed(by: disposeBag)
hi.onNext("good")
// 실행 시 다음을 출력합니다.
hello
good
만약 bufferSize가 2라면 hello전에 hi도 출력합니다.
: replay와 같지만 buffersize와 관계없이 구독 이전에 방출된 item을 모두 나타냅니다.
let a = PublishSubject<String>()
let statements = a.replayAll()
statements.connect()
a.onNext("a")
a.onNext("b")
statements
.subscribe(onNext: {
print($0)
})
.disposed(by: disposeBag)
// 실행 시 다음을 출력합니다.
a
b
: observable이 방출한 item을 주기적으로 모아 한번에 한 item씩이 아니라 bundle로 배출함
public func buffer(timeSpan: RxTimeInterval, count: Int, scheduler: SchedulerType)
-> Observable<[Element]> {
BufferTimeCount(source: self.asObservable(), timeSpan: timeSpan, count: count, scheduler: scheduler)
}
let source = PublishSubject<String>()
var count = 0
let timer = DispatchSource.makeTimerSource()
timer.schedule(deadline: .now() + 2, repeating: .seconds(1))
timer.setEventHandler {
count += 1
source.onNext("\(count)")
}
timer.resume()
source
.buffer(timeSpan: .seconds(2), count: 2, scheduler: MainScheduler.instance)
.subscribe(onNext: {
print($0)
})
.disposed(by: disposeBag)
// 실행 시 다음을 출력합니다.
["1"] // 2초 내에 1밖에 받지 못했기 때문에 1만 출력
["2", "3"] // item 수가 최대(2)에 도달했기 때문에 출력
["4", "5"]
["6", "7"]
["8", "9"]
["10", "11"]
["12"]
["13", "14"]
["15", "16"]
["17", "18"]
.
.
. // 타이머이므로 계속해서 출력됩니다.
: observable의 항목을 주기적으로 observable window로 나누고 이러한 observable window를 방출합니다.
public func window(timeSpan: RxTimeInterval, count: Int, scheduler: SchedulerType)
-> Observable<Observable<Element>> {
return WindowTimeCount(source: self.asObservable(), timeSpan: timeSpan, count: count, scheduler: scheduler)
}
observable들을 병합하여 하나의 observable sequence로 방출하는 flatMap을 사용하여 window operator로 처리된 결과를 알아보겠습니다.
let window = PublishSubject<String>()
var windowCount = 0
let windowTimeSource = DispatchSource.makeTimerSource()
windowTimeSource.schedule(deadline: .now()+2, repeating: .seconds(1))
windowTimeSource.setEventHandler {
windowCount += 1
window.onNext("\(windowCount)")
}
windowTimeSource.resume()
window
.window(timeSpan: .seconds(3), count: 3, scheduler: MainScheduler.instance)
.flatMap { windowObservable -> Observable<(index: Int, element: String)> in
return windowObservable.enumerated()
}
.subscribe(onNext: {
print("\($0.index) 번째 observable의 요소 \($0.element)")
})
.disposed(by: disposeBag)
// 실행 시 다음을 출력합니다.
0 번째 observable의 요소 1
1 번째 observable의 요소 2
0 번째 observable의 요소 3
1 번째 observable의 요소 4
2 번째 observable의 요소 5
0 번째 observable의 요소 6
1 번째 observable의 요소 7
2 번째 observable의 요소 8
0 번째 observable의 요소 9
1 번째 observable의 요소 10
2 번째 observable의 요소 11
.
.
.// 타이머기에 계속해서 진행됩니다.
: 소스 observable에 대한 구독을 지연시킬 수 있는 연산자입니다
public func delaySubscription(_ dueTime: RxTimeInterval, scheduler: SchedulerType)
-> Observable<Element> {
DelaySubscription(source: self.asObservable(), dueTime: dueTime, scheduler: scheduler)
}
let delaysource = PublishSubject<String>()
var delayCount = 0
let delayTimeSource = DispatchSource.makeTimerSource()
delayTimeSource.schedule(deadline: .now(), repeating: .seconds(1))
delayTimeSource.setEventHandler {
delayCount += 1
delaysource.onNext("\(delayCount)")
}
delayTimeSource.resume()
delaysource
.delaySubscription(.seconds(5), scheduler: MainScheduler.instance)
.subscribe(onNext: {
print($0)
})
.disposed(by: disposeBag)
// 실행시 다음을 출력합니다.
6
7
8
9
10
11
12
13
.
.
. // 타이머이므로 계속해서 출력됩니다.
: 특정 시간만큼 observable을 이동하여 방출을 늦춥니다.
public func delay(_ dueTime: RxTimeInterval, scheduler: SchedulerType)
-> Observable<Element> {
return Delay(source: self.asObservable(), dueTime: dueTime, scheduler: scheduler)
}
let delaySubject = PublishSubject<String>()
var delayCount = 0
let delayTimerSource = DispatchSource.makeTimerSource()
delayTimerSource.schedule(deadline: .now(), repeating: .seconds(1))
delayTimerSource.setEventHandler {
delayCount += 1
delaySubject.onNext("\(delayCount)")
}
delayTimerSource.resume()
delaySubject
.delay(.seconds(3), scheduler: MainScheduler.instance)
.subscribe(onNext: {
print($0)
})
.disposed(by: disposeBag)
// 실행 시 다음을 출력합니다.
2
3
4
5
6
7
8
9
10
11
12
만약 delay(.seconds(5), scheduler: MainScheduler.instance) 였더라도 출력값은 같습니다.
왜냐하면 그저 방출 시간만 미뤄진 것이기 떄문입니다.
그렇다면 그 차이를 어떻게 알 수 있을까?
다음과 같이 3초 delay 된 경우에는 timer와 횟수가 4회 차이나지만,
5초 delay 된 경우에는 2초 더 늦어졌기 때문에
timer와 횟수가 2회 더 추가된 6회 차이나는 것을 볼 수 있습니다.
: 주어진 시간 간격만큼 정수를 방출하는 Observable을 생성합니다.
Observable<Int>
.interval(.second(3), schedule: MainSchedule.instance)
.subscribe(onNext: {
print($0)
}
.dispose(by: disposeBag)
// 실행 시 다음을 출력합니다.
0
1
2
3
.
.
.// 계속 출력됩니다.
: 주어진 만큼 지연한 이후에 item을 방출하는 observable을 생성합니다.
public static func timer(_ dueTime: RxSwift.RxTimeInterval, period: RxSwift.RxTimeInterval? = nil, scheduler: RxSwift.SchedulerType) -> RxSwift.Observable<Self.Element>
Observable<Int>
.timer(.seconds(5), period: .seconds(2), scheduler: MainScheduler.instance)
.subscribe(onNext: {
print($0)
})
.disposed(by: disposeBag)
// 실행 시 다음을 출력합니다
0 // 구독 시작 후 5초 뒤에 출력됩니다.
1 // 0이 출력되고 2초 뒤에 출력됩니다.
2 // 1이 출력되고 2초 뒤에 추력됩니다.
3
4
5
.
.
.// 계속 출력됩니다.