코틀린 Flow 공식문서 읽기 스터디 6차

Mendel·2023년 11월 26일
0

이번 범위

Launching Flow

어떤 곳에서 발생하는 비동기적인 이벤트를 나타내기 위해 Flow를 사용하기 쉽다. 이런 경우, addEventListener같은 전통적인 방식으로 함수를 등록해서 이벤트를 처리한다. onEach라는 중간 연산자가 이 일을 해줄 수 있다. 하지만, onEach는 중간 연산자라서 이것만 추가해서는 플로우의 수집이 되질 않는다. 때문에 종료 연산자인 collect를 달아줘야 한다.

collect는 내부적으로 while문을 반복하면서 들어오는 값을 모두 처리한다.
즉, collect를 호출한 이후 그 아래의 문장들은 수집이 모두 완료될때까지 실행되지 못한다는 것을 의미한다.

아래와 같은 케이스를 봐보자.

// Imitate a flow of events
fun events(): Flow<Int> = (1..3).asFlow().onEach { delay(100) }

fun main() = runBlocking<Unit> {
    events()
        .onEach { event -> println("Event: $event") }
        .collect() // <--- Collecting the flow waits
    println("Done")
}       

Event: 1
Event: 2
Event: 3
Done

수집이 모두 끝난 후에야 collect 아래에 있는 print문이 출력된 것을 알 수 있다.
수집을 별도의 코루틴에서 launch시켜서 이런 문제를 없애고 싶다면 collect대신 launchIn을 사용할 수 있다.

여기서 사실 필자는 아무 인자를 받지 않는 collect문을 처음보았다. 해당 api는 우리가 보통 쓰는 것이 아니라 확장함수로 제공되는 함수였다. 위 설명을 보면, 보통 onEach나 onCompletion, catch와 함께 사용하기 위해 만든 것 같다. 플로우에 시작에 대한 단순 트리거 용도로 사용할 수 있다고 더 위 문장에 나와 있었다.

launchIn

launchIn은 collect()함수 호출을 다른 코루틴에서 실행하기 위한 용도이다. 아래 예제를 봐보자.

fun events(): Flow<Int> = (1..3).asFlow().onEach { delay(100) }

fun main() = runBlocking<Unit> {
    events()
        .onEach { event -> println("Event: $event") }
        .launchIn(this) // <--- Launching the flow in a separate coroutine
    println("Done")
}     

Done
Event: 1
Event: 2
Event: 3

아까 위의 예제와는 다르게 Done이 먼저 잘 출력된 것을 알 수 있다. launchIn이 다른 코루틴에서 실행됨으로써 그 아래 문장의 실행들을 블럭하지 않았기 때문이다.

위 예제에서는 launchIn에서 사용할 스코프로 runBlocking빌더에 있는 스코프를 사용하고 있다. 덕분에 이 스코프에서 실행한 코루틴인 수집 코루틴이 정상적으로 완료될때까지 프로세스가 종료되지 않고 잘 유지되는 것이다.

위에서 아까 시작할때, onEach가 addEventListener의 역할을 한다고 했다. 그렇다면, removeEventListener도 필요할까? 코루틴의 구조화된 동시성이라는 특징때문에 수집을 하는 코루틴의 스코프가 죽게되면 자동으로 취소되기 때문이다.

한가지 더 주목할 점으로는 launchIn은 Job을 반환하기 때문에 우리가 따로 cancel이나 join등을 호출해서 제어해줄 수 있다.

Flow cancellation checks

Flow빌더는 값이 방출될때마다 ensureActive검사를 추가적으로 한다.(이 검사는 원래 코루틴이 중지될때마다 수행되는 검사로, 해당 코루틴이 취소되어야 하는지 아닌지를 검사하는 용도로 진행된다)
이로인해 플로우는 해당 코루틴이 취소되면 따라서 방출도 취소될 수 있는 것임.
아래의 예제를 봐보자

fun foo(): Flow<Int> = flow { 
    for (i in 1..5) {
        println("Emitting $i") 
        emit(i) 
    }
}

fun main() = runBlocking<Unit> {
    foo().collect { value -> 
        if (value == 3) cancel()  
        println(value)
    } 
}

