비동기 작업을 처리하기 위해 GCD를 사용했다면, RxSwift에서는 Scheduler를 통해 작업을 비동기적으로 처리한다. 즉, Scheduler는 작업의 실행 위치와 순서를 제어하는 역할을 하며, GCD에 대응되는 개념이다.
Scheduler를 사용하는 방법은 주로 subscribe(on:)
과 observe(on:)
을 통해 이루어지며, Observable을 어디서 생성할지와 Observer를 어디서 관찰할지를 지정해 줄 수 있다.
Observable이 subscription 되는 흐름으로써,
Observable.create{...}
는 (Subscription Code) 로써, subscribe()로부터 호출된다.
이 함수는 subscription을 만들고 elements를 생성하는 작업을 수행한다.
subscribe()
는 (Observation Code) 로써, 방출된 값을 관찰하는 곳.
// MARK - Create.swift
extension ObservableType {
public static func create(_ subscribe: @escaping (AnyObserver<Element>) -> Disposable) -> Observable<Element> {
AnonymousObservable(subscribe)
}
}
// MARK - ObservableType+Extensions.swift
extension ObservableType {
public func subscribe(_ on: @escaping (Event<Element>) -> Void) -> Disposable {
let observer = AnonymousObserver { e in
on(e)
}
return self.asObservable().subscribe(observer)
}
}
// MARK - ObservableType.swift
public protocol ObservableType: ObservableConvertibleType {
func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element
}
extension ObservableType {
public func asObservable() -> Observable<Element> {
Observable.create { o in self.subscribe(o) }
}
}
subscribe(on:)
은 Subscription Code가 실행될 스케줄러를 바꾸는 메서드이다.
기본적으로 Observable 생성은 subscribe(on:)
을 호출한 thread에서 이루어지며, subscribe(on:)
을 통해 Observable이 생성될 thread를 바꿔준다.
override func viewDidAppear(_ animated: Bool) {
super.viewDidAppear(animated)
fetch()
.subscribe(on: ConcurrentDispatchQueueScheduler(qos: .background))
.subscribe(onNext: { person in
print(("***** [Observation Code가 동작되는 스레드]: \(Thread.current) *****"))
self.nameObservable.onNext(person)
})
.disposed(by: disposeBag)
}
func fetchData() -> Observable<Person> {
return Observable.create { observer in
print("***** [Subscription Code가 동작되는 스레드]: \(Thread.current) *****")
URLSession.shared.dataTask(with: URL(string: "<#URL#>")!) { data, response, error in
print("***** [Closure Code가 동작되는 스레드]: \(Thread.current) *****")
guard let safeData = data else { return }
do {
let decode = try JSONDecoder().decode(Person.self, from: safeData)
observer.onNext(decode)
observer.onCompleted()
} catch {
observer.onError(error)
}
}.resume()
return Disposables.create()
}
}
🧵 스레드 흐름
[Main] subscribe() -> [Background] create {...} -> [Background] onNext {...}
만약, subscribe() 내에서 UI 작업이 필요하다면 이는 Main Thread에서 실행되어야 한다.
.subscribe(onNext: { person in
print(("***** [Observation Code가 동작되는 스레드]: \(Thread.current) *****"))
self.nameObservable.onNext(person)
// UI 작업
DispatchQueue.main.async {
self.nameLabel.text = person.name
}
})
작업의 흐름을 background 스레드로 바꿔줬기에, 이를 main 스레드로 옮겨줘야 한다. 그러나 매번 DispatchQueue.main.async {...}
를 작성해주는 것은 번거로운 작업이므로, 이를 대신해서 나온 Sugar API가 observe(on:)
이다.
observe(on:)
은 Observation Code가 실행될 스케줄러를 바꾸는 메서드이다.
값 방출시, 스케줄러를 변경할 필요가 있다면 observe(on:)을 사용하여 실행 흐름을 바꿔준다.
fetch()
.subscribe(on: ConcurrentDispatchQueueScheduler(qos: .background))
.observe(on: MainScheduler.instance)
.subscribe(onNext: { person in
print(("***** [Observation Code가 동작되는 스레드]: \(Thread.current) *****"))
self.nameObservable.onNext(person)
})
.disposed(by: disposeBag)
🧵 스레드 흐름
[Main] subscribe() -> [Background] create {...} -> [Main] onNext {...}
ref.
- https://jouureee.tistory.com/169
- https://jeonyeohun.tistory.com/category/%F0%9F%8D%8E%20%EC%95%84%EC%9D%B4-%EC%98%A4-%EC%97%90%EC%8A%A4/%E2%9A%A1%EF%B8%8F%20RxSwift
- https://medium.com/swift-india/rxswift-subscribeon-vs-observeon-2b121ba95161
- https://minsone.github.io/programming/reactive-swift-observeon-subscribeon