[Kotlin] Flow

sundays·2022년 12월 16일
0

coroutine

목록 보기
2/7

Flow

  • reactive programming framework
  • 여러 값을 순차적으로 내보낼수 있는 유형
  • Flow를 사용하여 데이터베이스에서 실시간 업데이트를 수신할 수 있다
  • 코루틴 기반으로 빌드되어 비동기 식으로 계산할수 있는 데이터 스트림
  • 기본 스레드를 차단하지 않고 다음 값을 생성할 네트워크 요청을 안전하게 만들수 있다

1. Data Stream

1.1. 생산자

  • 스트림에 추가되는 데이터를 생성
  • 코루틴 덕분에 비동기적으로 데이터가 생성될 수 있다

1.2. 중계자

  • 스트림에 내보내는 각각의 값이나 스트림 자체를 수정할 수 있다

1.3. 소비자

  • 스트림의 값을 사용한다

  • Repository = UI 데이터 생산자
  • UI는 데이터를 표시하는 소비자
  • 생산자와 소비자 사이의 레이어는 다음 레이어의 요구사항에 맞게 조정하기 위해 데이터 스트림을 수정하는 중개자의 역할

2. Flow 생성

  • flow 빌더 함수는 emit 함수를 사용하여 새 값을 수동으로 데이터 스트림에 내보낼 수 있음
class NewsRemoteDataSource(
    private val newsApi: NewsApi,
    private val refreshIntervalMs: Long = 5000
) {
    val latestNews: Flow<List<ArticleHeadline>> = flow {
        while(true) {
            val latestNews = newsApi.fetchLatestNews()
            emit(latestNews) // Emits the result of the request to the flow
            delay(refreshIntervalMs) // Suspends the coroutine for some time
        }
    }
}

// Interface that provides a way to make network requests with suspend functions
interface NewsApi {
    suspend fun fetchLatestNews(): List<ArticleHeadline>
}
  • 데이터 소스는 고정된 간격으로 최신 뉴스를 자동으로 가져옵니다
  • 정지 함수는 연속된 값을 여러개 반환할 수 없기 때문에 데이터 소스의 요구사항을 충족하는 흐름을 만들고 반환합니다.
  • fetchLatestNews 네트워크 요청이 완료될때까지 정지하고 난 다음 결과를 스트림으로 내보냅니다
  • flow builder 에서 생산자가 다른 CoroutineContext 의 결과를 emit 할 수 없기 때문에
    새 코루틴을 만들거나 withContext 블록으로 다른 CoroutineContext에서 emit 하지말고 callbackFlow 같은 다른 빌더를 사용하세요

3. Stream 수정

  • 데이터 스트림에 적용되는 경우 값이 향후에 사용될때까지 실행되지 않을 작업 체인을 설정하는 함수
class NewsRepository(
    private val newsRemoteDataSource: NewsRemoteDataSource,
    private val userData: UserData
) {
    /**
     * Returns the favorite latest news applying transformations on the flow.
     * These operations are lazy and don't trigger the flow. They just transform
     * the current value emitted by the flow at that point in time.
     */
    val favoriteLatestNews: Flow<List<ArticleHeadline>> =
        newsRemoteDataSource.latestNews
            // Intermediate operation to filter the list of favorite topics
            .map { news -> news.filter { userData.isFavoriteTopic(it) } }
            // Intermediate operation to save the latest news in the cache
            .onEach { news -> saveInCache(news) }
}

4. Flow Collect

  • collect 는 정지 함수이므로 코루틴 내에서 실행되어야 합니다
  • collect 를 호출하는 코루틴은 흐름이 종료될 때까지 정지될 수 있습니다.
class LatestNewsViewModel(
    private val newsRepository: NewsRepository
) : ViewModel() {

    init {
        viewModelScope.launch {
            // Trigger the flow and consume its elements using collect
            newsRepository.favoriteLatestNews.collect { favoriteNews ->
                // Update View with the latest favorite news
            }
        }
    }
}
  • 고정된 간격으로 최신 뉴스를 새로고침하고 네트워크에 요청 결과를 내보내는 생산자가 트리거 됩니다
  • while(true) 로 항상 활성 상태이기 때문에 viewmodel이 삭제되어 viewmodelscope가 취소될때 데이터 스트림이 종료됩니다.
  • 수집이 종료 될 경우
    • collect coroutine이 취소 된 경우 기본 생성자도 중지됩니다
    • 생산자가 emit을 완료한 경우 데이터 스트림이 종료되고 collect를 호출한 코루틴이 실행을 다시 시작합니다.
  • flow collect 가 여러개 있으면 데이터 소스가 서로 다른 고정된 간격으로 최신뉴스를 여러번 가져옵니다.
  • 여러 consumer가 동시에 collect할때 flow를 최적화하려면 shareIn 연산자를 사용합니다

