코틀린 Flow 공식문서 읽기 스터디 3차

Mendel·2023년 11월 6일
0

1차, 2차 스터디 내용은 추후에 올리도록 하겠습니다.

범위

Flow Context

Flow를 수집하는 함수는 suspend 키워드이다. 즉, 흐름을 수집하기 위해서는 코루틴이 필요하다는 것을 알 수 있으며, 코루틴 컨텍스트를 기반으로 실행된다고 함. 이 말은 해당 코루틴의 컨텍스트에서 실행된다는 것은 아니며, collect가 호출된 코루틴 컨텍스트 환경에서 실행된다는 것을 의미한다.

아래 예시는 collect가 구체적인 코루틴 컨텍스트를 지정받고 실행된 예시임.

withContext(context) {
    simple().collect { value ->
        println(value) // run in the specified context
    }
}

즉, flow빌더 블록 안의 코루틴 컨텍스트는 collect로 수집을 한 대상이 따르는 컨텍스트임.

context preservation (컨텍스트 보존)
flow빌더 블록 안에서 컨텍스트를 바꿔서 emit을 실행하면 런타임에 터지는 것을 확인할 수 있다. 즉, 방출된 값은 수집자와 항상 같은 컨텍스트를 따라서 중간 플로우 연산자들을 거쳐서 수집자에게 같은 컨텍스트상에서 들어갈 것이다.
이런 구조로 플로우가 기본값으로 돌아가야 비동기 코드가 호출자에 대한 블럭킹없이 가장 깔끔하게 돌아가서 이렇게 설계했다고 함.
이런 Flow의 특성을 컨텍스트 보존이라고 한다.

하지만, 컨텍스트 보존이라는 특성때문에 우리는 flow빌더 내에서 시간이 오래걸리는 작업을 withContext같은 것을 활용해서 다른 컨텍스트상에서 실행하고 방출할 수 없다.

실제로 잘못사용한다면 아래 같은 예외가 발생한다.

Exception in thread "main" java.lang.IllegalStateException: Flow invariant is violated:
Flow was collected in [CoroutineId(1), "coroutine#1":BlockingCoroutine{Active}@5511c7f8, BlockingEventLoop@2eac3323],
but emission happened in [CoroutineId(1), "coroutine#1":DispatchedCoroutine{Active}@2dae0000, Dispatchers.Default].
Please refer to 'flow' documentation or use 'flowOn' instead
at ...

여기 맨 아래에 flowOn을 대신 사용하라는 것을 알 수 있다.

flowOn 연산자

아래 예시는 flowOn을 사용하는 올바른 예시 중 하나다.

fun simple(): Flow<Int> = flow {
    for (i in 1..3) {
        Thread.sleep(100) // pretend we are computing it in CPU-consuming way
        log("Emitting $i")
        emit(i) // emit next value
    }
}.flowOn(Dispatchers.Default) // RIGHT way to change context for CPU-consuming code in flow builder

fun main() = runBlocking<Unit> {
    simple().collect { value ->
        log("Collected $value") 
    } 
}    

