원본 영상: https://developer.apple.com/videos/play/wwdc2021/10058/
대상: Swift Concurrency(Async/Await) / AsyncSequence를 처음부터 끝까지 이해하고 싶은 iOS/macOS 개발자
for await, 오류 처리, break/continue, 동시성/취소AsyncStream / AsyncThrowingStreammap/filter/reduce/first(where:) 등Sequence 사용감으로, for await ... in으로 순회합니다.FileHandle.bytes, URL.lines, URLSession.AsyncBytes(bytes(from:)), NotificationCenter.notifications 등.AsyncStream / AsyncThrowingStream으로 간단히 브리지할 수 있으며, onTermination을 통해 취소/정리를 확실히 보장합니다.Task 수명과 함께 관리하고 필요 시 cancel()로 안전하게 중단합니다.next()가 nil을 반환합니다.associatedtype Element associatedtype AsyncIterator: AsyncIteratorProtocol func makeAsyncIterator() -> AsyncIterator AsyncIteratorProtocol.next() async throws -> Element?Sequence와 유사하지만, 각 요소를 비동기적으로 전달합니다.for await element in asyncSequence {
// 요소를 하나씩 비동기적으로 소비
}
do {
for try await element in throwingSequence {
// ...
}
} catch {
// 스트림 과정에서 throw된 에러 처리
}
break / continue 지원for-in과 동일하게, 조건에 따라 조기 종료(break)나 건너뛰기(continue)가 가능합니다.for-in을 내부적으로 이터레이터/while let ... next()로 풀어냅니다.next()를 await 가능한 버전으로 바꾼 형태가 됩니다.Task로 감싸서 소유자 수명에 묶고, 필요 시 cancel()로 중단합니다.let t1 = Task { for await e in streamA { handle(e) } }
let t2 = Task {
do { for try await e in streamB { handle(e) } }
catch { log(error) }
}
// ...나중에
t1.cancel(); t2.cancel()
다음 API들은 iOS 15 / macOS 12 등에서 사용할 수 있습니다.
for try await line in FileHandle.standardInput.bytes.lines {
// stdin 라인 스트리밍
}
for try await line in URL(fileURLWithPath: "/tmp/data.csv").lines {
// 파일을 라인 단위로 소비
}
let (bytes, response) = try await URLSession.shared.bytes(from: url)
guard let http = response as? HTTPURLResponse, http.statusCode == 200 else {
throw URLError(.badServerResponse)
}
for try await b in bytes {
consume(b)
}
let center = NotificationCenter.default
let firstMatch = await center.notifications(named: .NSPersistentStoreRemoteChange)
.first { note in
(note.userInfo?["Key"] as? String) == "ExpectedValue"
}
URL.lines로 읽고, dropFirst()로 헤더를 건너뛰며 각 라인을 파싱해 실시간 처리합니다.let endpoint = URL(string: "https://earthquake.usgs.gov/earthquakes/feed/v1.0/summary/all_month.csv")!
for try await line in endpoint.lines.dropFirst() {
let parts = line.split(separator: ",")
// time, lat, lon, magnitude 등 파싱
}
AsyncStream / AsyncThrowingStreamfinal class Monitor {
var handler: ((Event) -> Void)?
func start() { /* 이벤트 발생 시 handler?(event) */ }
func stop() { /* 정리 */ }
}
func makeStream() -> AsyncStream<Event> {
AsyncStream(Event.self, bufferingPolicy: .bufferingOldest(100)) { c in
let m = Monitor()
m.handler = { c.yield($0) }
c.onTermination = { _ in m.stop() } // 취소/종료 시 정리
m.start()
}
}
yield(_:)finish()onTermination에서 타이머/소켓 해제 등AsyncThrowingStreamenum StreamError: Error { case failure }
func throwingStream() -> AsyncThrowingStream<Int, Error> {
AsyncThrowingStream(Int.self) { c in
// 값: c.yield(v), 오류 종료: c.finish(throwing: error)
}
}
.unbounded / .bufferingNewest(n) / .bufferingOldest(n) 중 상황에 맞게 선택.Newest), 순서 보존은 Oldest가 유리.map, compactMap, filter, reduce, first(where:), dropFirst, prefix 등 대부분의 익숙한 연산이 비동기 버전으로 제공됩니다.let numbers = URL(fileURLWithPath: "/tmp/numbers.txt").lines
.compactMap(Int.init)
.filter { $0 % 2 == 0 }
.prefix(10)
for try await n in numbers {
print(n)
}
1) Task 수명 관리: 화면/소유자 종료 시 task.cancel() 호출.
2) 정리 보장: onTermination에서 관찰자/소켓/타이머 정리.
3) 버퍼링: 생산>소비인 상황을 가정하고 정책 명시.
4) UI/MainActor: UI 갱신은 @MainActor 또는 메인 스레드에서.
5) 에러/종료 시그널: finish()/finish(throwing:)로 명시적 종료가 테스트에 유리.
6) 테스트: 수집/타임아웃/정리 호출 여부를 검증하는 유틸을 활용.
let note = await NotificationCenter.default
.notifications(named: .NSPersistentStoreRemoteChange)
.first { $0.userInfo?["SomeKey"] as? String == "Target" }
func timerStream(interval: TimeInterval) -> AsyncStream<Date> {
AsyncStream { c in
let t = Timer.scheduledTimer(withTimeInterval: interval, repeats: true) { _ in
c.yield(Date())
}
c.onTermination = { _ in t.invalidate() }
}
}
extension AsyncSequence {
func mapLatest<T>(_ transform: @escaping (Element) async -> T) -> AsyncStream<T> {
AsyncStream { c in
let task = Task {
var latest: Task<Void, Never>?
for await v in self {
latest?.cancel()
latest = Task {
let out = await transform(v)
guard !Task.isCancelled else { return }
c.yield(out)
}
}
c.finish()
}
c.onTermination = { _ in task.cancel() }
}
}
}
Q. AsyncSequence는 언제 종료되나요?
A. 더 이상 값이 없으면 이터레이터의 next()가 nil을 반환하며 종료합니다. 오류가 발생하면 그 시점에 종료되며 이후 next()는 nil입니다.
Q. 무한 스트림은 어떻게 다뤄야 하나요?
A. Task로 감싸 수명을 소유자와 묶고, 필요 시 cancel()로 중단합니다. onTermination에서 정리를 보장하세요.
Q. 백프레셔는 어떻게 처리하나요?
A. AsyncStream 생성 시 버퍼링 정책을 명시해 생산/소비 속도 차이를 제어합니다.
Q. 에러를 던지는 스트림은 어떻게 만들죠?
A. AsyncThrowingStream<Element, Error>를 사용하고, 오류 발생 시 finish(throwing:)를 호출합니다.