[Combine] Subscribers

rbw·2023년 11월 26일
0

Combine

목록 보기
2/11

https://www.apeth.com/UnderstandingCombine/subscribers/subscribers.html
이번엔 구독자에 대해서 알아보겠슴니다

Sink

이 친구는 일종의 다목적 구독자입니다. 가장 완전한 형태는 다음과 같이 생성됩니다

func sink(receiveCompletion: f1, receiveValue: f2) -> AnyCancellable

위 표기법에서 f1, f2는 함수입니다. 일반적으로 익명 함수이며 두 번째 함수는 일반적으로 후행 클로저 구문을 사용하므로 다음과 같이 보일것입니다.

.sink(receiveCompletion: {
    comp in // ...
}) {
    val in // ...
}

첫 번째 함수에서 들어오는 매개변수는 완료입니다. 앞서 설명했듯이 이것은 실제로 완료 메시지를 저장하는 Subscribers.Completion 열거형입니다. 케이스는 두 가지가 있습니다.

  • .finished: 게시자가 순조롭게 값 생성을 마쳤으므로 더 이상 값을 생성할 수 없음을 알려줍니다. 예를 들어, dataTaskPublisher 는 요청된 모든 네트워킹을 완료 후, .finished를 보냅니다
  • .failure: 게시자가 복구할 수 없는 오류가 발생하여 더 이상 값을 생성할 수 없음을 아려줍니다. 해당 실패의 특성은 이 열거형 케이스의 연관 값인 Error 객체로 표현됩니다

두 번째 함수에서 들어오는 매개변수는 파이프라인을 따라 내려온 실제 값입니다. 아래 예시코드가 간단한 사용예시 입니다

.sink(receiveCompletion: {
    switch $0 {
        case .finished: print("finished in good order")
        case let .failure(err): print("failure!", err)
    }
}) { print($0) }

만약, 싱크 바로 위의 업스트림에 있는 퍼블리셔의 실패 유형이 Never인 경우, 즉 실패가 발생하지 않는 경우에 receiveCompletion 매개변수를 완전히 생략해도 됩니다. 이 경우 .sink 구현을 더 간단하게 다시 작성할 수 있습니다.

sink의 가장 두드러진 특징은 receiveValue 함수에서 원하는 모든 것을 할 수 있다는 것입니다. '파이프라인 끝에 값이 도착했는데 무엇을 하고 싶으신가요?' 라는 질문에 가장 유연한 답변이 될 수 있슴니다

Assign

이 친구는 오류유형이 Never인 게시자에게만 사용 가능합니다

func assign(to:on:) -> AnyCancellable

첫 번째 매개변수는 Swift 키패스로 지정된 프로퍼티이고, 두 번째 매개변수는 객체입니다. 그 결과 들어오는 값이 해당 객체의 해당 프로퍼티에 할당됩니다

앞에서 살펴본 코드는 파이프라인을 따라 내려오는 UIImage가 이미지 뷰의 이미지 프로퍼티에 할당되는 예제였습니다

.assign(to: \.image, on: self.imageView)

sink 구독자가 할 수 없는 작업을 assign 구독자가 수행한다는 것은 명확하지 않습니다. sink 함수 내부에서도 충분히 가능한 경우가 많기 때문임니다

그러나 assign은 파이프라인의 목적이 값을 사용하여 일부 개체의 속성을 설정하는 일반적인 경우를 표현하는 좋은 방법임니다. 반면에 참조 사이클이 발생할 수 있다는 점은 조심해야합니다. self에 할당하는 경우에 이 점을 조심해야 함니다

위에 살펴본 경우의 self.iv는 self가 아니기 때문에 사이클이 발생하지는 않슴니다. 파이프라인의 작업이 실패를 보내거나 하나의 값을 전달한 후 완료를 전달하므로 어느 쪽이든 전체 파이프라인은 거의 즉시 정상적으로 릴리스됩니다. 프로퍼티가 self의 프로퍼티이고, 파이프라인이 업스트림에서 취소되지 않을 때 문제가 보통 발생합니다.

해결 방법은 캡처 리스트를 사용하여 사이클을 끊을 수 있으므로 sink를 사용하는 것입니다. 만약 위의 예시의 image가 self의 프로퍼티라고 가정하면 assign으로는 사이클이 발생할 수 있습니다.

AnyCancellable

위에 살펴본 sink, assignSubscribers.Sink, Subscribers.Assign 객체를 만듭니다. 하지만 모두 AnyCancellable 인스턴스로 래핑하여 얘를 반환함니다. 이에 대해서도 살펴보겠슴니다

먼저 이 클래스는 type-eraser 클래스로, Sink 객체나 Assign 객체가 실제로 무엇인지에 대한 핵심을 다루지 않아도 되므로 실제로는 Sink, Assign이 될 수 있는 공통 유형을 제공함니다. 두 유형 모두 취소 가능한 프로토콜을 준수하므로 취소 메서드가 존재함니다.

