coroutine 에 대해 잘 설명돼있는 블로그 글을 보고 정리한 내용이다.
an instance of a suspendable computation
launch
async
runBlocking
fun main() = runBlocking {...}
suspend fun main()
은 특수한 상황으로, 컴파일러가 main 함수를 coroutine 으로 실행시킨다.public interface CoroutineScope {
public val coroutineContext: CoroutineContext
}
launch {}
와 같은 coroutine builder 로 넘긴 실행할 code block Continuation 이라는 단위로 만들어짐.resume()
호출로 resumed
상태로 전환되어 실행된다.resume()
이 호출될 때, 현재 context 의 dipacher 에게 스레드 전환(dispacth) 가 필요하다면 dispatch()
함수를 호출하여 적합한 스레드로 전달하여 수행한다.공식 가이드에서 Coroutine 은 light-weight thread 라고 말한다. 이를 분석해보자.
fun main(args: Array<String>) = runBlocking {
repeat(100_000) {
launch {
delay(1000L)
print(".")
}
}
}
위 코드에서 Dispatcher 를 재정의하지 않고 launch {}
를 사용한다. 따라서, runBlocking
이 사용하는 GlobalScope
를 상속 받아 사용하게 된다. 이 scope 에서 BlockingDispatcher
를 사용하는데, 이는 이벤트 루프 형태의 Dispacher
구현이다. 즉, 위 코드는 이벤트 루프 기반으로 10만번의 이벤트를 발생하여 .
을 출력하게 되며 스레드 부하가 없다. 따라서, OOM 을 피할 수 있게 된다.
coroutineScope
를 통해 생성된 scope 는 자식 coroutine 이 끝날 때까지 종료되지 않는다.이 부분은 정확히 알지 못해 이 블로그 글에서 옮겨 적었다.
Continuation이란?
1. 한줄 정의
- 중단된 계산을 이어서 실행하기 위해 필요한 스냅샷(상태 + 다음 실행 위치 + 컨텍스트)
- 컴파일러가 하는 일
suspend
함수는 컴파일 단계에서 다음과 같이 바뀜
- 상태머신(state machine) 으로 변환: 각 suspend 지점(예: delay, 다른 suspend 호출)은 label 로 구분된 상태 번호가 됨.
- 마지막 파라미터로 Continuation 추가: 결과 타입 T를 담아서 resumeWith(Result) 로 콜백.
- 지역 변수 저장: 다음 재개 시 필요하도록 로컬 변수들을 컨티뉴에이션의 필드에 저장.
- 재개 지점 복원: resumeWith 호출 시 저장된 label 에 따라 정확한 위치부터 다시 실행.
- Continuation 구성
- context: CoroutineContext
- Job(취소/구조화), Dispatcher(스레드/큐), CoroutineName 등 메타데이터 묶음
- resumeWith(result: Result)
- 중단된 계산을 성공값 또는 예외와 함께 재개
- 예제
suspend fun fetch(): String { delay(100) // ⟵ suspend 지점 1 val a = apiCall() // ⟵ suspend 지점 2 (가정) return a.uppercase() }
컴파일 후 개념적으로
fun fetch(cont: Continuation<String>): Any { when (cont.label) { 0 -> { cont.label = 1 // delay(100)를 시작하고, 완료되면 cont.resumeWith(Result.success(Unit)) return SUSPENDED } 1 -> { cont.label = 2 // apiCall() 비동기 시작, 완료되면 cont.resumeWith(Result.success(value)) return SUSPENDED } 2 -> { // 이전 단계에서 저장해둔 value를 꺼내 uppercase 후 return "VALUE".uppercase() } else -> error("invalid state") } }
## dispatcher
- coroutine 이 어떤 스레드(`Worker`)에서 처리될지 스케줄링 요청을 보내는 책임
- coroutine scope 에 어떠한 dispacher 도 설정돼있지 않다면, `Dispachers.Default` 를 사용한다.
- 사전 정의/커스터마이징 dispacher 를 coroutine context 에 지정해서 사용할 수도 있다.
- Main dispacher 는 애플리케이션 메인 스레드(single thread)에서 이벤트 루프를 이용해 coroutine 의 실행을 스케줄링함.
- Unconfined 는 continuation 이 재개되는 스레드에서 바로 실행함.
### coroutine 스케줄링
Kotlin/JVM 에서는 백그라운드 작업을 수행하기 위해 이 두 가지 dispacher 를 제공
- 주로 CPU 사용하는 작업은 `Default` 를, 주로 Network, Disk I/O 를 사용하는 작업은 `IO` 를 사용한다.
- `Defalult` 와 `IO` 는 `CoroutineScheduler` 라는 동일한 스케줄러를 공유한다.
- coroutine 들은 dispatcher 를 통해 `CoroutineScheduler` 로 요청될 때, `Task` 라는 형태로 래핑되어 요청된다.
- `Default` 를 사용하도록 설정된 coroutine 은 `NonBlockingTaskContext` 으로 표시되며, `IO` 를 사용하도록 설정된 coroutine 은 `ProbablyBlockingTaskContext` 로 표시된다. 따라서, 각각 CPU intensive, I/O intensive 한 작업에 적합하다.

[그림 참조](https://myungpyo.medium.com/코루틴-디스패쳐-조금-더-살펴보기-92db58efca24)
dispacth to `CoroutineScheduler`
- `Deafult` 는 바로 coroutine 을 `Task` 로 래핑하여 `CoroutineScheduler` 에게 넘긴다. `IO` 는 `LimitingDispatcher` 로써 병렬 제한치(parallelism limit)라는 버퍼를 두고 스케줄링 요청을 할지, 자체적으로 갖는 `Task` 큐에 대기시킬지 결정한다.
`WorkerQueue`
- `CoroutineScheduler` 는 `Worker`(스레드)를 배열로 관리(생성/갱신/제거)한다.
- `Worker` 는 수행해야할 Task 들을 완료하고 나면 대기 상태에 들어가고 `parkedWorkerStack` 에 push 되어 대기하게 된다.
- 활성된 `Worker` 들이 바빠서 요청된 `Task` 를 수행할 수 없으면 이 대기 상태의 `Worker` 깨우고 `Task` 를 수행하도록 한다.
- stack 을 사용하여 LIFO 방식으로 `Worker` 들을 깨우는 이유는 최근까지 사용된 `Worker` 부터 재사용함으로써 메모리 footprint(용량) 감소와 referaence locality 면에서 이득 때문이다.
`WorkQueue`
- `Worker` 는 내부적으로 할당된 `Task` 를 담고 있는 `WorkQueue` 를 갖고 있다.
- 이 큐는 SPMC(Single-Producer, Multi-Consumer) 자료 구조로 사용된다.
- Producer 는 해당 큐를 소유한 `Worker` 이며 Consumer 는 다른 활성 `Worker` 들이다.
- 자신의 `Task` 를 마친 `Worker`는 parked 되기 전에, 다른 활성 `Worker` 를 살펴보고 `Task` 를 빼앗아 수행하는 Task Stealing Algorithm 이 적용되어 있다.
`globlaCpuQueue`, `globalBlockingQueue`
- `Worker` 에 할당되지 못한 `Task` 의 경우, 각 `TaskContext` 에 맞는 queue 에서 가용한 `Worker` 에 할당될 때까지 기다림