[Kotlin] Coroutine Flow (2)

문승연·2023년 11월 6일
0

Kotlin 코루틴

목록 보기
6/6

Flow 중간 연산자

Flow는 Collections, Sequence와 같이 연산자를 이용해 변환될 수 있다. 이러한 중간 연산자는 업스트림 Flow에 적용되어 다운스트림 Flow를 반환한다. 이러한 연산자들은 Flow만큼 차갑다. 연산자를 호출하는 것 자체는 suspend 함수가 아니다. 빠르게 작동하여 새롭게 변환된 Flow를 반환한다.

기본 중간 연산자들은 map 혹은 filter와 같은 친숙한 이름을 가지고 있다. 이 연산자들이 Sequence에서 사용될 때와 차이점은 이 연산자들 내부 코드 블록에서는 suspend 함수를 호출할 수 있다는 점이다.

예를 들어 요청을 수행하는 것이 오래 걸리는 작업이고 suspend 기능으로 구현되어 있는 경우에도, 요청들을 받는 Flow를 map 연산자를 사용해 결과에 매핑할 수 있다.

suspend fun performRequest(request: Int): String {
    delay(1000) // imitate long-running asynchronous work
    return "response $request"
}

fun main() = runBlocking<Unit> {
    (1..3).asFlow() // a flow of requests
        .map { request -> performRequest(request) }
        .collect { response -> println(response) }
}

위 코드의 결과는 아래와 같으면 각 줄은 이전 줄로부터 1초 후에 나타난다.

response 1
response 2
response 3

Transform 연산자

Flow 변환 연산자들 중 가장 일반적인 것은 transform 이다. map이나 filter와 같은 간단한 변환을 구현할 수도 있고 임의의 횟수만큼 값을 emit 할 수도 있다.

예를 들어 아래와 같이 오래걸리는 비동기 요청을 수행하기 전에 문자열을 방출(emit)하고 그 응답을 기다릴 수 있다.

(1..3).asFlow() // a flow of requests
    .transform { request ->
        emit("Making request $request") 
        emit(performRequest(request)) 
    }
    .collect { response -> println(response) }
Making request 1
response 1
Making request 2
response 2
Making request 3
response 3

이 외에도 방출 크기를 한정할 수 있는 연산자 take 등이 있다.

Flow 터미널 연산자

Flow의 터미널 연산자는 flow의 수집(collection)을 시작하는 일시정지 함수이다. 가장 기본적으로 collect 연산자가 있으며 이외에 toList, toSet, first, single, reduce, fold 등 다양한 터미널 연산자가 존재한다.

val sum = (1..5).asFlow()
    .map { it * it } // squares of numbers from 1 to 5                           
    .reduce { a, b -> a + b } // sum them (terminal operator)
println(sum)
15

Flow는 순차적이다

특수한 연산자를 사용하지 않는 한 각 개별 Flow의 컬렉션은 순차적으로 동작한다. 여기서 컬렉션은 터미널 연산자를 호출하는 Coroutine에서 직접 동작하며 이때 기본적으로 어떠한 새로운 Coroutine도 실행되지 않는다.

방출된 각 값들은 transform과 같은 중간 연산자들에 의해 업스트림에서 다운스트림으로 ㅓ리된 후 터미널 연산자에게 전달된다.

다음은 정수 중 짝수를 필터링한 후 문자열에 매핑하는 코드이다.

(1..5).asFlow()
    .filter {
        println("Filter $it")
        it % 2 == 0              
    }              
    .map { 
        println("Map $it")
        "string $it"
    }.collect { 
        println("Collect $it")
    }
Filter 1
Filter 2
Map 2
Collect string 2
Filter 3
Filter 4
Map 4
Collect string 4
Filter 5

Flow Context

Flow의 수집은 언제나 Coroutine을 호출하는 Context상에서 일어난다. 만약 simple이라는 Flow가 있다면, 다음 코드의 simple flow는 구체적인 구현과 상관없이 코드 작성자가 지정한 Context상에서 실행된다.

이러한 Flow의 성질을 컨텍스트 보존(context preservation)이라 부른다.

withContext(context) {
	simple().collect { value -> 
    	println(value) // run in the specified context
    }
}

