[RxSwift] - Time Based Operators (Buffer,DelaySubscription,Replay, delay, interval, timer, window)

sun02·2022년 2월 17일
0

RxSwift

목록 보기
7/12

1. Replay

observable이 item을 방출한 이후에 구독하였더라도 모든 subscriber가 동일한 방출된 item의 sequence를 볼 수 있게 함

let hi = PublishSubject<String>()
let parrot = hi.replay(1)
parrot.connect()

hi.onNext("hi")
hi.onNext("hello")

parrot
    .subscribe(onNext: {
        print($0)
    })
    .disposed(by: disposeBag)
hi.onNext("good")
  • replay관련 연산자를 사용할 땐 꼭 connect()로 연결해주어야합니다.
  • replay(bufferSize)에 1을 작성해 주었으므로 가장 최신으로 배출된 item 1개를 받습니다.

// 실행 시 다음을 출력합니다.
hello
good

만약 bufferSize가 2라면 hello전에 hi도 출력합니다.

+ replayAll

: replay와 같지만 buffersize와 관계없이 구독 이전에 방출된 item을 모두 나타냅니다.

let a = PublishSubject<String>()
let statements = a.replayAll()
statements.connect()

a.onNext("a")
a.onNext("b")

statements
    .subscribe(onNext: {
        print($0)
    })
    .disposed(by: disposeBag)

// 실행 시 다음을 출력합니다.
a
b

2. Buffer

: observable이 방출한 item을 주기적으로 모아 한번에 한 item씩이 아니라 bundle로 배출함


 public func buffer(timeSpan: RxTimeInterval, count: Int, scheduler: SchedulerType)
        -> Observable<[Element]> {
        BufferTimeCount(source: self.asObservable(), timeSpan: timeSpan, count: count, scheduler: scheduler)
    }
  • timeSpan : buffer의 최대 시간 길이
  • count: buffer가 최대로 가지고 있을 수 있는 item의 수
  • scheduler: buffer 타이머를 실행하는 스케쥴러
let source = PublishSubject<String>()

var count = 0
let timer = DispatchSource.makeTimerSource()

timer.schedule(deadline: .now() + 2, repeating: .seconds(1))
timer.setEventHandler {
    count += 1
    source.onNext("\(count)")
}
timer.resume()

source
    .buffer(timeSpan: .seconds(2), count: 2, scheduler: MainScheduler.instance)
    .subscribe(onNext: {
        print($0)
    })
    .disposed(by: disposeBag)
  • 첫 구독, 혹은 이전 이벤트 발생 이후로 최대 2초가 지나거나
  • buffer가 item을 2개이상 가지고 있는 경우
    이벤트를 방출합니다.

// 실행 시 다음을 출력합니다.

["1"] // 2초 내에 1밖에 받지 못했기 때문에 1만 출력

["2", "3"] // item 수가 최대(2)에 도달했기 때문에 출력

["4", "5"]

["6", "7"]

["8", "9"]

["10", "11"]

["12"]

["13", "14"]

["15", "16"]

["17", "18"]
.
.
. // 타이머이므로 계속해서 출력됩니다.

3. Window

: observable의 항목을 주기적으로 observable window로 나누고 이러한 observable window를 방출합니다.

 public func window(timeSpan: RxTimeInterval, count: Int, scheduler: SchedulerType)
        -> Observable<Observable<Element>> {
            return WindowTimeCount(source: self.asObservable(), timeSpan: timeSpan, count: count, scheduler: scheduler)
    }
  • buffer와 인자는 같지만 반환값이 다릅니다.
  • buffer는 Observable<[Element]> 를 방출지만, window는 Observable<Observable<Element'>> 를 방출합니다.

observable들을 병합하여 하나의 observable sequence로 방출하는 flatMap을 사용하여 window operator로 처리된 결과를 알아보겠습니다.

let window = PublishSubject<String>()
var windowCount = 0
let windowTimeSource = DispatchSource.makeTimerSource()

windowTimeSource.schedule(deadline: .now()+2, repeating: .seconds(1))
windowTimeSource.setEventHandler {
    windowCount += 1
    window.onNext("\(windowCount)")
}
windowTimeSource.resume()

window
    .window(timeSpan: .seconds(3), count: 3, scheduler: MainScheduler.instance)
    .flatMap { windowObservable -> Observable<(index: Int, element: String)> in
        return windowObservable.enumerated()
    }
    .subscribe(onNext: {
        print("\($0.index) 번째 observable의 요소 \($0.element)")
    })
    .disposed(by: disposeBag)
   
  • 첫 구독, 혹은 이전 이벤트 발생 이후로 최대 3초가 지나거나
  • window가 item을 3개이상 가지고 있는 경우
    이벤트를 방출합니다.

