
AsyncSequence를 공부하다 후반에 나온 AsyncStream을 이해하지 못해 문서를 읽어본다.
이전 글에서 AsyncSequence를 Custom하게 만드는 방법까지 알아보았다. 하지만 Apple에서는 기존 로직을 converting하는데 있어 다른 방법을 제안하는데, 그녀석이 AsyncStream이다.
public struct AsyncStream<Element> {
...
}
extension AsyncStream : AsyncSequence {
...
}
일단 우리가 이전에 알아본 AsyncStream을 준수하고 있다는 것을 확인할 수 있다. 그렇다면, 이 친구는 AsyncSequence를 만들어주면서 다른 추가적인 method가 들어간 녀석이 아닐까? 맞다.
이해하기 위해 Zedd님의 예시 코드를 가져왔습니다.
let digits = AsyncStream<Int> { continuation in
for digit in 1...10 {
continuation.yield(digit)
}
continuation.finish()
}
Task {
for await digit in digits {
print(digit) // 1, 2... 10
}
}
이렇게 하면 AsyncStream을 만들 수 있다. 이전에 비해 상대적으로 쉽게 AsyncStream을 만들 수 있다. 그런데 내부적으로 동작하는 방식을 모르겠다.
일단 AsyncStream의 생성자가 이상하다. 생성시에 closure를 받고 있다.
init(
_ elementType: Element.Type = Element.self,
bufferingPolicy limit: AsyncStream<Element>.Continuation.BufferingPolicy = .unbounded,
_ build: (AsyncStream<Element>.Continuation) -> Void
)
실제로 보면, 기본 인자를 두개 받고, 마지막에 build라는 변수로 closure를 받고 있다.
elementTypebufferingPolicybuildbuffer? 어떻게 버퍼로 동작하는지 궁금하여 아래의 코드를 구성했다.
let stream = AsyncStream<Int>(Int.self,
bufferingPolicy: .bufferingNewest(5)) { continuation in
for i in 0..<10 {
continuation.yield(i)
}
continuation.finish()
}
Task {
// Call point:
for await random in stream {
print ("\(random)") // 10개중 현재시점으로 부터 가장 근방에 발생한 5개의 원소만 가져온다. (5, 6, 7, 8, 9)
}
}
해당 동작 방식을 통해 어느정도 어떻게 동작하는지 유추해볼 수 있을 것 같다. build로 넣은 클로저는 다른 thread에서 동작하게 하고, 결과를 특정 buffer에 받은 뒤,
A mechanism to interface between synchronous code and an asynchronous stream.
AsyncStream이 AsyncSequence를 쉽게 만들어주는 친구라면, 이전 글에서 보았듯 throw를 처리하는 녀석도 쉽게 만들 수 있어야 한다. 이를 쉽게 만들어주는 친구가 요녀석이다.
enum CustomError: Error {
case fiveError
}
let digits = AsyncThrowingStream<Int, Error> { continuation in // AsyncThrowingStream.Continuation ✅ AsyncStream.Continuation ❎
for _ in 1...10 {
let digit = Int.random(in: 1...10)
if digit == 5 {
continuation.finish(throwing: CustomError.fiveError)
}
continuation.yield(digit)
}
continuation.finish()
}
Task {
do {
for try await digit in digits {
print ("\(digit)")
}
} catch {
print(error)
}
}
// 10
// 7
// 9
// 7
// fiveError
두 타입 모두, stream이 termination 되는 시점에 continuation에 onTermination 콜백을 지정할 수 있다. (AsyncStream.Continuation.Termination, AsyncThrowingStream.Continuation.Termination)
let digits = AsyncStream(Int.self) { continuation in
continuation.onTermination = { termination in
switch termination {
case .finished:
print("producing finished")
case .cancelled:
print("producing cancelled")
@unknown default:
fatalError()
}
}
for _ in 1...10 {
let digit = Int.random(in: 1...10)
if digit == 5 {
continuation.onTermination?(.cancelled)
}
print("produced: ", digit)
continuation.yield(digit)
}
print("before producing finish")
continuation.finish()
print("after producing finish")
}
Task {
do {
for try await digit in digits {
print ("stream: \(digit)")
}
} catch {
print(error)
}
}
produced: 8
producing cancelled
producing finished
produced: 5
produced: 9
produced: 5
produced: 3
produced: 7
produced: 9
produced: 6
produced: 7
produced: 4
before producing finish
after producing finish
stream: 8
위의 동작을 했을 때, onTermination을 호출하면 동작이 멈추는 것으로 이해했는데 그러지는 않는 듯 하다.
This means that you can perform needed cleanup in the cancellation handler.
공식 문서에 나온 표현으로 미루어 짐작해보았을 때, 해당 스트림이 finish 되거나 cancel될 때 발생하는 동작을 규정하는 듯 하다. 만약 digit이 5인 경우 동작 수행을 종료하고 싶다면, break 문등을 추가하여 만드는 것이 맞는 듯하다.
추후에 공부하면서 알게 된 것인데, 여기서 onTerminate의 cancel은 structured concurrency와 관련이 있는 듯 하다. 추후 글에서 알아보도록 하자.