Coroutine Flow 란? (2)

쓰리원·2022년 9월 5일
1

Coroutine Flow

목록 보기
2/2
post-thumbnail

공식문서를 기반으로 한번 훑는 것으로는 내용이 머리속에 잘 안들어와서 블로그에 글을 정리해보면서 내용을 익혀보려고 합니다.

1. 안드로이드에서 Flow란?

Android에서 클린아키텍쳐 계층의 Datasource는 일반적으로 UI 데이터 생산자입니다. 이때 사용자 인터페이스(UI)는 최종적으로 데이터를 표시하는 소비자입니다. 그렇지만 우리는 UI와 양방향적으로 상호작용하기 때문에 UI 레이어가 사용자 입력 이벤트의 생산자이고 계층 구조의 다른 레이어가 이 이벤트를 사용하기도 합니다. 생산자와 소비자 사이의 레이어는 일반적으로 다음 레이어의 요구사항에 맞게 조정하기 위해 데이터 스트림을 수정하는 중개자의 역할을 합니다.

2. flow 만들기 - Producer(생산자)

Android에서 클린아키텍쳐 계층의 Datasource는 일반적으로 UI 데이터 생산자라고 위에서 언급했습니다. 위의 경우에 따라서 예제를 따라가서 학습해 보겠습니다. flow를 만들려면 flow builder API를 사용합니다. flow 빌더 함수는 emit 함수를 사용하여 새 값을 수동으로 데이터 스트림에 내보낼 수 있는 새 흐름을 만듭니다.

class NewsRemoteDataSource(
    private val newsApi: NewsApi,
    private val refreshIntervalMs: Long = 5000
) {
    val latestNews: Flow<List<ArticleHeadline>> = flow {
        while(true) {
            val latestNews = newsApi.fetchLatestNews()
            emit(latestNews) // Emits the result of the request to the flow
            delay(refreshIntervalMs) // Suspends the coroutine for some time
        }
    }
}

// Interface that provides a way to make network requests with suspend functions
interface NewsApi {
    suspend fun fetchLatestNews(): List<ArticleHeadline>
}

위의 코드 예에서 데이터 소스는 고정된 간격으로 최신 뉴스를 자동으로 가져옵니다. 정지 함수(suspend fun)는 연속된 값을 여러 개 반환할 수 없으므로, 데이터 소스가 이러한 요구사항을 충족하는 flow를 만들고 반환합니다. 이 경우 데이터 소스가 생산자의 역할을 합니다.

flow 빌더는 코루틴 내에서 실행됩니다. 따라서 동일한 비동기 API의 이점을 활용할 수 있지만 몇 가지 제한사항이 적용됩니다.

  • 흐름이 순차적입니다. 생산자가 코루틴에 있으므로, 정지 함수(suspend fun)를 호출하면 생산자는 정지 함수(suspend fun)가 반환될 때까지 정지 상태로 유지됩니다. 이 예에서 생산자는 fetchLatestNews 네트워크 요청이 완료될 때까지 정지됩니다. 그런 다음에만 결과를 스트림으로 내보냅니다.

  • flow 빌더에서는 생산자가 다른 CoroutineContext의 값을 emit할 수 없습니다. 그러므로 새 코루틴을 만들거나 코드의 withContext 블록을 사용하여 다른 CoroutineContext에서 emit를 호출하면 안됩니다. 이런 경우 callbackFlow 같은 다른 흐름 빌더를 사용할 수 있습니다.

3. flow 빌더에서는 생산자가 다른 CoroutineContext의 값을 emit할 수 없다.

다음은 flow 빌더에서 생산자가 다른 CoroutineContext의 값을 emit할 수 없는 예제입니다.

fun main() = runBlocking {
    val flow = flow {
        withContext(Dispatchers.IO) {
            // 다른 CoroutineContext에서 비동기 작업을 수행한다.
            val result = async {
                delay(1000L)
                "Hello, World!"
            }.await()

            // result 값을 emit한다. (error 발생)
            emit(result)
        }
    }

    flow.collect { value ->
        println(value)
    }
}

