let observable = Observable<Int>.create { observer in
observer.onNext(1)
observer.onNext(2)
observer.onCompleted()
return Disposables.create()
}
let disposable = observer.subscribe { value in
print("value: \(value)")
} onError: { error in
print("error: \(error)")
} onCompleted: {
print("Completed")
} onDisposed: {
print("Disposed")
}
제네릭 클래스 Observable<Element>
에 Int 타입을 지정하여,
Int 타입의 값을 방출할 수 있는 Observable 객체를 생성하고 구독.
protocol ObservableType: ObservableConvertibleType {
func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element
}
extension ObservableType {
static func create(_ subscribe: @escaping (AnyObserver<Element>) -> Disposable) -> Observable<Element> {
return AnonymousObservable(subscribe)
}
}
create(_ subscribe:
)는 ObservableType 프로토콜의 타입 메서드로,
((AnyObserver<Element>) -> Disposable)
함수 타입의 escaping closure를 파라미터로 받는다.
함수 타입의 값 (= 클로저)을 받는 subscribe 파라미터는 AnonymousObservable<Element>
생성자 메서드의 파라미터로 전달된다.
final private class AnonymousObservable<Element>: Producer<Element> {
typealias SubscribeHandler = (AnyObserver<Element>) -> Disposable
let subscribeHandler: SubscribeHandler
init(_ subscribeHandler: @escaping SubscribeHandler) {
self.subscribeHandler = subscribeHandler
}
// (후략)
}
즉, create(_ subscribe:
)를 호출한다 해서 값이 바로 방출되는 것이 아니라,
AnonymousObservable<Int>
인스턴스를 생성하고 return 한다.
전달한 클로저는 AnonymousObservable의 subscribeHandler에 저장.
extension ObservableType {
func subscribe(onNext: ((Element) -> Void)? = nil,
onError: ((Error) -> Void)? = nil,
onCompleted: (() -> Void)? = nil,
onDisposed: (() -> Void)? = nil) -> Disposable {
let disposable: Disposable
/*
if let disposed = onDisposed {
// 구독 취소시 동작을 지정한다면,
// AnonymousDisposable 인스턴스를 반환하는 create(with:)가 호출됨
disposable = Disposables.create(with: disposed)
} else {
// 그렇지 않으면, NopDisposable 인스턴스를 반환
disposable = Disposables.create()
}
*/
// 현재는 구독 취소시 수행된 비동기 작업이 없으므로, 그냥 빈 disposable 생성
disposable = Disposables.create()
let observer = AnonymousObserver<Element> { event in
switch event {
case .next(let value):
onNext?(value)
case .error(let error):
onError?(error)
disposable.dispose()
case .completed:
onCompleted?()
disposable.dispose()
}
}
return Disposables.create(
self.asObservable().subscribe(observer),
disposable
)
}
}
subscribe(onNext:onError:onCompleted:onDisposed:
)는 4개의 옵셔널 함수 타입 파라미터를 가지며, 각각의 기본값은 nil로 설정된 함수.
내부에서 빈 disposable과 AnonymousObserver 인스턴스를 생성한다.
💡 subscribe(
onNext:...
) 자체가 구독자가 아니라 내부에서 구독자를 만들어준다.
final class AnonymousObserver<Element>: ObserverBase<Element> {
typealias EventHandler = (Event<Element>) -> Void
private let eventHandler: EventHandler
init(_ eventHandler: @escaping EventHandler) {
self.eventHandler = eventHandler
}
override func onCore(_ event: Event<Element>) {
self.eventHandler(event)
}
deinit { }
}
AnonymousObserver 생성시, subscribe()에 전달된 클로저를 사용하여 Event의 각 case를 처리하며, error와 completed의 경우에는 dispose()까지 실행한다.
💡 error와 completed에서 dispose()를 호출함으로써, 자동으로 시퀀스가 종료된다.
마지막으로 Disposable 프로토콜 인터페이스를 return 하는데,
Disposables의 create(_ disposable1:, _ disposable2:
)를 호출한다.
extension Disposables {
static func create(_ disposable1: Disposable, _ disposable2: Disposable) -> Cancelable {
return BinaryDisposable(disposable1, disposable2)
}
}
private final class BinaryDisposable: DisposeBase, Cancelable {
private var disposable1: Disposable?
private var disposable2: Disposable?
init(_ disposable1: Disposable, _ disposable2: Disposable) {
self.disposable1 = disposable1
self.disposable2 = disposable2
super.init()
}
// (후략)
}
create(_ disposable1:, _ disposable2:
)는 Disposable 프로토콜을 채택하는 Disposables 구조체의 타입 메서드로, BinaryDisposable 인스턴스를 생성하여 return 한다.
func subscribe(onNext:onError:onCompleted:onDisposed:) -> Disposable {
let disposable: Disposable
disposable = Disposables.create()
let observer = AnonymousObserver<Element> { ... } // (생략)
return Disposables.create(
self.asObservable().subscribe(observer),
disposable
)
}
disposable2
에는 빈 disposable을 전달하고, disposable1
은 self.asObservable().subscribe(observer)
를 전달.
extension ObservableType {
func asObservable() -> Observable<Element> {
return Observable.create { observer in self.subscribe(observer) }
}
}
protocol ObservableType: ObservableConvertibleType {
func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element
}
asObservable()은 내부에서 create(_ subscribe:
)을 호출하여 return 한다.
즉, AnonymousObservable 인스턴스를 생성.
![]()
서로 다른 2개의 AnonymousObservable이 생성된다.
AnonymousObservable: Observable<Int>.create()
를 통해 생성된 AnonymousObservable<Int>
AnonymousObservable: 노란색 Observable을 subscribe(onNext:...
) 하면서 self.asObservable()
을 통해 생성된 AnonymousObservable<Int>
이 때, AnonymousObservable의 subscribeHandler 내부에서의 self는
AnonymousObservable.
💡 AnonymousObservable은 Observable의 서브 클래스. (=
Is-a 관계
)즉, AnonymousObservable을 subscribe() 했으며, 전달된 클로저 내부에서의 self의 타입은
AnonymousObservable<Int>
.func asObservable() -> Observable<Element> { return Observable.create { observer in self.subscribe(observer) } }
{ observer in self.subscribe(observer) }
에서의 self는 외부의 Observable 인스턴스를 캡처.즉, '외부의 Observable 인스턴스'는 subscribe() 안의 self인 AnonymousObservable.
self.asObservable()
의 결과는 AnonymousObservable이며,
AnonymousObservable에 대한 subscribe(_ observer:
)를 호출한다.
💡 subscribe(
_ observer:
)는 Observable과 Producer에서 구현되어있음.![]()
상속 관계와 다형성의 특성에 의해 Producer의 override 된 subscribe()를 호출함.
Observable는 제네릭 클래스로 유사 추상 클래스의 역할
class Producer<Element>: Observable<Element> {
override func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
let disposer = SinkDisposer()
let sinkAndSubsription = self.run(observer, cancel: disposer)
disposer.setSinkAndSubscription(sink: sinkAndSubsription.sink, subscription: sinkAndSubsription.subscription)
return disposer
}
}
class SinkDisposer: Cancelable {
private var sink: Disposable?
private var subscription: Disposable?
func setSinkAndSubscription(sink: Disposable, subscription: Disposable) {
self.sink = sink
self.subscription = subscription
}
// (후략)
}
SinkDisposer 인스턴스를 생성하고 전달받은 observer와 함께 run()의 파라미터 값으로 넘겨주어 실행.
그 결과인 sinkAndSubsription을 통해 setSinkAndSubscription()을 호출하여 SinkDisposer의 프로퍼티에 저장.
class Producer<Element>: Observable<Element> {
// (생략)
func run<Observer: ObserverType>(
_ observer: Observer,
cancel: Cancelable
) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
fatalError()
}
}
Producer의 run(_ observer:, cancel:
)은 일종의 추상 메서드 개념으로, 재정의 된 실제 구현부는 서브 클래스인 AnonymousObservable에 존재한다.
💡 이 또한 상속 관계와 다형성 때문에 가능
class AnonymousObservable<Element>: Producer<Element> {
// (생략)
override func run<Observer: ObserverType>(
_ observer: Observer,
cancel: Cancelable
) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
let sink = AnonymousObservableSink(observer: observer, cancel: cancel)
let subscription = sink.run(self)
return (sink: sink, subscription: subscription)
}
}
class AnonymousObservableSink<Observer: ObserverType>: Sink<Observer>, ObserverType {
typealias Element = Observer.Element
typealias Parent = AnonymousObservable<Element>
override init(observer: Observer, cancel: Cancelable) {
super.init(observer: observer, cancel: cancel)
}
// (생략)
func run(_ parent: Parent) -> Disposable {
return parent.subscribeHandler(AnyObserver(self))
}
}
run(_ observer:, cancel:
)은 전달받은 observer와 Producer의 override subscribe()에서 생성된 SinkDisposer 객체 disposer
를 AnonymousObservableSink의 생성자 메서드의 값으로 전달하여 sink 객체를 생성하고, sink의 run(_ parent:
)을 실행.
💡 run(
_ observer:, cancel:
)에 전달된 observer는 ObservableType의 subscribe(onNext:...
) 안에서 생성된 AnonymousObserver 인스턴스observer
.let observer = AnonymousObserver<Element> { event in ... }
![]()
class Sink<Observer: ObserverType>: Disposable {
fileprivate let observer: Observer
fileprivate let cancel: Cancelable
init(observer: Observer, cancel: Cancelable) {
self.observer = observer
self.cancel = cancel
}
// (후략)
}
AnonymousObservableSink의 생성자로 전달된 observer와 cancel은 상위 클래스인 Sink의 생성자로 전달된다.
AnonymousObservable.run(_ observer:, cancel:)
override func run<Observer: ObserverType>(
_ observer: Observer,
cancel: Cancelable
) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
let sink = AnonymousObservableSink(observer: observer, cancel: cancel)
let subscription = sink.run(self)
return (sink: sink, subscription: subscription)
}
그럼 run() 안에서 sink 객체가 생성되며, sink의 run()의 인자로 self를 전달하게 되는데,
이 self는 AnonymousObservable이다.
func run(_ parent: Parent) -> Disposable {
return parent.subscribeHandler(AnyObserver(self))
}
run()의 parent는 typealias Parent = AnonymousObservable<Element>
로 AnonymousObservable이 되며, subscribeHandler를 실행하여 결과를 return 한다.
subscribeHandler 클로저의 인자로 self인 AnonymousObservableSink를 AnyObserver 타입으로 변환한 인스턴스가 들어간다.
AnonymousObservableSink인 self와 AnyObserver는 ObserverType 프로토콜을 채택함으로써, 아래의 생성자 메서드에 의해 타입 캐스팅이 가능.
struct AnyObserver<Element>: ObserverType {
typealias EventHandler = (Event<Element>) -> Void
private let observer: EventHandler
// (생략)
init<Observer: ObserverType>(_ observer: Observer) where Observer.Element == Element {
self.observer = observer.on
}
}
init(_ observer:
)에서는 전달받은 ObserverType 인스턴스의 on(_ event:
)를 EventHandler에 저장.
class AnonymousObservableSink<Observer: ObserverType>: Sink<Observer>, ObserverType {
func on(_ event: Event<Element>) {
switch event {
case .next: self.forwardOn(event: event)
case .error, .completed:
self.forwardOn(event: event)
self.dispose()
}
}
}
class Sink<Observer: ObserverType>: Disposable {
func forwardOn(event: Event<Observer.Element>) {
self.observer.on(event)
}
}
on()의 내부에서는 Sink의 fowardOn(event:
)를 호출하고 있으며, Sink에는 AnonymousObserver 인스턴스 observer
가 저장되어있다.
class AnonymousObservableSink<Observer: ObserverType>: Sink<Observer>, ObserverType {
// (생략)
func run(_ parent: Parent) -> Disposable {
return parent.subscribeHandler(AnyObserver(self))
}
}
결국 run(_ parent:
)를 통해 AnonymousObservable의 subscribeHandler인 'self.subscribe(observer)' 가 실행된다.
즉, AnonymousObservable의 subscribeHandler가 실행된다는 것은,
"이제부터는 AnonymousObservable를 subscribe() 한다는 의미" + "값 방출이 발생" 이다.
💡 AnonymousObservable의 subscribeHandler에서
self
는 AnonymousObservable이며, subscribe()에 전달되는observer
는 AnonymousObserver 인스턴스observer
.
그럼 지금까지의 똑같은 과정을 다시 거치게 된다.
override func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
let disposer = SinkDisposer()
let sinkAndSubsription = self.run(observer, cancel: disposer)
disposer.setSinkAndSubscription(sink: sinkAndSubsription.sink, subscription: sinkAndSubsription.subscription)
return disposer
}
Producer의 subscribe()를 통해서 observer
를 run()에 전달하고,
이때의 self는 AnonymousObservable이 된다.
그러면서 다시 아래의 AnonymousObservableSink의 run()
func run(_ parent: Parent) -> Disposable {
return parent.subscribeHandler(AnyObserver(self))
}
parent에 대한 메모리 주소가 AnonymousObservable → AnonymousObservable 순서로 출력됨.
let disposable = observer.subscribe (
// ----- Observation Code -----
onNext: { value in print("value: \(value)") },
onError: { error in print("error: \(error)") },
onCompleted: { print("Completed") },
onDisposed: { print("Disposed") }
)
AnonymousObservable의 subscribeHandler가 실행되면서 Observation Code가 실행된다.
class AnonymousObservable<Element>: Producer<Element> {
// (생략)
override func run<Observer: ObserverType>(
_ observer: Observer,
cancel: Cancelable
) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
let sink = AnonymousObservableSink(observer: observer, cancel: cancel)
let subscription = sink.run(self)
return (sink: sink, subscription: subscription)
}
}
subscribeHandler를 마치고 반환된 Disposable은 연속적으로 반환되어서 subscription에 저장되고
run은 sink에는 AnonymousObservableSink의 인스턴스,
subscription에는 subscribeHandler를 모두 실행하고 반환된 Disposable을 저장해 튜플로 return 한다.
class Producer<Element>: Observable<Element> {
override func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
let disposer = SinkDisposer()
let sinkAndSubsription = self.run(observer, cancel: disposer)
disposer.setSinkAndSubscription(sink: sinkAndSubsription.sink, subscription: sinkAndSubsription.subscription)
return disposer
}
}
반환된 sink와 subscription은 disposer 내부에 저장되고, disposer이 다시 return.
subscribe()의 return 타입은 Disposable이며, SinkDisposer는 Disposable을 채택하고 있음.
class SinkDisposer: Cancelable {
private var sink: Disposable?
private var subscription: Disposable?
func setSinkAndSubscription(sink: Disposable, subscription: Disposable) {
self.sink = sink
self.subscription = subscription
}
func dispose() {
self.sink?.dispose()
self.subscription?.dispose()
self.sink = nil
self.subscription = nil
}
}
dispose()는 sink와 subscription의 dispose()를 모두 호출하고 있다.
subscribe()는 총 2번 불렸기 때문에, 이 과정을 한번 더 거쳐서 return 하게 되면 subscribe(onNext:...
)로 돌아오게 된다.
func subscribe(
onNext: ((Element) -> Void)? = nil,
onError: ((Swift.Error) -> Void)? = nil,
onCompleted: (() -> Void)? = nil,
onDisposed: (() -> Void)? = nil
) -> Disposable {
let disposable: Disposable
disposable = Disposables.create()
let observer = AnonymousObserver<Element> { event in
switch event {
case .next(let value):
onNext?(value)
case .error(let error):
onError?(error)
disposable.dispose()
case .completed:
onCompleted?()
disposable.dispose()
}
}
return Disposables.create(
self.asObservable().subscribe(observer),
disposable
)
}
마지막으로 반환되는 Disposable에는 두 종류의 disposable이 들어간다.
첫번째는 자기자신에 대한 subscribe를 만들면서 옵저버를 등록했던 Disposable
두번째는 subscribe안에서 만들었던 disposable