Emitting 1
1
Emitting 2
2
Emitting 3
3
Emitting 4
Exception in thread "main" kotlinx.coroutines.JobCancellationException: BlockingCoroutine was cancelled; job="coroutine#1":BlockingCoroutine{Cancelled}@6d7b4f4c

3이 수집될때 해당 코루틴이 취소됐다. 그로인해 4를 방출하자 예외가 발생하고 터진것임.

그러나, 대부분의 다른 플로우 연산자들은 추가적인 취소검사를 하지않는다. 성능상의 이유 때문이라고 함.
때문에 만약 아래와 같이 구성하고 중간에 코루틴을 중지시키지 않는다면 취소에 대한 검사를 하지 않고 계속 진행되고, 예외가 발생하게 된다.

fun main() = runBlocking<Unit> {
    (1..5).asFlow().collect { value -> 
        if (value == 3) cancel()  
        println(value)
    } 
}

1
2
3
4
5
Exception in thread "main" kotlinx.coroutines.JobCancellationException: BlockingCoroutine was cancelled; job="coroutine#1":BlockingCoroutine{Cancelled}@3327bd23

여기서 취소가 감지되는 시점은 runBlocking이 끝날때다. 그 이유는 중간에 해당 코루틴이 중지되는 경우가 한 번도 없기 때문에 취소검사가 돌지 않기 때문이다.

이해를 좀 더 돕기 위해, 다음과 같이 작성되어 있다면 어떨까?

fun main() = runBlocking<Unit> {
    (1..5).asFlow().collect { value -> 
        if (value == 3) cancel()  
        println(value)
    } 
    println("ㅎㅇ")
}

위의 경우는 ㅎㅇ까지 출력되고 예외가 발생한다.

fun main() = runBlocking<Unit> {
    (1..5).asFlow().collect { value -> 
        if (value == 3) cancel()  
        println(value)
    } 
    delay(1)
    println("ㅎㅇ")
}

위 예시는 중간 delay로 인해 코루틴이 잠시 중단된다. 이때, ensureActive를 검사하면서 코루틴이 취소됐는지 검사하고 예외가 발생하기 때문에 ㅎㅇ가 출력되지 않는다.

코루틴이 취소될 수 있게 만들기

때문에 emit을 사용하지 않고 플로우를 구성하는 경우 추가적인 최소 검사를 해줘야 한다. 이런걸 위해 아래와 같은 중간 연산자를 추가할 수도 있다.

onEach { currentCoroutineContext().ensureActive() }

하지만, 이런 작업을 간단히 해주기 위해 cancellable()이라는 중간연산자 api를 제공한다.

fun main() = runBlocking<Unit> {
    (1..5).asFlow().cancellable().collect { value -> 
        if (value == 3) cancel()  
        println(value)
    } 
}

때문에 위와 같은 코드는 정상적으로 우리가 예상하는것과 같은 결과가 나온다. 3까지만 출력되고 취소예외가 발생한다.

Flow and Reactive Streams

리액티티브 스트림이나 리액티브 프레임워크(RxJava 등)에 익숙하다면 플로우의 설계가 더 친숙할 것이라고 한다. (필자도 최근에 RxJava를 짧게 접해보았는데 생각보다 플로우랑 사용법이나 제공api함수명 등이 너무 비슷해서 놀랐다)

플로우는 실제로 이런 기존의 것들을 반영해서 설계되었다고 한다. 그러나 플로우의 주 목적은 가능한 단순하고 코틀린과 서스펜션 친화적이고 구조화된 동시성을 따르는 것이라고 한다.

실제로 플로우는 RxJava와 호환될 수 있는 방법들을 제공한다.(컨버터 등을 kotlinx-coroutines-rx2에서 확인해볼 수 있다)


후기

드디어 다읽었다....
스터디를 하니 확실히 강제성도 생기고 다 같이 얘기를 나눠보면서 모호한 지식들도 잡혀 나가는 것 같다.
얼른 채널까지 끝내고, 컴포즈 스터디 시작하고 싶다ㅎㅎ


참고
https://kotlinlang.org/docs/flow.html#launching-flow

profile
이것저것(안드로이드, 백엔드, AI, 인프라 등) 공부합니다

0개의 댓글