그리고 AnyCancellable은 래핑된 구독자 자체가 사라질 때 자동으로 취소를 호출하는 속성을 가지고 있슴니다 이 친구는 메모리 관리를 제공한다고 볼 수 있겠슴니다

반대로 파이프라인이 조기에 취소되는 위험을 감수하고 싶지 않다면, sink, assign에서 생성된 AnyCancellable 객체를 유지해야 합니다. 조기취소는 아래와 같은 경우에 발생할 수 있는 현상입니다

let url = URL(string:"https://www.apeth.com/pep/manny.jpg")!
URLSession.shared.dataTaskPublisher(for: url)
    .map {$0.data}
    .replaceError(with: Data())
    .compactMap { UIImage(data:$0) }
    .receive(on: DispatchQueue.main)
    .assign(to: \.image, on: self.iv)

앱이 실행되면 이미지 뷰에 이미지가 할당되지 않고 콘솔에 오류 메시지가 표시됩니다.

이는 AnyCancellable이 네트워킹을 수행하기도 전에 dataTaskPublisher에게 취소 호출을 보냈기 때문임니다. 따라서 항상 .store(in:)의 호출이 뒤따라야 AnyCancellable을 유지할 수 있슴니다.

연습삼아 직접 래핑을 수행하여 assign 연산자가 Assign 객체를 AnyCancellable에서 래핑하는 방법을 직접 시연하겠슴니다

let url = URL(string:"https://www.apeth.com/pep/manny.jpg")!
let pub : AnyPublisher<UIImage?,Never> =
    URLSession.shared.dataTaskPublisher(for: url)
        .map {$0.data}
        .replaceError(with: Data())
        .compactMap { UIImage(data:$0) }
        .receive(on: DispatchQueue.main)
        .eraseToAnyPublisher()

let assign = Subscribers.Assign(object: self.iv, keyPath: \UIImageView.image)
pub.subscribe(assign) // assign 으로 pub을 구독하겠다는 의미
let any = AnyCancellable(assign)
any.store(in:&self.storage) 

뒤에서 이런식으로 래핑하고 있음을 상상할 수 있슴니다.

하지만 이 전체 구조가 너무 무거워 보일 수 있는 상황도 있슴니다. 실제로 위의 예시에서는 한 번만 게시하고 중단할 것이라는 사실을 잘 알고 있슴니다. 영구적으로, 심지어 주변 객체만큼 오래 유지될 필요는 없슴니다.

한 번만 게시할 수 있을 만큼만 유지하면 되겠죠. 다음 장에서 이런 구독자를 살펴보겠슴니다.

One-Shot Subscribers

바로 위에서도 설명했듯이 구독자를 영구 저장소에 저장해야 퍼블리셔가 향후 언제든 게시할 기회를 가질 수 있도록 파이프라인이 충분히 오래 유지됩니다. 그러나 그 이후에는 한 번만 게시하는 퍼블리셔(one-shot publisher)는 작업을 완료한 것으로 간주되며 존재하지 않도록 허용될 수 있습니다.

이런 상황에서 구독자가 value, completion를 받을 수 있을 만큼만 오래 지속하면 됩니다. 방법은 아래와 같습니다.

var cancellable: AnyCancellable? // 1
cancellable = pub.sink(receiveCompletion: {_ in // 2
    cancellable?.cancel() // 3
}) { image in
    self.imageView.image = image
}

단계를 살펴보겠습니다

  1. 먼저 AnyCancellable 변수를 선언합니다
  2. 구독자를 생성하고 해당 구독자를 위 변수에 할당합니다.
  3. 마지막으로 완료를 받는다면 취소를 합니다. cancel()

세 번째 단계의 취소는 실제로 취소를 호출하는 것과 별개로 메모리 관리와 관련된 두 가지 작업을 수행함니다.

  1. Sink의 비동기 완료 함수 내에서 cancellable을 참조함으로써 구독자로부터 값이 도착할 때까지 전체 파이프라인을 오래 살아있게 유지합니다.
  2. cancellable을 취소함으로써 파이프라인이 사라지도록 허용하고 주변 VC에 누수를 유발할 수 있는 참조 사이클을 방지합니다.

Custom Subscribers

이제 구독자를 만든다고 가정할 때 어떻게 만들어야할 지 살펴보겠슴니다.

아주 간단한 구독자를 작성해보겠슴니다. 업스트림 퍼블리셔로부터 받은 값이나 실패 메시지를 자동으로 인쇄할 뿐인 구독자입니다.

구독자가 처리해야 할 가장 중요한 사항은 구독자와의 통신이 전적으로 구독 객체를 통해 이루어지는 사실임니다 구독자는 해당 구독 객체를 참조하는 인스턴스 프로퍼티가 있어야 퍼블리셔와 대화할 수 있슴니다. 따라서 구독자에게 해당 프로퍼티를 제공하는 것으로 시작하여 완료 or 취소로 인해 종료되었는지 여부를 나타내는 불리언 프로퍼티도 제공하겠슴니다.

