안녕하세요!!
지난 시간에는 Publisher와 관련된 기사를 정리했다면 오늘은 Subscriber와 관련된 기사를 정리해 보도록 하겠습니다.
Apply back pressure to precisely control when publishers produce elements.
영어로 된 요약을 해석하면 Publisher가 element를 제작할 때 back pressure를 적용해 정밀하게 제어할 수 있습니다.
일반적으로 Subscriber가 Publisher에게 요청을 보내면 element를 보내게 됩니다. 이 과정에서 Subscriber.Demand
를 이용해 Item의 수를 결정할 수 있습니다.
request(_:)
를 호출합니다.receive(_:)
를 호출할 때 새로운 요구를 반환합니다.Demand는 계속해서 변하기 때문에 요청이 추가되거나 element를 반환하면 계속해서 변합니다.
Publisher가 너무 많은 demand를 갖게 되면 더 이상 협상이 불가능합니다.
Subscriber를 만드는 간편한 방법이 2가지 있다고 다른 글에서 정리한 적이 있습니다. assign(to:on:)
,sink(receiveValue:)
입니다.
이 2가지 방법을 통해 구독자를 만들면 생성되는 즉시 element를 요구하게 됩니다. 잘 못하면 Publisher에게 무한의 element를 요구하는 경우가 발생할 수 있습니다.
무한의 element를 반환하게 되면 Subscriber가 처리하는 속도보다 빠르게 반환할 수 있기 때문에 처리를 기다리는 동안 메모리 버퍼를 채워 메모리 부족 현상을 겪을 수 있습니다.
이러한 문제를 예방하기 위한 3가지 체크리스트가 존재합니다.
1. Publisher를 차단하면 안 됩니다..
2. element를 버퍼링하여 많은 메모리를 소비하면 안 됩니다.
3. element 처리에 실패하면 안 됩니다.
일반적으로 대부분의 Publisher는 속도를 제어할 수 있습니다. 다른 Publisher는 URL 로딩 시스템의 URLSession.DataTaskPublisher와 같은 단일 element만 생성합니다. 그렇기 때문에 sink
를 통해 이러한 Publisher와 함께 할당하는 것은 안전합니다.
속도를 제어하고 싶다면 커스텀 Subscriber를 만들어야 합니다. 구독자가 element를 수신할 때 새로운 값을 반환하는 receive(_:)
를 호출하거나 request(_:)
를 호출해 더 많은 element를 요청할 수 있습니다.
back pressure?
Subscriber가 element를 수신할 준비가 되었다는 신호를 통해 흐름을 제어하는 개념이다.
Publisher는 계속해서 Demand(수요)를 추적하고 element를 필요로 하면 생산합니다.
// Publisher: Uses a timer to emit the date once per second.
let timerPub = Timer.publish(every: 1, on: .main, in: .default)
.autoconnect()
// Subscriber: Waits 5 seconds after subscription, then requests a
// maximum of 3 values.
class MySubscriber: Subscriber {
typealias Input = Date
typealias Failure = Never
var subscription: Subscription?
func receive(subscription: Subscription) {
print("published received")
self.subscription = subscription
DispatchQueue.main.asyncAfter(deadline: .now() + 5) {
subscription.request(.max(3))
}
}
func receive(_ input: Date) -> Subscribers.Demand {
print("\(input) \(Date())")
return Subscribers.Demand.none
}
func receive(completion: Subscribers.Completion<Never>) {
print ("--done--")
}
}
// Subscribe to timerPub.
let mySub = MySubscriber()
print ("Subscribing at \(Date())")
timerPub.subscribe(mySub)
Subscriber를 채택해 구현한 객체에는 3가지의 receive함수가 존재합니다. 먼저 Publisher는 subscribe
함수를 호출해 구독자를 연결합니다. 이 때 가장 먼저 호출되는 함수가 receive(subscription: Subscription)
입니다. 함수는 Publisher의 요청에 따라 Subscriber를 제공하고 제공한 구독을 취소할 수 있습니다. 초기 요청을 진행한 후 element를 전달하기 위해 receive(_:)
를 마지막으로 func receive(completion: Subscribers.Completion<Never>)
를 호출해 성공여부와 종료여부를 판단합니다.
위 코드의 흐름을 보면 구독자를 생성하고 Publisher에 연결되면 5초 뒤부터 로직이 수행됩니다. receive(subscription: Subscription)
를 통해 3개의 element를 요청했기 때문에 3번 이후에는 더 이상 element를 반환하지는 않지만 Subscribers.Completion.finished를 보내 완료 신호를 보내지도 않습니다. 왜냐하면 Publisher는 계속해서 요청을 기다리고 있기 때문입니다.
Subscriber를 채택한 객체를 만드는 방법이 아닌 다른 방법으로도 제어를 할 수 있습니다.
buffer(size:prefetch:whenFull:)
debounce(for:scheduler:options:)
throttle(for:scheduler:latest:)
collect(_:)
& collect(_:options:)
다음 시간에는 기존의 비동기 로직을 Combine으로 변경하는 기사를 정리해 보겠습니다.