실행 결과
[DefaultDispatcher-worker-1 @coroutine#2] Emitting 1
[main @coroutine#1] Collected 1
[DefaultDispatcher-worker-1 @coroutine#2] Emitting 2
[main @coroutine#1] Collected 2
[DefaultDispatcher-worker-1 @coroutine#2] Emitting 3
[main @coroutine#1] Collected 3

flow빌더블록은 작업스레드에서 수행되고, 수집은 메인스레드에서 이루어지고 있다.

  • flowOn의 역할
    컨텍스트에서 CoroutineDispatcher를 변경하고 싶을 때 사용하면 된다. flowOn으로 컨텍스트를 변경한다면, flowOn이 지정된 위치를 기준으로 위의 업스트림 흐름에 대해서 새로운 코루틴을 생성한다. 즉, flowOn을 기준으로 위와 아래의 흐름이 다른 코루틴으로 돌아가는 것이다.

Buffering

위에서 본 것처럼, 플로우의 부분부분들이 서로 다른 코루틴에서 동작하는 것은 플로우의 생성부터 수집까지의 전체적인 과정에서 봤을 때 시간적으로 유용할 수 있다.
예를 들어 플로우의 모든 일련의 과정이 하나의 코루틴(수집을 시작한 코루틴 컨텍스트를 기준)에서 이루어지는 경우에, delay가 중간에 있다면 그 아래 다운스트림을 비롯한 모든 부분에 영향을 줄 수 있다.
아래 코드를 보고 시간을 먼저 예측해보자.

fun simple(): Flow<Int> = flow {
    for (i in 1..3) {
        delay(100) // pretend we are asynchronously waiting 100 ms
        emit(i) // emit next value
    }
}

fun main() = runBlocking<Unit> { 
    val time = measureTimeMillis {
        simple().collect { value -> 
            delay(300) // pretend we are processing it for 300 ms
            println(value) 
        } 
    }   
    println("Collected in $time ms")
}

결과
1
2
3
Collected in 1220 ms

약 1.2초가 걸린다.
하지만, 우리는 위 코드를 작성할 때 일반적으로 이런 결과를 원한게 아닐거다.
생성자가 수집자의 시간에 영향을 받는다니?
생성자는 수집자가 소모하든 말든 자신이 데이터를 만들기로 정한 그 시간 텀을 기준으로 계속 데이터를 만드는 것이 올바른 결과일 것이다.
이럴때 buffer()라는 중간 연산자를 사용할 수 있다.

buffer()

fun simple(): Flow<Int> = flow {
    for (i in 1..3) {
        delay(100) // pretend we are asynchronously waiting 100 ms
        emit(i) // emit next value
    }
}

val time = measureTimeMillis {
    simple()
        .buffer() // buffer emissions, don't wait
        .collect { value -> 
            delay(300) // pretend we are processing it for 300 ms
            println(value) 
        } 
}   
println("Collected in $time ms")

결과
1
2
3
Collected in 1057 ms

약 1초가 걸렸다.
0.1초만에 하나를 방출하고 그게 버퍼에 들어가고, 하나를 수집하면서 0.3초를 collect에서 멈춘다.하지만, collect에서 delay가 되는 동안 다시 생성자에게 돌아가서 마저 방출을 한다. 그리고 0.3초가 끝났을때 이미 생산자는 3개를 모두 만든 상태임.
그리고, 소비자는 계속 0.3초마다 버퍼에서 데이터를 끄집어내서 소모하면 된다.
즉, 0.1초 + 0.3초(이때 생산자가 나머지 두 개도 다 생성함) + 0.3초 + 0.3초 = 1초

그렇다면, flowOn()과 buffer()는 뭐가 다른것인가?
flowOn도 버퍼링 매커니즘을 사용하는 것은 동일하나 아예 코루틴 디스패처를 바꿔서 실행 컨텍스트를 변경해야할때 사용한다.

Conflation

버퍼링 매커니즘이 적용되는 상황에서, 소비자의 소비 속도가 생산자의 생산 속도보다 느린 경우 그 모든 내용을 소비가 순차적으로 처리하게 하기 싫을 수 있음.
conflation을 사용하면 중간에 생성된 값들을 건너뛰고 가장 최신의 값을 받아서 처리하게 할 수 있다.

fun simple(): Flow<Int> = flow {
    for (i in 1..3) {
        delay(100) // pretend we are asynchronously waiting 100 ms
        emit(i) // emit next value
    }
}

val time = measureTimeMillis {
    simple()
        .conflate() // conflate emissions, don't process each one
        .collect { value -> 
            delay(300) // pretend we are processing it for 300 ms
            println(value) 
        } 
}   
println("Collected in $time ms")

결과
1
3
Collected in 758 ms

1을 처리한 다음에, 2와 3이 있을때 2를 건너뛰고 3을 처리한 것을 알 수 있다.

최신 값 처리하기 collectLatest

conflate는 현재 처리하던 것은 다 처리하고 그 다음 버퍼에 있는 것 중 가장 최신의 값을 받는 방식이였다.
하지만, collecLatest는 진행중이던 컬렉터를 취소하고 새 값이 방출될 때마다 다시 시작하는 것임.

fun simple(): Flow<Int> = flow {
    for (i in 1..3) {
        delay(100) // pretend we are asynchronously waiting 100 ms
        emit(i) // emit next value
    }
}

val time = measureTimeMillis {
    simple()
        .collectLatest { value -> // cancel & restart on the latest value
            println("Collecting $value") 
            delay(300) // pretend we are processing it for 300 ms
            println("Done $value") 
        } 
}   
println("Collected in $time ms")

결과
Collecting 1
Collecting 2
Collecting 3
Done 3
Collected in 741 ms

새로운 값이 방출될때마다 취소되고 다시 컬렉터가 시작되므로 마지막 값에 대해서만 완료되는 것을 알 수 있다.

profile
이것저것(안드로이드, 백엔드, AI, 인프라 등) 공부합니다

0개의 댓글