[RxSwift] Observable 생성 / 구독의 동작 원리

Benedicto·2025년 5월 3일
0

iOS

목록 보기
29/30
let observable = Observable<Int>.create { observer in
	observer.onNext(1)
    observer.onNext(2)
    observer.onCompleted()
    
    return Disposables.create()
}
let disposable = observer.subscribe { value in
	print("value: \(value)")
} onError: { error in
	print("error: \(error)")
} onCompleted: {
	print("Completed")
} onDisposed: {
	print("Disposed")
}

제네릭 클래스 Observable<Element>에 Int 타입을 지정하여,
Int 타입의 값을 방출할 수 있는 Observable 객체를 생성하고 구독.

  • "마블 다이어그램"을 활용한 Observable의 흐름과 전환 구조
  • Observable 생성 / 구독의 클래스 계층 다이어그램

Observable의 생성 - .create()

protocol ObservableType: ObservableConvertibleType {
    func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element
}

extension ObservableType {
    static func create(_ subscribe: @escaping (AnyObserver<Element>) -> Disposable) -> Observable<Element> {
        return AnonymousObservable(subscribe)
    }
}

create(_ subscribe:)는 ObservableType 프로토콜의 타입 메서드로,
((AnyObserver<Element>) -> Disposable) 함수 타입의 escaping closure를 파라미터로 받는다.

함수 타입의 값 (= 클로저)을 받는 subscribe 파라미터는 AnonymousObservable<Element> 생성자 메서드의 파라미터로 전달된다.

final private class AnonymousObservable<Element>: Producer<Element> {
    typealias SubscribeHandler = (AnyObserver<Element>) -> Disposable
    
    let subscribeHandler: SubscribeHandler
    
    init(_ subscribeHandler: @escaping SubscribeHandler) {
        self.subscribeHandler = subscribeHandler
    }
    
    //	(후략)
}

즉, create(_ subscribe:)를 호출한다 해서 값이 바로 방출되는 것이 아니라,
AnonymousObservable<Int> 인스턴스를 생성하고 return 한다.

전달한 클로저는 AnonymousObservable의 subscribeHandler에 저장.


Observable의 구독 - .subscribe()

extension ObservableType {
    func subscribe(onNext: ((Element) -> Void)? = nil,
                   onError: ((Error) -> Void)? = nil,
                   onCompleted: (() -> Void)? = nil,
                   onDisposed: (() -> Void)? = nil) -> Disposable {
        
        let disposable: Disposable
        
        /*
        if let disposed = onDisposed {
            //  구독 취소시 동작을 지정한다면,
            //  AnonymousDisposable 인스턴스를 반환하는 create(with:)가 호출됨
            disposable = Disposables.create(with: disposed)
        } else {
            //  그렇지 않으면, NopDisposable 인스턴스를 반환
            disposable = Disposables.create()
        }
         */
        
        //	현재는 구독 취소시 수행된 비동기 작업이 없으므로, 그냥 빈 disposable 생성
        disposable = Disposables.create()
        
        let observer = AnonymousObserver<Element> { event in
            switch event {
            case .next(let value):
                onNext?(value)
            case .error(let error):
                onError?(error)
                disposable.dispose()
            case .completed:
                onCompleted?()
                disposable.dispose()
            }
        }
        
        return Disposables.create(
        	self.asObservable().subscribe(observer), 
            disposable
            )
    }
}

subscribe(onNext:onError:onCompleted:onDisposed:)는 4개의 옵셔널 함수 타입 파라미터를 가지며, 각각의 기본값은 nil로 설정된 함수.

내부에서 빈 disposable과 AnonymousObserver 인스턴스를 생성한다.

💡 subscribe(onNext:...) 자체가 구독자가 아니라 내부에서 구독자를 만들어준다.

final class AnonymousObserver<Element>: ObserverBase<Element> {
    typealias EventHandler = (Event<Element>) -> Void
    
    private let eventHandler: EventHandler
    
    init(_ eventHandler: @escaping EventHandler) {
        self.eventHandler = eventHandler
    }
    
    override func onCore(_ event: Event<Element>) {
        self.eventHandler(event)
    }
    
    deinit { }
}

AnonymousObserver 생성시, subscribe()에 전달된 클로저를 사용하여 Event의 각 case를 처리하며, error와 completed의 경우에는 dispose()까지 실행한다.

💡 error와 completed에서 dispose()를 호출함으로써, 자동으로 시퀀스가 종료된다.