위 예제에서는 flow 빌더 안에서 withContext 함수를 사용하여 Dispatchers.IO에서 비동기 작업을 수행하고 있습니다. 이때, async 함수를 사용하여 1초 후에 "Hello, World!" 문자열을 반환하는 작업을 수행하고, await 함수를 사용하여 해당 작업의 결과값을 가져온 뒤, emit 함수를 사용하여 해당 결과값을 emit하려고 합니다.

그러나, emit 함수는 현재의 CoroutineContext에서 실행 중인 코드를 참조하여 값을 emit하기 때문에, withContext 함수 내부에서 생성된 CoroutineContext(Dispatchers.IO)의 값이 아닌, 현재의 CoroutineContext(runBlocking)의 값을 emit하게 됩니다. 이는 emit 함수가 CoroutineScope의 CoroutineContext를 사용하기 때문입니다.

따라서, 위 예제에서는 emit 함수에서 IllegalStateException이 발생하여, 아래와 같은 오류 메시지가 출력됩니다.

Caused by: java.lang.IllegalStateException: Flow invariant is violated:
        Emit was called from a coroutine different from the one that was used to collect the flow. 
        Debugging tip: If you are emitting in a background thread switch to the Dispatchers.Main dispatcher 
        using the 'flowOn' transformation.

이 오류 메시지에서는, emit 함수가 collect 함수가 호출된 coroutine과 다른 coroutine에서 호출되었기 때문에 발생한 오류라는 것을 알려줍니다. 이를 해결하기 위해서는, emit 함수가 withContext 함수 내부에서 생성된 CoroutineContext를 참조하도록 하는 등의 방법을 사용해야 합니다.

4. 스트림 수정 - Intermediary(중간 연산자)

중개자는 중간 연산자를 사용하여 값을 소비하지 않고도 데이터 스트림을 수정할 수 있습니다. 이 연산자는 데이터 스트림에 적용되는 경우 값이 향후에 사용될 때까지 실행되지 않을 작업 체인을 설정하는 함수입니다.

아래 예에서 저장소 레이어는 중간 연산자 map을 사용하여 데이터가 View에 표시되도록 변환합니다.

class NewsRepository(
    private val newsRemoteDataSource: NewsRemoteDataSource,
    private val userData: UserData
) {
    /**
     * Returns the favorite latest news applying transformations on the flow.
     * These operations are lazy and don't trigger the flow. They just transform
     * the current value emitted by the flow at that point in time.
     */
    val favoriteLatestNews: Flow<List<ArticleHeadline>> =
        newsRemoteDataSource.latestNews
            // Intermediate operation to filter the list of favorite topics
            .map { news -> news.filter { userData.isFavoriteTopic(it) } }
            // Intermediate operation to save the latest news in the cache
            .onEach { news -> saveInCache(news) }
}

중간 연산자는 시차를 두고 차례로 적용할 수 있어 항목을 흐름에 내보낼 때 느리게 실행되는 작업 체인을 구성할 수 있습니다. 스트림에 중간 연산자를 적용하는 것만으로 흐름 수집이 시작되지는 않습니다.

5. flow에서 수집 - Consumer(소비자)

터미널 연산자를 사용하여 값 수신 대기를 시작하는 흐름을 트리거합니다. 내보낼 때 스트림의 모든 값을 가져오려면 collect를 사용합니다. collect는 정지 함수이므로 코루틴 내에서 실행해야 합니다. 모든 새 값에서 호출되는 매개변수로 람다를 사용합니다. 정지 함수이므로, collect를 호출하는 코루틴은 흐름이 종료될 때까지 정지될 수 있습니다. 이전 예에서 저장소 레이어의 데이터를 사용하는 ViewModel을 간단히 구현하면 다음과 같습니다.

class LatestNewsViewModel(
    private val newsRepository: NewsRepository
) : ViewModel() {

    init {
        viewModelScope.launch {
            // Trigger the flow and consume its elements using collect
            newsRepository.favoriteLatestNews.collect { favoriteNews ->
                // Update View with the latest favorite news
            }
        }
    }
}

흐름을 수집하면 고정된 간격으로 최신 뉴스를 새로고침하고 네트워크 요청 결과를 내보내는 생산자가 트리거 됩니다. 생산자는 while(true) 루프로 항상 활성 상태가 유지되므로 ViewModel이 삭제되어 viewModelScope가 취소되면 데이터 스트림이 종료됩니다.