5. Exception

  • catch 중간 연산자를 사용합니다
class LatestNewsViewModel(
    private val newsRepository: NewsRepository
) : ViewModel() {

    init {
        viewModelScope.launch {
            newsRepository.favoriteLatestNews
                // Intermediate catch operator. If an exception is thrown,
                // catch and update the UI
                .catch { exception -> notifyError(exception) }
                .collect { favoriteNews ->
                    // Update View with the latest favorite news
                }
        }
    }
}
  • 예시에서 예외 발생시 새 항목이 수신되지 않아서 collect 람다를 호출하지 않습니다.
class NewsRepository(...) {
    val favoriteLatestNews: Flow<List<ArticleHeadline>> =
        newsRemoteDataSource.latestNews
            .map { news -> news.filter { userData.isFavoriteTopic(it) } }
            .onEach { news -> saveInCache(news) }
            // If an error happens, emit the last cached values
            .catch { exception -> emit(lastCachedNews()) }
}
  • catch 항목을 emit 할 수 도 있습니다. 예외가 발생하면 collect 람다가 호출되므로 예외로 인해 새 항목이 스트림에 내보내집니다.

Other CoroutineContext

  • 다른 CoroutineContext 에서 값을 emit 할수 없습니다. viewModelScope가 사용하는 Dispatcher.Main에서 작업을 실행하면 안됩니다.
  • CoroutineContext를 변경하려면 중간 연산자 flowOn 을 사용합니다
  • flowOn은 업스트림 흐름의 CoroutineContext를 변경합니다.
    • 생산자 및 중간 연산자가 flowOn 전에 적용 됩니다.
    • 다운 스트림 흐름은 영향을 받지 않으며 collect 할때 사용되는 CoroutineContext 에서 실행됩니다.
    • flowOn 연산자가 여러개 있는 경우 각 연산자는 현재 위치에서 업스트림 변경
class NewsRepository(
    private val newsRemoteDataSource: NewsRemoteDataSource,
    private val userData: UserData,
    private val defaultDispatcher: CoroutineDispatcher
) {
    val favoriteLatestNews: Flow<List<ArticleHeadline>> =
        newsRemoteDataSource.latestNews
            .map { news -> // Executes on the default dispatcher
                news.filter { userData.isFavoriteTopic(it) }
            }
            .onEach { news -> // Executes on the default dispatcher
                saveInCache(news)
            }
            // flowOn affects the upstream flow ↑
            .flowOn(defaultDispatcher)
            // the downstream flow ↓ is not affected
            .catch { exception -> // Executes in the consumer's context
                emit(lastCachedNews())
            }
}
  • defaultDispatcher를 사용하는 것은 onEach 및 map 연산자 이다
  • viewModelScope 내 Dispatchers.Main 에서 실행되는 것은 catch 연산자

Jetpack Flow

  • 실시간 데이터 업데이트 및 무제한 데이터 스트림에 사용
  • Flow With Room 을 사용하여 데이터베이스 변경 알림을 받을 수 있음
  • @Dao(Data Access Object) 를 사용하는 경우 실시간 업데이트를 받으려면 Flow 로 반환해야 합니다
@Dao
abstract class ExampleDao {
    @Query("SELECT * FROM Example")
    abstract fun getExamples(): Flow<List<Example>>
}

Callback API

  • callbackFlow 는 콜백 기반 API를 흐름으로 변환할 수 있는 흐름 빌더
  • 다른 CoroutineContext에서 값을 내보내거나 offer 함수를 사용하여 코루틴 외부로 값을 emit 할 수 있습니다
  • 내부적으로 callbackFlow는 개념상 Blocking Queue 와 매우 유사한 channel을 사용합니다
  • callbackflow에서 생성된 채널의 기본 용량은 요소 64개 입니다
  • 전체 채널에 새 요소를 추가하려는 경우
    • send는 새요소를 위한 공간이 생길때까지 생산자를 정지
    • offer는 채널에 요소를 추가하지 않고 즉시 false를 반환

Reference

profile
develop life

0개의 댓글