마지막으로 Disposable 프로토콜 인터페이스를 return 하는데,
Disposables의 create(_ disposable1:, _ disposable2:)를 호출한다.

extension Disposables {
    static func create(_ disposable1: Disposable, _ disposable2: Disposable) -> Cancelable {
        return BinaryDisposable(disposable1, disposable2)
    }
}
private final class BinaryDisposable: DisposeBase, Cancelable {
    
    private var disposable1: Disposable?
    private var disposable2: Disposable?
    
    init(_ disposable1: Disposable, _ disposable2: Disposable) {
        self.disposable1 = disposable1
        self.disposable2 = disposable2
        super.init()
    }
    
    //	(후략)
}

create(_ disposable1:, _ disposable2:)는 Disposable 프로토콜을 채택하는 Disposables 구조체의 타입 메서드로, BinaryDisposable 인스턴스를 생성하여 return 한다.

func subscribe(onNext:onError:onCompleted:onDisposed:) -> Disposable {

	let disposable: Disposable
    disposable = Disposables.create()
    
    let observer = AnonymousObserver<Element> { ... }	//	(생략)
    
	return Disposables.create(
    	self.asObservable().subscribe(observer), 
        disposable
        )
}

disposable2에는 빈 disposable을 전달하고, disposable1self.asObservable().subscribe(observer)를 전달.

extension ObservableType {
    func asObservable() -> Observable<Element> {
        return Observable.create { observer in self.subscribe(observer) }
    }
}
protocol ObservableType: ObservableConvertibleType {
    func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable  where Observer.Element == Element
}

asObservable()은 내부에서 create(_ subscribe:)을 호출하여 return 한다.
즉, AnonymousObservable 인스턴스를 생성.

서로 다른 2개의 AnonymousObservable이 생성된다.

  • AnonymousObservable: Observable<Int>.create()를 통해 생성된 AnonymousObservable<Int>

  • AnonymousObservable: 노란색 Observable을 subscribe(onNext:...) 하면서 self.asObservable()을 통해 생성된 AnonymousObservable<Int>

이 때, AnonymousObservable의 subscribeHandler 내부에서의 self는
AnonymousObservable
.

💡 AnonymousObservable은 Observable의 서브 클래스. (= Is-a 관계)

즉, AnonymousObservable을 subscribe() 했으며, 전달된 클로저 내부에서의 self의 타입은 AnonymousObservable<Int>.

func asObservable() -> Observable<Element> {
    return Observable.create { observer in self.subscribe(observer) }
}

{ observer in self.subscribe(observer) }에서의 self는 외부의 Observable 인스턴스를 캡처.

즉, '외부의 Observable 인스턴스'는 subscribe() 안의 self인 AnonymousObservable.

self.asObservable()의 결과는 AnonymousObservable이며,
AnonymousObservable에 대한 subscribe(_ observer:)를 호출한다.

💡 subscribe(_ observer:)는 Observable과 Producer에서 구현되어있음.

상속 관계와 다형성의 특성에 의해 Producer의 override 된 subscribe()를 호출함.

Observable는 제네릭 클래스로 유사 추상 클래스의 역할

class Producer<Element>: Observable<Element> {
    override func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
        let disposer = SinkDisposer()
        let sinkAndSubsription = self.run(observer, cancel: disposer)
        
        disposer.setSinkAndSubscription(sink: sinkAndSubsription.sink, subscription: sinkAndSubsription.subscription)
        return disposer
    }
}
class SinkDisposer: Cancelable {

    private var sink: Disposable?
    private var subscription: Disposable?
    
    func setSinkAndSubscription(sink: Disposable, subscription: Disposable) {
        self.sink = sink
        self.subscription = subscription
    }
    
    //	(후략)
}

SinkDisposer 인스턴스를 생성하고 전달받은 observer와 함께 run()의 파라미터 값으로 넘겨주어 실행.
그 결과인 sinkAndSubsription을 통해 setSinkAndSubscription()을 호출하여 SinkDisposer의 프로퍼티에 저장.

class Producer<Element>: Observable<Element> {
	//	(생략)
    
	func run<Observer: ObserverType>(
    	_ observer: Observer,
    	cancel: Cancelable
    ) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
    	fatalError()
	}
}

Producer의 run(_ observer:, cancel:)은 일종의 추상 메서드 개념으로, 재정의 된 실제 구현부는 서브 클래스인 AnonymousObservable에 존재한다.

💡 이 또한 상속 관계와 다형성 때문에 가능

