[RxSwift] Observable, Observer

김개발소발·2022년 9월 23일
0

Rx

목록 보기
2/3

Observable

ObservableSequence와 같고,
Observer.subscribeSequence.makeIterator와 같다
Observable의 큰 이점은 비동기로 이벤트를 전달할 수 있다는 점이다

Observable은 크게 next, error, completed 이벤트를 구독자(Observer)에게 전달할 수 있습니다.
구독자에게 연속적인 데이터를 전달할 때는 .next(Element)를 전달하여 onNext 이벤트를 호출하여 전달합니다.
이벤트 전달이 종료되면 .completed를 전달하여 구독자(Observer)의 onCompleted메소드를 호출하여 종료합니다.

만약, 중간에 오류로 비정상적으로 종료될 경우 .error(Swift.Error)를 전달하여, 구독자(Observer) 의onError를 통해 에러를 전달하고 종료됩니다.

next(E): 시퀀스의 한 이벤트(데이터)를 방출
completed: 시퀀스의 정상적 종료를 알리는 이벤트(노티피케이션)
error(Error): 시퀀스의 비정상적 종료를 알리는 이벤트(노티피케이션)

마블 다이어그램

아래와 같은 시퀀스(Observable)이 있다고 가정하면

--1--2--3--4--5--6--| 
// next 방출 후 정상적인(completed) 종료

1,2,3,4,5,6 는 next를 통해 방출된 이벤트
마지막의 | 문자는 정상적으로 종료를 알리는 completed 이벤트입니다.

만약, 오류 발생으로 비정상적인 종료가 된다면 마지막이 X로 표시합니다.

--1--2--3--4--5--6--X 
// // next 방출 후 비정상적인(error) 종료

Observable의 구조


ObservableObservableType이라는 프로토콜을 채택하고 있고, subscribe 메소드가 정의되어 있습니다
Observer가 구독하기 위해 등록하는 Observable.subscribe 메소드는 ObservableType에 정의 되어 있습니다.

ObservableType 프로토콜은 ObservableConvirtiableType이라는 프로토콜을 상속하고 있습니다.
ObservableConvirtibleTypeasObservable 메소드를 정의하여 채택되는 클래스를 Observable로 캐스팅할 수 있도록 지원합니다.

RxCocoa 라이브러리에는 Drive 캐스팅하기 위한 메소드 asDrive도 정의되어 있습니다. 이 주제는 따로 다루겠습니다.

// RxCocoa, extension ObservableConvertibleType
public func asDriver(onErrorJustReturn: Element) -> Driver<Element>
    
public func asDriver(onErrorDriveWith: Driver<Element>) -> Driver<Element>

public func asDriver(onErrorRecover: @escaping (_ error: Swift.Error) -> Driver<Element>) -> Driver<Element>

Observer의 구조


ObserverObservalbe로 부터 이벤트를 구독하고 이벤트가 발생하면 처리하는 역할을 담당하고 있습니다.

가장 최상위에 ObserverType 프로토콜이 있으며, 구독(subscribe) 행위에 관련된 메소드를 정의하고 있습니다.

// ObserverType
func on(_ event: Event<Element>)

public func onNext(_ element: Element) {
    self.on(.next(element))
}
    
public func onCompleted() {
    self.on(.completed)
}

public func onError(_ error: Swift.Error) {
    self.on(.error(error))
}

on(Event)를 통해 .next, .error, .completed 이벤트를 전달하는 것을 확인할 수 있습니다.

ObsesrverType을 채택한 클래스 ObserverBase 클래스가 그 아래 위치하고 있습니다.
특징은 ObsesrverType.on(Event)를 구현하고 있고, Disposable을 채택하여 Disposable.dispose() 메소드를 구현하고 있습니다.
또 다른 특징은 onCore(Event)를 정의해서 서브클래스에서 구현을 강제하도록 하고 있습니다.

func onCore(_ event: Event<Element>) {
    rxAbstractMethod()
 }

