[RxSwift] subscribe(on:) vs observe(on:)

Benedicto·2025년 4월 24일
0

iOS

목록 보기
28/29

비동기 작업을 처리하기 위해 GCD를 사용했다면, RxSwift에서는 Scheduler를 통해 작업을 비동기적으로 처리한다. 즉, Scheduler는 작업의 실행 위치와 순서를 제어하는 역할을 하며, GCD에 대응되는 개념이다.

Scheduler를 사용하는 방법은 주로 subscribe(on:)observe(on:)을 통해 이루어지며, Observable을 어디서 생성할지와 Observer를 어디서 관찰할지를 지정해 줄 수 있다.


observe(on:)과 subscribe(on:)

Observable이 subscription 되는 흐름으로써,

  1. create()로 Observable을 정의 / 생성하며, 이 시점에서는 Observable이 아직 생성되지 않는다.
  2. map, filter 등 Operators를 사용하여 데이터를 변형 (조작)한다.
  3. Sugar API인 subscribe() 호출시, Observable이 생성된다.

Observable.create{...}는 (Subscription Code) 로써, subscribe()로부터 호출된다.
이 함수는 subscription을 만들고 elements를 생성하는 작업을 수행한다.

subscribe()는 (Observation Code) 로써, 방출된 값을 관찰하는 곳.

// MARK - Create.swift
extension ObservableType {
    public static func create(_ subscribe: @escaping (AnyObserver<Element>) -> Disposable) -> Observable<Element> {
        AnonymousObservable(subscribe)
    }
}

// MARK - ObservableType+Extensions.swift
extension ObservableType {
    public func subscribe(_ on: @escaping (Event<Element>) -> Void) -> Disposable {
        let observer = AnonymousObserver { e in 
            on(e)
        }
        return self.asObservable().subscribe(observer)
    }
}

// MARK - ObservableType.swift
public protocol ObservableType: ObservableConvertibleType {
    func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element
}

extension ObservableType {
    public func asObservable() -> Observable<Element> {
        Observable.create { o in self.subscribe(o) }
    }
}

subscribe(on:)

subscribe(on:)은 Subscription Code가 실행될 스케줄러를 바꾸는 메서드이다.
기본적으로 Observable 생성은 subscribe(on:)을 호출한 thread에서 이루어지며, subscribe(on:)을 통해 Observable이 생성될 thread를 바꿔준다.

override func viewDidAppear(_ animated: Bool) {
    super.viewDidAppear(animated)
    
    fetch()
        .subscribe(on: ConcurrentDispatchQueueScheduler(qos: .background))
        .subscribe(onNext: { person in
            print(("***** [Observation Code가 동작되는 스레드]: \(Thread.current) *****"))
            self.nameObservable.onNext(person)
        })
        .disposed(by: disposeBag)
}

func fetchData() -> Observable<Person> {
    return Observable.create { observer in
        print("***** [Subscription Code가 동작되는 스레드]: \(Thread.current) *****")
        
        URLSession.shared.dataTask(with: URL(string: "<#URL#>")!) { data, response, error in
            print("***** [Closure Code가 동작되는 스레드]: \(Thread.current) *****")
            guard let safeData = data else { return }
            do {
                let decode = try JSONDecoder().decode(Person.self, from: safeData)
                
                observer.onNext(decode)
                observer.onCompleted()
            } catch {
                observer.onError(error)
            }
        }.resume()
        
        return Disposables.create()
    }    
}

🧵 스레드 흐름
[Main] subscribe() -> [Background] create {...} -> [Background] onNext {...}

만약, subscribe() 내에서 UI 작업이 필요하다면 이는 Main Thread에서 실행되어야 한다.

.subscribe(onNext: { person in
    print(("***** [Observation Code가 동작되는 스레드]: \(Thread.current) *****"))
    self.nameObservable.onNext(person)
    
    //	UI 작업
    DispatchQueue.main.async {
        self.nameLabel.text = person.name
    }
})

작업의 흐름을 background 스레드로 바꿔줬기에, 이를 main 스레드로 옮겨줘야 한다. 그러나 매번 DispatchQueue.main.async {...} 를 작성해주는 것은 번거로운 작업이므로, 이를 대신해서 나온 Sugar API가 observe(on:) 이다.


observe(on:)

observe(on:)은 Observation Code가 실행될 스케줄러를 바꾸는 메서드이다.
값 방출시, 스케줄러를 변경할 필요가 있다면 observe(on:)을 사용하여 실행 흐름을 바꿔준다.

fetch()
    .subscribe(on: ConcurrentDispatchQueueScheduler(qos: .background))
    .observe(on: MainScheduler.instance)
    .subscribe(onNext: { person in
        print(("***** [Observation Code가 동작되는 스레드]: \(Thread.current) *****"))
        self.nameObservable.onNext(person)
    })
    .disposed(by: disposeBag)

🧵 스레드 흐름
[Main] subscribe() -> [Background] create {...} -> [Main] onNext {...}


ref.

profile
 Developer

0개의 댓글