따라서 기본적으로 flow { ... } 빌더 내부의 코드는 해당 Flow의 collector가 제공하는 Context 상에서 실행된다. 예를 들어, simple 함수의 구현이 호출되는 스레드를 출력하고 3개의 숫자들을 방출한다고 해보자.

fun simple(): Flow<Int> = flow {
    log("Started simple flow")
    for (i in 1..3) {
        emit(i)
    }
}  

fun main() = runBlocking<Unit> {
    simple().collect { value -> log("Collected $value") } 
}
[main @coroutine#1] Started simple flow
[main @coroutine#1] Collected 1
[main @coroutine#1] Collected 2
[main @coroutine#1] Collected 3

simple().collect가 메인 스레드에서 호출되므로, simple flow의 body 또한 메인 스레드에서 호출된다.

withContext를 사용할 때 주의점

하지만 CPU를 오래 사용하는 코드는 Dispatchers.Default Context에서 실행되어야 할 수도 있고, UI를 업데이트하는 코드는 Dispatchers.Main Context에서 실행되어야 할 수 있다.

일반적으로 withContext는 Coroutine을 사용하는 코드의 Context를 변경하는데 사용되지만, flow { ... } 빌더의 코드는 Context 보존 특성을 준수해야하므로 다른 Context에서 방출하는 것은 허용되지 않는다.

fun simple(): Flow<Int> = flow {
    // The WRONG way to change context for CPU-consuming code in flow builder
    kotlinx.coroutines.withContext(Dispatchers.Default) {
        for (i in 1..3) {
            Thread.sleep(100) // pretend we are computing it in CPU-consuming way
            emit(i) // emit next value
        }
    }
}

fun main() = runBlocking<Unit> {
    simple().collect { value -> println(value) } 
}

이 코드는 아래의 Exception을 생성한다.

Exception in thread "main" java.lang.IllegalStateException: Flow invariant is violated:
        Flow was collected in [CoroutineId(1), "coroutine#1":BlockingCoroutine{Active}@5511c7f8, BlockingEventLoop@2eac3323],
        but emission happened in [CoroutineId(1), "coroutine#1":DispatchedCoroutine{Active}@2dae0000, Dispatchers.Default].
        Please refer to 'flow' documentation or use 'flowOn' instead
    at ...

다른 CoroutineContext에서 실행하기

앞에서 언급한 것처럼 flow { ... } 빌더 생산자는 수집하는 CoroutineContext 에서 실행된다. 따라서 다른 CoroutineContext 에서 값을 emit 할 수 없다.

하지만 상황에 따라서 다른 Context에서 Flow를 수집하거나 값을 방출해야할 수도 있다. 이렇게 Flow의 Context를 변경하려면 중간 연산자 flowOn 을 사용해야 한다.

flowOn 연산자

flowOn업스트림 흐름의 Context를 변경한다. 즉, 생산자 및 중간 연산자가 flowOn 전에 적용된다. 다운스트림 흐름 (flowOn 이후의 중간 연산자 및 소비자)는 영향을 받지 않으며 흐름에서 collect 하는데 사용되는 CoroutineContext에서 실행된다. flowOn 연산자가 여러 개 있는 경우 각 연산자는 현재 위치에서 업스트림을변경한다.

class NewsRepository(
    private val newsRemoteDataSource: NewsRemoteDataSource,
    private val userData: UserData,
    private val defaultDispatcher: CoroutineDispatcher
) {
    val favoriteLatestNews: Flow<List<ArticleHeadline>> =
        newsRemoteDataSource.latestNews
            .map { news -> // Executes on the default dispatcher
                news.filter { userData.isFavoriteTopic(it) }
            }
            .onEach { news -> // Executes on the default dispatcher
                saveInCache(news)
            }
            // flowOn affects the upstream flow ↑
            .flowOn(defaultDispatcher)
            // the downstream flow ↓ is not affected
            .catch { exception -> // Executes in the consumer's context
                emit(lastCachedNews())
            }
}

위 코드에서 onEachmap 연산자는 Dispatchers.Default를 사용하는 반면, catch 연산자와 소비자는 viewModelScope에 사용되는 Dispatchers.Main에서 실행된다.

profile
"비몽(Bemong)"이라는 앱을 개발 및 운영 중인 안드로이드 개발자입니다.

0개의 댓글