class Printer<Input, Failure: Error> {
    var subscription: Subscription?
    var completd = false
}

앞서 설명했듯이 구독자는 입력, 실패 유형에 대해 제네릭해야 합니다.

이제 Subscriber는프로토콜임니다. 따라서 얘를 채택하는 확장을 추가하겠슴니다.

extension Printer: Subscriber {

}

먼저 추가해야할 메소드는 receive(subscription:) 임닌다.

func receive(subscription: Subscription) { }

이 친구는 퍼블리셔를 구독했음을 알려줍니다. 퍼블리셔가 구독 객체를 생성해서 우리에게 전달했습니다. 나중에 퍼블리셔와 소통할 경우를 대비하여 해당 구독을 보관하는 것이 매우 중요한 일임니다.

대부분의 수신(구독) 구현은 두 번째 작업도 수행하는데, 이는 바로 구독을 통해 퍼블리셔에 연락하여 값 전송을 시작하도록 요청하는 것입니다. 이는 필수 기능은 아니므로 퍼블리셔가 즉시 퍼블리싱을 시작하지 않으려는 경우 퍼블리셔에게 값 전송을 시작하도록 요청할 필요는 없습니다.

하지만 일반적으로 퍼블리셔가 즉시 게시를 시작하기를 원합니다. 또 흥미롭게도 퍼블리셔에게 값을 요청하지 않으면 퍼블리셔는 값을 보내지 않습니다

Combine에서는 퍼블리셔로부터 값을 가져오는 작업을 구독자에게 맡깁니다. 다라서 구독자는 값을 가져오지 않음으로써 퍼블리셔가 값을 보내지 못하게 할 수 있슴니다. 퍼블리셔의 값 생성 속도를 조절할 수 있는 구독자의 이러한 기능을 역압력(backpressure)라고 함니다.

퍼블리셔에게 값을 받고 싶다고 알리는 방법은 구독의 요청(request) 메서드를 호출하는 것입니다. 매개변수로 Subscribers.Demand 구조체의 정적 멤버들인 .unliited, .none, .max()가 있슴니다. 아래는 무제한으로 받겠다는 예시

func receive(subscription: Subscription) {
    if self.subscription == nil && !self.completed {
        self.subscription = subscription
        subscription.request(.unlimited)
    }
}

두 번째로 구현해야할 메소드는 func receive(_ input: Input) -> Subscribers.Demand 임니다.

얘는 퍼블리셔로부터 값을 가져왔음을 알려줌니다. 이 값으로 많은 동작을 할 수 있지만 이번 예시에서는 출력만 하겠슴니다. 또 얼마나 많은 값을 더 받고 싶은지를 나타내는 다른 Demand 타입을 반환해야합니다.

func receive(_ input: Input) -> Subscribers.Demand {
    print(input)
    return .unlimited
}

마지막으로 구현해야할 메소드는 func receive(completion: Subscribers.Completion<Failure>)임니다.

얘는 퍼블리셔로부터 완료 메시지를 받았음을 알려줌니다. 완료 메시지는 .finished일 수도 있고, .failure일 수도 있습니다. 후자의 경우 연관 값도 같이 전달받슴니다.

func receive(completion: Subscribers.Completion<Failure>) {
    if case let .failure(err) = completion {
        print(err)
    }
    self.subscription = nil
    self.completed = true
}

이것으로 저희가 만든 클래스가 구독자 프로토콜 채택 조건을 달성했슴니다. 이제 Cancellable도 만족해야겠죠 여기서 요구하는 사항은 cancel 메서드 하나 임니다. 게시자에게 취소 메시지를 보내도록 지시할 수 있ㅅ브니다.

extension Printer : Cancellable {
    func cancel() {
        self.subscription?.cancel()
        self.subscription = nil
        self.completed = true
    }
}

이제 위에 저희가 만든 구독자 Printer가 완성되었씀니다. 사용 예시는 다음과 같슴니다.

let pub = [1, 2, 3].publisher
let sub = Printer<Int, Never>()
pub.subscribe(sub)
AnyCancellable(sub)
    .store(in:&self.storage)
// prints 1, then 2, then 3

퍼블리셔와, Printer의 적절한 유형을 주어 초기화시키고, 위에서 만든 퍼블리셔를 구독함니다. 그리고 AnyCancellable로 래핑하고 저장해줌니다. 또 Printer에서 구독을 받는 부분을 .unlimited로 설정했기 때문에 값을 전부 받을 때까지 수행됨니다.

여기서 구독을 받는 부분을 .max(1)로 설정하고 입력을 받는 부분을 .none으로 설정한다면 총 하나의 값만 요청하기 때문에 값 하나만 받고 종료가 됩니다.

profile
hi there 👋

0개의 댓글