onCore(Event) 메소드는 ObserverBase에서 OberverType.on(Event)를 구현한 것을 내부적으로 처리하기 위해 서브클래스에 구현을 위임한 것입니다.


private let isStopped = AtomicInt(0)

func on(_ event: Event<Element>) {
    switch event {
    case .next:
        if load(self.isStopped) == 0 {
            self.onCore(event)
        }
    case .error, .completed:
        if fetchOr(self.isStopped, 1) == 0 {
            self.onCore(event)
        }
    }
}

그리고 isStopped:AtimicInt를 통해 비동기에 순차적으로 이벤트를 처리하고 있습니다.
.error, .completed이벤트가 전달될 경우 isStopped의 값을 1로 패치하고 있습니다.
이후에 발생하는 .next 이벤트는 구독자에게 전달되지 않도록 구현되어 있습니다.

AnyObserver, AnonymousObserver

이미지에서 초록색으로 Observer의 구조에서 최상위에 있는 ObserverType을 구현한 AnyObserverObserverBase:ObserverType를 구현한 AnonymousObserver에 대해 알아보겠습니다.

AnyObserver:ObserverType

AnyObserver는 단순히 구독에 관련된 메소드(on, onNext, onError, onCompleted)를 구현하고 있습니다.
단순히 전달되는 이벤트에 대해서 처리하는 기본적인 Observer라고 보시면 간편해보입니다.

생성자 파라미터로 이벤트 핸들러 클로저를 전달받아 on(Event)메소드 내부에서 이벤트를 처리합니다.

public typealias EventHandler = (Event<Element>) -> Void

private let observer: EventHandler

public init(eventHandler: @escaping EventHandler) {
    self.observer = eventHandler
}

...

public func on(_ event: Event<Element>) {
    self.observer(event)
}

RxCommunity의 Action 라이브러리에서 내부적으로 inputsAnyObserver로 처리하여 PublishRelay로 전달하고 있습니다.

let inputsRelay = PublishRelay<Input>()
inputs = AnyObserver { event in
    guard case .next(let value) = event else { return }
    inputsRelay.accept(value)
}

그리고 ObserverType.asObserver()를 호출할 경우 AnyObserver를 생성하여 반환합니다.

// extension ObserverType
public func asObserver() -> AnyObserver<Element> {
    AnyObserver(self)
}

// AnyObserver
public init<Observer: ObserverType>(_ observer: Observer) where Observer.Element == Element {
    self.observer = observer.on
}

개인적으로는 AnyObserver를 뷰에서 발생한 이벤트를 뷰모델로 전달할 때 뷰모델의 setter로 활용합니다.
뷰에서 throttle 등 연산자를 통해 다중 발생을 방지했기 때문에 뷰모델에서 연속적으로 이벤트를 받지 않기 때문에 이렇게 사용합니다

AnonymousObserver:ObserverBase

AnonymousObserverAnyObserver와 동일하게 초기화 파라미터로 이벤트 핸들러를 전달받아 ObserverBase.onCore(Event)를 오버라이딩하여 처리하고 있습니다.

AnonymousObserver의 특징은 일반적인 Observable.subscribe하면 내부에서 AnonymousObserver가 생성됩니다.

// extension ObservableType
public func subscribe(
        onNext: ((Element) -> Void)? = nil,
        onError: ((Swift.Error) -> Void)? = nil,
        onCompleted: (() -> Void)? = nil,
        onDisposed: (() -> Void)? = nil
    ) -> Disposable {
    
    ...
    
    let observer = AnonymousObserver<Element> { event in
    ...
    }
    
	return Disposables.create(
                self.asObservable().subscribe(observer),
                disposable
            )
    
}

수정해야될 부분이 있으면 알려주시면 확인 후 수정하겠습니다.
감사합니다.

profile
사람들 속에 숨어사는 INTJ 성향을 가진 개발자

0개의 댓글