class AnonymousObservable<Element>: Producer<Element> {
	//	(생략)
    
    override func run<Observer: ObserverType>(
    	_ observer: Observer, 
        cancel: Cancelable
    ) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
    
    	let sink = AnonymousObservableSink(observer: observer, cancel: cancel)
        let subscription = sink.run(self)
        
        return (sink: sink, subscription: subscription)
    }
}
class AnonymousObservableSink<Observer: ObserverType>: Sink<Observer>, ObserverType {
    typealias Element = Observer.Element
    typealias Parent = AnonymousObservable<Element>
    
    override init(observer: Observer, cancel: Cancelable) {
        super.init(observer: observer, cancel: cancel)
    }
    
    //	(생략)	
    
    func run(_ parent: Parent) -> Disposable {
        return parent.subscribeHandler(AnyObserver(self))
    }
}

run(_ observer:, cancel:)은 전달받은 observer와 Producer의 override subscribe()에서 생성된 SinkDisposer 객체 disposer를 AnonymousObservableSink의 생성자 메서드의 값으로 전달하여 sink 객체를 생성하고, sink의 run(_ parent:)을 실행.

💡 run(_ observer:, cancel:)에 전달된 observer는 ObservableType의 subscribe(onNext:...) 안에서 생성된 AnonymousObserver 인스턴스 observer.

let observer = AnonymousObserver<Element> { event in ... }
class Sink<Observer: ObserverType>: Disposable {
    fileprivate let observer: Observer
    fileprivate let cancel: Cancelable
    
    init(observer: Observer, cancel: Cancelable) {
        self.observer = observer
        self.cancel = cancel
    }
    
    //	(후략)
}

AnonymousObservableSink의 생성자로 전달된 observer와 cancel은 상위 클래스인 Sink의 생성자로 전달된다.

  • AnonymousObservable.run(_ observer:, cancel:)

    override func run<Observer: ObserverType>(
        _ observer: Observer, 
        cancel: Cancelable
    ) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
    
        let sink = AnonymousObservableSink(observer: observer, cancel: cancel)
        let subscription = sink.run(self)
    
        return (sink: sink, subscription: subscription)
    }

그럼 run() 안에서 sink 객체가 생성되며, sink의 run()의 인자로 self를 전달하게 되는데,
이 self는 AnonymousObservable이다.

  • AnonymousObservableSink.run(_ parent:)
    func run(_ parent: Parent) -> Disposable {
        return parent.subscribeHandler(AnyObserver(self))
    }

run()의 parent는 typealias Parent = AnonymousObservable<Element>AnonymousObservable이 되며, subscribeHandler를 실행하여 결과를 return 한다.

  • AnonymousObservable의 subscribeHandler

subscribeHandler 클로저의 인자로 self인 AnonymousObservableSink를 AnyObserver 타입으로 변환한 인스턴스가 들어간다.

  • self는 Sink에 observer를 전달

AnonymousObservableSink인 self와 AnyObserver는 ObserverType 프로토콜을 채택함으로써, 아래의 생성자 메서드에 의해 타입 캐스팅이 가능.

struct AnyObserver<Element>: ObserverType {
	typealias EventHandler = (Event<Element>) -> Void
    private let observer: EventHandler
    
	//	(생략)
    
    init<Observer: ObserverType>(_ observer: Observer) where Observer.Element == Element {
        self.observer = observer.on
    }
}

init(_ observer:)에서는 전달받은 ObserverType 인스턴스의 on(_ event:)를 EventHandler에 저장.

class AnonymousObservableSink<Observer: ObserverType>: Sink<Observer>, ObserverType {
    
    func on(_ event: Event<Element>) {
        switch event {
        case .next: self.forwardOn(event: event)
        case .error, .completed:
            self.forwardOn(event: event)
            self.dispose()
        }
    }
}
class Sink<Observer: ObserverType>: Disposable {

	func forwardOn(event: Event<Observer.Element>) {
        self.observer.on(event)
    }
}

on()의 내부에서는 Sink의 fowardOn(event:)를 호출하고 있으며, Sink에는 AnonymousObserver 인스턴스 observer가 저장되어있다.

class AnonymousObservableSink<Observer: ObserverType>: Sink<Observer>, ObserverType {
	//	(생략)
    
	func run(_ parent: Parent) -> Disposable {
    	return parent.subscribeHandler(AnyObserver(self))
	}
}

결국 run(_ parent:)를 통해 AnonymousObservable의 subscribeHandler인 'self.subscribe(observer)' 가 실행된다.