다음과 같은 이유로 흐름 수집이 중지될 수 있습니다.

  • 이전 예시에 나온 것처럼 수집된 코루틴이 취소된 경우. 이 경우 기본 생산자도 중지됩니다.

  • 생산자가 항목 방출을 완료한 경우. 이 경우 데이터 스트림이 종료되고 collect를 호출한 코루틴이 실행을 다시 시작합니다.

다른 중간 연산자를 통해 지정되지 않은 경우 흐름의 상태는 콜드 및 지연입니다. 즉, 흐름에서 터미널 연산자가 호출될 때마다 생산자 코드가 실행됩니다. 이전 예시에서 흐름 수집기가 여러 개 있으면 데이터 소스가 서로 다른 고정된 간격으로 최신 뉴스를 여러 번 가져옵니다. 여러 소비자가 동시에 수집할 때 흐름을 최적화하고 공유하려면 shareIn 연산자를 사용합니다.

6. 예외 처리 방법

생산자 구현은 서드 파티 라이브러리에서 가져올 수 있습니다. 따라서 예기치 않은 예외가 발생할 수 있습니다. 이러한 예외를 처리하려면 catch 중간 연산자를 사용합니다.

class LatestNewsViewModel(
    private val newsRepository: NewsRepository
) : ViewModel() {

    init {
        viewModelScope.launch {
            newsRepository.favoriteLatestNews
                // Intermediate catch operator. If an exception is thrown,
                // catch and update the UI
                .catch { exception -> notifyError(exception) }
                .collect { favoriteNews ->
                    // Update View with the latest favorite news
                }
        }
    }
}

이전 예시에서 예외가 발생하는 경우 새 항목이 수신되지 않았기 때문에 collect 람다가 호출되지 않습니다. catch는 항목을 흐름에 emit할 수도 있습니다. 대신 예제 저장소 레이어는 캐시된 값을 emit할 수 있습니다.

class NewsRepository(...) {
    val favoriteLatestNews: Flow<List<ArticleHeadline>> =
        newsRemoteDataSource.latestNews
            .map { news -> news.filter { userData.isFavoriteTopic(it) } }
            .onEach { news -> saveInCache(news) }
            // If an error happens, emit the last cached values
            .catch { exception -> emit(lastCachedNews()) }
}

이 예에서는 예외가 발생하면 collect 람다가 호출되므로 예외로 인해 새 항목이 스트림에 내보내집니다.

7. 다른 CoroutineContext에서 실행하기

기본적으로 flow 빌더의 생산자는 수집하는 코루틴의 CoroutineContext에서 실행됩니다. 앞에서 언급한 것처럼 다른 CoroutineContext에서 값을 emit할 수 없습니다. 이 동작은 경우에 따라 원하지 않는 동작일 수도 있습니다. 예를 들어, 이 주제 전체에 사용된 예에서 저장소 레이어는 viewModelScope가 사용하는 Dispatchers.Main에서 작업을 실행하면 안 됩니다.

흐름의 CoroutineContext를 변경하려면 중간 연산자 flowOn을 사용합니다. flowOn은 업스트림 흐름의 CoroutineContext를 변경합니다. 즉, 생산자 및 중간 연산자가 flowOn 전에(또는 위에) 적용됩니다. 다운스트림 흐름(flowOn 이후의 중간 연산자 및 소비자)은 영향을 받지 않으며 흐름에서 collect하는 데 사용되는 CoroutineContext에서 실행됩니다. flowOn 연산자가 여러 개 있는 경우 각 연산자는 현재 위치에서 업스트림을 변경합니다.

class NewsRepository(
    private val newsRemoteDataSource: NewsRemoteDataSource,
    private val userData: UserData,
    private val defaultDispatcher: CoroutineDispatcher
) {
    val favoriteLatestNews: Flow<List<ArticleHeadline>> =
        newsRemoteDataSource.latestNews
            .map { news -> // Executes on the default dispatcher
                news.filter { userData.isFavoriteTopic(it) }
            }
            .onEach { news -> // Executes on the default dispatcher
                saveInCache(news)
            }
            // flowOn affects the upstream flow ↑
            .flowOn(defaultDispatcher)
            // the downstream flow ↓ is not affected
            .catch { exception -> // Executes in the consumer's context
                emit(lastCachedNews())
            }
}

