Combine 공식 문서 파헤치기 (3)

DongHeon·2023년 11월 10일
0

Combine

목록 보기
4/5
post-thumbnail

안녕하세요!!

지난 시간에는 Publisher와 관련된 기사를 정리했다면 오늘은 Subscriber와 관련된 기사를 정리해 보도록 하겠습니다.

Processing Published Elements with Subscribers

Apply back pressure to precisely control when publishers produce elements.

영어로 된 요약을 해석하면 Publisher가 element를 제작할 때 back pressure를 적용해 정밀하게 제어할 수 있습니다.

일반적으로 Subscriber가 Publisher에게 요청을 보내면 element를 보내게 됩니다. 이 과정에서 Subscriber.Demand를 이용해 Item의 수를 결정할 수 있습니다.

  • 처음 구독을 시작할 때 Subscription에서 request(_:)를 호출합니다.
  • Publisher가 element를 전달하기 위해 Subscriber의 receive(_:)를 호출할 때 새로운 요구를 반환합니다.

Demand는 계속해서 변하기 때문에 요청이 추가되거나 element를 반환하면 계속해서 변합니다.

Publisher가 너무 많은 demand를 갖게 되면 더 이상 협상이 불가능합니다.

Consume Elements as the Publisher Produces Them

Subscriber를 만드는 간편한 방법이 2가지 있다고 다른 글에서 정리한 적이 있습니다. assign(to:on:),sink(receiveValue:)입니다.

이 2가지 방법을 통해 구독자를 만들면 생성되는 즉시 element를 요구하게 됩니다. 잘 못하면 Publisher에게 무한의 element를 요구하는 경우가 발생할 수 있습니다.

무한의 element를 반환하게 되면 Subscriber가 처리하는 속도보다 빠르게 반환할 수 있기 때문에 처리를 기다리는 동안 메모리 버퍼를 채워 메모리 부족 현상을 겪을 수 있습니다.

이러한 문제를 예방하기 위한 3가지 체크리스트가 존재합니다.
1. Publisher를 차단하면 안 됩니다..
2. element를 버퍼링하여 많은 메모리를 소비하면 안 됩니다.
3. element 처리에 실패하면 안 됩니다.

일반적으로 대부분의 Publisher는 속도를 제어할 수 있습니다. 다른 Publisher는 URL 로딩 시스템의 URLSession.DataTaskPublisher와 같은 단일 element만 생성합니다. 그렇기 때문에 sink를 통해 이러한 Publisher와 함께 할당하는 것은 안전합니다.

Apply Back Pressure with a Custom Subscriber

속도를 제어하고 싶다면 커스텀 Subscriber를 만들어야 합니다. 구독자가 element를 수신할 때 새로운 값을 반환하는 receive(_:)를 호출하거나 request(_:)를 호출해 더 많은 element를 요청할 수 있습니다.

back pressure?
Subscriber가 element를 수신할 준비가 되었다는 신호를 통해 흐름을 제어하는 개념이다.

Publisher는 계속해서 Demand(수요)를 추적하고 element를 필요로 하면 생산합니다.

// Publisher: Uses a timer to emit the date once per second.
let timerPub = Timer.publish(every: 1, on: .main, in: .default)
    .autoconnect()


// Subscriber: Waits 5 seconds after subscription, then requests a
// maximum of 3 values.
class MySubscriber: Subscriber {
    typealias Input = Date
    typealias Failure = Never
    var subscription: Subscription?
    
    func receive(subscription: Subscription) {
        print("published                             received")
        self.subscription = subscription
        DispatchQueue.main.asyncAfter(deadline: .now() + 5) {
            subscription.request(.max(3))
        }
    }
    
    func receive(_ input: Date) -> Subscribers.Demand {
        print("\(input)             \(Date())")
        return Subscribers.Demand.none
    }
    
    func receive(completion: Subscribers.Completion<Never>) {
        print ("--done--")
    }
}


// Subscribe to timerPub.
let mySub = MySubscriber()
print ("Subscribing at \(Date())")
timerPub.subscribe(mySub)

Subscriber를 채택해 구현한 객체에는 3가지의 receive함수가 존재합니다. 먼저 Publisher는 subscribe함수를 호출해 구독자를 연결합니다. 이 때 가장 먼저 호출되는 함수가 receive(subscription: Subscription) 입니다. 함수는 Publisher의 요청에 따라 Subscriber를 제공하고 제공한 구독을 취소할 수 있습니다. 초기 요청을 진행한 후 element를 전달하기 위해 receive(_:)를 마지막으로 func receive(completion: Subscribers.Completion<Never>)를 호출해 성공여부와 종료여부를 판단합니다.

위 코드의 흐름을 보면 구독자를 생성하고 Publisher에 연결되면 5초 뒤부터 로직이 수행됩니다. receive(subscription: Subscription)를 통해 3개의 element를 요청했기 때문에 3번 이후에는 더 이상 element를 반환하지는 않지만 Subscribers.Completion.finished를 보내 완료 신호를 보내지도 않습니다. 왜냐하면 Publisher는 계속해서 요청을 기다리고 있기 때문입니다.

Manage Unlimited Demand by Using Back-Pressure Operators

Subscriber를 채택한 객체를 만드는 방법이 아닌 다른 방법으로도 제어를 할 수 있습니다.

  1. buffer(size:prefetch:whenFull:)
  2. debounce(for:scheduler:options:)
  3. throttle(for:scheduler:latest:)
  4. collect(_:) & collect(_:options:)

다음 시간에는 기존의 비동기 로직을 Combine으로 변경하는 기사를 정리해 보겠습니다.

0개의 댓글