즉, AnonymousObservable의 subscribeHandler가 실행된다는 것은,
"이제부터는 AnonymousObservable를 subscribe() 한다는 의미" + "값 방출이 발생" 이다.

💡 AnonymousObservable의 subscribeHandler에서 selfAnonymousObservable이며, subscribe()에 전달되는 observerAnonymousObserver 인스턴스 observer.

그럼 지금까지의 똑같은 과정을 다시 거치게 된다.

override func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
	
    let disposer = SinkDisposer()
    let sinkAndSubsription = self.run(observer, cancel: disposer)
    
    disposer.setSinkAndSubscription(sink: sinkAndSubsription.sink, subscription: sinkAndSubsription.subscription)
    return disposer
}

Producer의 subscribe()를 통해서 observer를 run()에 전달하고,
이때의 self는 AnonymousObservable이 된다.

그러면서 다시 아래의 AnonymousObservableSink의 run()

func run(_ parent: Parent) -> Disposable {
	return parent.subscribeHandler(AnyObserver(self))
}
AnonymousObservable이 parent로 전달되며, subscribeHandler가 실행.

parent에 대한 메모리 주소가 AnonymousObservableAnonymousObservable 순서로 출력됨.

let disposable = observer.subscribe (
	//	----- Observation Code -----
    onNext: { value in print("value: \(value)") },
    onError: { error in print("error: \(error)") },
    onCompleted: { print("Completed") },
    onDisposed: { print("Disposed") }
)

AnonymousObservable의 subscribeHandler가 실행되면서 Observation Code가 실행된다.

class AnonymousObservable<Element>: Producer<Element> {
	//	(생략)
    
    override func run<Observer: ObserverType>(
        _ observer: Observer,
        cancel: Cancelable
    ) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
        
        let sink = AnonymousObservableSink(observer: observer, cancel: cancel)
        let subscription = sink.run(self)
        
        return (sink: sink, subscription: subscription)
    }
}

subscribeHandler를 마치고 반환된 Disposable은 연속적으로 반환되어서 subscription에 저장되고

run은 sink에는 AnonymousObservableSink의 인스턴스,
subscription에는 subscribeHandler를 모두 실행하고 반환된 Disposable을 저장해 튜플로 return 한다.

class Producer<Element>: Observable<Element> {
	
    override func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
        
        let disposer = SinkDisposer()
        let sinkAndSubsription = self.run(observer, cancel: disposer)
        
        disposer.setSinkAndSubscription(sink: sinkAndSubsription.sink, subscription: sinkAndSubsription.subscription)
        return disposer
    }
}

반환된 sink와 subscription은 disposer 내부에 저장되고, disposer이 다시 return.
subscribe()의 return 타입은 Disposable이며, SinkDisposer는 Disposable을 채택하고 있음.

class SinkDisposer: Cancelable {
	
    private var sink: Disposable?
    private var subscription: Disposable?
    
    func setSinkAndSubscription(sink: Disposable, subscription: Disposable) {
        self.sink = sink
        self.subscription = subscription
    }
    
    func dispose() {
        self.sink?.dispose()
        self.subscription?.dispose()
        self.sink = nil
        self.subscription = nil
    }
}

dispose()는 sink와 subscription의 dispose()를 모두 호출하고 있다.

subscribe()는 총 2번 불렸기 때문에, 이 과정을 한번 더 거쳐서 return 하게 되면 subscribe(onNext:...)로 돌아오게 된다.

func subscribe(
	onNext: ((Element) -> Void)? = nil,
    onError: ((Swift.Error) -> Void)? = nil,
    onCompleted: (() -> Void)? = nil,
    onDisposed: (() -> Void)? = nil
) -> Disposable {
	let disposable: Disposable
    
    disposable = Disposables.create()
    
    let observer = AnonymousObserver<Element> { event in
        switch event {
        case .next(let value):
            onNext?(value)
        case .error(let error):
            onError?(error)
            disposable.dispose()
        case .completed:
            onCompleted?()
            disposable.dispose()
        }
	}
    
    return Disposables.create(
        self.asObservable().subscribe(observer),
        disposable
    )
}

마지막으로 반환되는 Disposable에는 두 종류의 disposable이 들어간다.

  • 첫번째는 자기자신에 대한 subscribe를 만들면서 옵저버를 등록했던 Disposable

  • 두번째는 subscribe안에서 만들었던 disposable

profile
 Developer

0개의 댓글