이 코드에서 onEach 및 map 연산자는 defaultDispatcher를 사용하는 반면, catch 연산자와 소비자는 viewModelScope에 사용되는 Dispatchers.Main에서 실행됩니다. 데이터 소스 레이어가 I/O 작업을 수행하므로, I/O 작업에 최적화된 디스패처를 사용해야 합니다.

class NewsRemoteDataSource(
    ...,
    private val ioDispatcher: CoroutineDispatcher
) {
    val latestNews: Flow<List<ArticleHeadline>> = flow {
        // Executes on the IO dispatcher
        ...
    }
        .flowOn(ioDispatcher)
}

8. Jetpack 라이브러리의 Flow

Flow는 많이 사용되는 Android 서드 파티 라이브러리인 다수의 Jetpack 라이브러리에 통합됩니다. Flow는 실시간 데이터 업데이트 및 무제한 데이터 스트림에 아주 적합합니다.

Flow with Room을 사용하여 데이터베이스 변경 알림을 받을 수 있습니다. 데이터 액세스 객체(DAO)를 사용하는 경우 실시간 업데이트를 받으려면 Flow 유형을 반환합니다.

@Dao
abstract class ExampleDao {
    @Query("SELECT * FROM Example")
    abstract fun getExamples(): Flow<List<Example>>
}

Example 테이블이 변경될 때마다 데이터베이스의 새 항목이 포함된 새 목록이 내보내집니다.

9. 콜백 기반 API를 흐름으로 변환

callbackFlow는 콜백 기반 API를 흐름으로 변환할 수 있는 흐름 빌더입니다. 예를 들어 Firebase Firestore Android API는 콜백을 사용합니다. 이 API를 흐름으로 변환하고 Firestore 데이터베이스 업데이트를 수신 대기하려면 다음 코드를 사용하면 됩니다.

class FirestoreUserEventsDataSource(
    private val firestore: FirebaseFirestore
) {
    // Method to get user events from the Firestore database
    fun getUserEvents(): Flow<UserEvents> = callbackFlow {

        // Reference to use in Firestore
        var eventsCollection: CollectionReference? = null
        try {
            eventsCollection = FirebaseFirestore.getInstance()
                .collection("collection")
                .document("app")
        } catch (e: Throwable) {
            // If Firebase cannot be initialized, close the stream of data
            // flow consumers will stop collecting and the coroutine will resume
            close(e)
        }

        // Registers callback to firestore, which will be called on new events
        val subscription = eventsCollection?.addSnapshotListener { snapshot, _ ->
            if (snapshot == null) { return@addSnapshotListener }
            // Sends events to the flow! Consumers will get the new events
            try {
                offer(snapshot.getEvents())
            } catch (e: Throwable) {
                // Event couldn't be sent to the flow
            }
        }

        // The callback inside awaitClose will be executed when the flow is
        // either closed or cancelled.
        // In this case, remove the callback from Firestore
        awaitClose { subscription?.remove() }
    }
}

flow 빌더와 달리 callbackFlow에서는 send 함수를 사용하여 다른 CoroutineContext에서 값을 내보내거나 offer 함수 사용하여 코루틴 외부로 값을 내보낼 수 있습니다.

내부적으로 callbackFlow는 개념상 차단 큐와 매우 유사한 채널을 사용합니다. 채널은 버퍼링 가능한 요소의 최대 수인 용량으로 구성됩니다. callbackFlow에서 생성된 채널의 기본 용량은 요소 64개입니다. 전체 채널에 새 요소를 추가하려는 경우 send는 새 요소를 위한 공간이 생길 때까지 생산자를 정지하는 반면, offer는 채널에 요소를 추가하지 않고 즉시 false를 반환합니다.

10. reference

https://developer.android.com/kotlin/flow?hl=ko

profile
가장 아름다운 정답은 서로의 협업안에 있다.

1개의 댓글

comment-user-thumbnail
2023년 10월 5일

잘 읽었습니다. 도움 많이 되었어요~~!
좋은 포스팅 감사합니다.

답글 달기