// 실행 시 다음을 출력합니다.
0 번째 observable의 요소 1
1 번째 observable의 요소 2
0 번째 observable의 요소 3
1 번째 observable의 요소 4
2 번째 observable의 요소 5
0 번째 observable의 요소 6
1 번째 observable의 요소 7
2 번째 observable의 요소 8
0 번째 observable의 요소 9
1 번째 observable의 요소 10
2 번째 observable의 요소 11
.
.
.// 타이머기에 계속해서 진행됩니다.

  • timespan이 3이고, count가 3이기에 다음과 같이 출력됩니다.
  • 만약 count가 3이여도 timespan이 2라면 0, 1번째 observable의 요소만 출력됩니다.

4. DelaySubscription

: 소스 observable에 대한 구독을 지연시킬 수 있는 연산자입니다

public func delaySubscription(_ dueTime: RxTimeInterval, scheduler: SchedulerType)
        -> Observable<Element> {
        DelaySubscription(source: self.asObservable(), dueTime: dueTime, scheduler: scheduler)
    }
  • dueTime 매개변수에 지연될 시간을 작성합니다.
let delaysource = PublishSubject<String>()

var delayCount = 0
let delayTimeSource = DispatchSource.makeTimerSource()
delayTimeSource.schedule(deadline: .now(), repeating: .seconds(1))
delayTimeSource.setEventHandler {
    delayCount += 1
    delaysource.onNext("\(delayCount)")
}
delayTimeSource.resume()

delaysource
    .delaySubscription(.seconds(5), scheduler: MainScheduler.instance)
    .subscribe(onNext: {
        print($0)
    })
    .disposed(by: disposeBag)
  • 5초 뒤에 구독을 시작하도록합니다.

// 실행시 다음을 출력합니다.
6
7
8
9
10
11
12
13
.
.
. // 타이머이므로 계속해서 출력됩니다.

+ Delay

: 특정 시간만큼 observable을 이동하여 방출을 늦춥니다.

public func delay(_ dueTime: RxTimeInterval, scheduler: SchedulerType)
        -> Observable<Element> {
            return Delay(source: self.asObservable(), dueTime: dueTime, scheduler: scheduler)
    }
    
  • dueTime 매개변수에 방출을 늦출 시간을 작성합니다.
let delaySubject = PublishSubject<String>()

var delayCount = 0
let delayTimerSource = DispatchSource.makeTimerSource()
delayTimerSource.schedule(deadline: .now(), repeating: .seconds(1))
delayTimerSource.setEventHandler {
    delayCount += 1
    delaySubject.onNext("\(delayCount)")
}
delayTimerSource.resume()

delaySubject
    .delay(.seconds(3), scheduler: MainScheduler.instance)
    .subscribe(onNext: {
        print($0)
    })
    .disposed(by: disposeBag)
  • 3초 미뤄서 방출합니다.

// 실행 시 다음을 출력합니다.
2
3
4
5
6
7
8
9
10
11
12

만약 delay(.seconds(5), scheduler: MainScheduler.instance) 였더라도 출력값은 같습니다.
왜냐하면 그저 방출 시간만 미뤄진 것이기 떄문입니다.

그렇다면 그 차이를 어떻게 알 수 있을까?

다음과 같이 3초 delay 된 경우에는 timer와 횟수가 4회 차이나지만,
5초 delay 된 경우에는 2초 더 늦어졌기 때문에
timer와 횟수가 2회 더 추가된 6회 차이나는 것을 볼 수 있습니다.

5. Interval

: 주어진 시간 간격만큼 정수를 방출하는 Observable을 생성합니다.


Observable<Int>
	.interval(.second(3), schedule: MainSchedule.instance)
    .subscribe(onNext: {
    	print($0)
    }
    .dispose(by: disposeBag)
    

// 실행 시 다음을 출력합니다.
0
1
2
3
.
.
.// 계속 출력됩니다.

6. Timer

: 주어진 만큼 지연한 이후에 item을 방출하는 observable을 생성합니다.

public static func timer(_ dueTime: RxSwift.RxTimeInterval, period: RxSwift.RxTimeInterval? = nil, scheduler: RxSwift.SchedulerType) -> RxSwift.Observable<Self.Element>
  • dueTime : 제일 첫번째 값을 언제 생성할 것인지
  • period : 그 다음 값을 생성하는 시간 간격
Observable<Int>
    .timer(.seconds(5), period: .seconds(2), scheduler: MainScheduler.instance)
    .subscribe(onNext: {
        print($0)
    })
    .disposed(by: disposeBag)

// 실행 시 다음을 출력합니다
0 // 구독 시작 후 5초 뒤에 출력됩니다.
1 // 0이 출력되고 2초 뒤에 출력됩니다.
2 // 1이 출력되고 2초 뒤에 추력됩니다.
3
4
5
.
.
.// 계속 출력됩니다.

출처 - Reactive X

0개의 댓글