Flow란 무엇인가?

thsamajiki·2023년 1월 13일
0

Coroutine

목록 보기
6/8
post-thumbnail

코루틴 flow는 코루틴 상에서 리액티브 프로그래밍을 지원하기 위해 만들어진 구현체
코루틴에서 데이터 스트림을 구현하기 위해서는 Flow를 사용해야 한다.

순차적으로 값을 배출해서, 정상적으로 완료하거나 에러를 던지는 비동기 데이터 스트림 입니다. 일시중단할 수 있는 함수는(Suspending function) 비동기적으로 하나의 값을 반환합니다. 하지만 비동기적으로 계산되어진 값을 여러개를 반환할 때 Flow를 사용합니다.

Flow의 사용을 알아보기 위해 데이터 스트림을 살펴보자. 위 데이터 스트림은 아래 세가지 구성요소를 가진다.

Producer (생산자)
Intermediary (중간 연산자)
Consumer (소비자)

데이터 스트림의 구성요소가 어떻게 작동하는 지 알아보면서 Flow의 사용법을 알아보자

DataStream : Producer

생산자는 데이터를 발행한다. 주로 Local 또는 Remote의 DataSource에서 데이터를 가져온다.

Flow에서의 Producer는 emit()을 통해 데이터를 생성한다.

class RemoteDataSource(
	private val remoteApi: RemoteApi
) {
    // 먼저 flow scope를 선언
    fun getObjectFlow(): Flow<List<Object>> = flow {
        while(true) {
            val objs = remoteApi.fetchLastedObject() // remote 서버로 부터 데이터를 받아옴
            emit(objs)   // emit으로 데이터를 발행
            delay(60000) // 60초 마다 반복
        }
    }
}

DataStream : Intermediary
생산자가 데이터를 생성했으면 중간 연산자는 생성된 데이터를 수정한다.

여기서 생성자가 A라는 객체의 데이터를 발행했지만 B라는 객체 데이터가 필요한 경우 Flow에서 지원하는 중간 연산자를 이용해 A객체를 B객체로 바꿀 수 있다.

map, filter ,onEach

class ObjectRepository(
    private val objectRemoteDataSorce: ObjectRemoteDataSorce
) {
    fun getObjectOfViewItem(locale : Locale) =
        objectRemoteDataSorce.getObjectFlow().map{ it.filter (this.prop == prop)
}

View에 모든 처리가 완료된 가공된 데이터만을 전달하는 것이 좋다.

이를 위해 Intermediary에서 전달하기 위해 데이터를 가공한다.

DataStream : Consumer

중간연산자가 생산자가 생성한 데이터를 가공하여 소비자로 데이터를 전달한다.

안드로이드에서 소비자라 함은 UI 구성요소를 생각하면 된다.

Flow에서는 collect를 이용해 전달된 데이터를 소비할 수 있다.

class ObjectViewModel(
    private val objectRepository: ObjectRepository
) : ViewModel() {
    fun collectObjectOf(prop: Prop) =
        viewModelScope.launch {
            dustRepository.getObjectFlow().collect { obj ->
                text = obj.prop ...
            }
        }
    }
}

받은 object 데이터를 이용하여 viewModel 에서 필요한 처리를 하고, View에서 사용하면 된다.




코루틴에서 데이터 스트림을 구현하기 위해서의 구성요소는 다음과 같습니다.

Producer(생산자)
Intermediary(중간 연산자) - 선택사항
Consumer(소비자)

  1. Flow 빌더

flow {}를 사용하는 것이 가장 기본적인 플로우 빌더입니다. emit() 이외에 asFlow()를 통하여 Collection 및 Sequence를 Flow로 변환 할 수 있습니다.

(1..10).asFlow().collect { value ->
    println(value)
}
  1. Producer(생산자)

먼저 생산자에서는 데이터를 발행하기 위하여 flow {} 코루틴 블록(빌더)을 생성한 후 내부에서 emit()을 통하여 데이터를 생성합니다. 또한 flow {} 블록은 suspend 함수이므로 delay를 호출할 수 있습니다.

fun flowSomething(): Flow<Int> = flow {
    repeat(10) {
        emit(it) // 0 1 2 3..9
        delay(100L) // 100ms
    }
}
  1. Intermediary(중간 연산자)

생산자에서 데이터를 발행을 하였다면 중간 연산자는 생성된 데이터를 수정 할 수 있습니다. 코틀린의 컬렉션의 함수와 같이 대표적으로 map(데이터를 원하는 형태로 변환), filter(데이터 필터링), onEach(데이터를 변경후 수행한 결과를 반환) 등이 있습니다.

flowSomethings().filter {
    it % 2 == 0 // 짝수만 필터링
}
  
flowSomethings().map {
    it * 2 // 값을 2배씩
}
  1. Consumer(소비자)

생산자에서 데이터를 발행하고, 중간 연산자(선택)에서 데이터를 가공하였다면 소비자에서는 collect()를 이용하여 전달된 데이터를 소비할 수 있습니다.

fun main() = runBlocking {
    flowSomething().map {
        it * 2
    }.collect { value ->
        println(value)
    }
}
// 0 2 4 6 8 .. 18
  1. flow 더 알아보기
    Flow는 코루틴을 기반으로 빌드되며 여러 값을 제공할 수 있습니다. Flow는 비동기식으로 계산할 수 있는 데이터 스트림의 개념입니다. 내보낸 값은 동일한 유형이어야 합니다. 예를 들어 Flow는 정수 값을 내보내는 흐름입니다.

Flow는 값 시퀀스를 생성하는 Iterator와 비슷하지만 suspend 함수를 사용하여 값을 비동기적으로 생성하고 사용합니다. 예를 들어 Flow는 기본 스레드를 차단하지 않고 다음 값을 생성할 네트워크 요청을 안전하게 만들 수 있습니다.

  1. 간단한 flow 사용
fun flowSomething(): Flow<Int> = flow {
    repeat(10) {
        emit(it) //0 1 2 3 ... 9
        delay(10L)
    }
}

fun main() = runBlocking {
    flowSomething().collect { value ->
        println(value)
    }
}

/*
결과

0
1
2
3  
...
 
9
*/

flow 플로우 빌더 함수를 이용해서 코드블록을 구성하고 emit을 호출해서 스트림에 데이터를 흘려 보냅니다. 플로우는 콜드 스트림이기 때문에 요청 측에서 collect를 호출해야 값을 발생하기 시작합니다.

콜드 스트림 - 요청이 있는 경우에 보통 1:1로 값을 전달하기 시작.
핫 스트림 - 0개 이상의 상대를 향해 지속적으로 값을 전달.

  1. 플로우 취소
fun flowSomething(): Flow<Int> = flow {
    repeat(10) {
        emit(it) //0 1 2 3 ... 9
        delay(100L)
    }
}

fun main() = runBlocking<Unit> {
    val result = withTimeoutOrNull(500L) {
        flowSomething().collect { value ->
            println(value)
        }
        true
    } ?: false
    if (!result) {
        println("취소되었습니다.")
    }
}

withTimeoutOrNull을 이용해 취소할 수 있습니다. withTimeoutOrNull(500L)라고 하게 되면 500ms 가 되었을 때 withTimeoutOrNull이 null을 리턴하기 때문에 결과적으로 false를 반환하게 됩니다.

  1. 플로우 빌더 flowOf

flow 이외에도 몇가지 flowOf, asFlow등의 플로우 빌더가 있습니다. flowOf의 경우 여러 값을 인자로 전달해 플로우를 만듭니다.

fun main() = runBlocking<Unit> {
    flowOf(1, 2, 3, 4, 5).collect { value ->
        println(value)
    }
}

/*
결과

1
2
3
4
5
*/
  1. 플로우 빌더 asFlow

asFlow는 컬렉션이나 시퀀스를 전달해 플로우를 만들 수 있습니다.

fun main() = runBlocking<Unit> {
    listOf(1, 2, 3, 4, 5).asFlow().collect { value ->
        println(value)
    }
    (6..10).asFlow().collect {
        println(it)
    }
}

/*
결과

1
2
3
4
5
6
7
8
9
10
*/
  1. flow 연산하기
  2. flow와 map

플로우에서 map 연산을 통해 데이터를 가공할 수 있습니다.

fun flowSomething(): Flow<Int> = flow {
    repeat(10) {
        emit(it)
        delay(10L)
    }
}

fun main() = runBlocking {
    flowSomething().map {
        "$it $it"
    }.collect { value ->
        println(value)
    }
}

/*
결과

0 0
1 1
2 2

...

9 9
*/
  1. flow와 filter
fun main() = runBlocking<Unit> {
    (1..20).asFlow().filter {
        (it % 2) == 0 //짝수만 필터링
    }.collect {
        println(it)
    }
}

결과

2
4
6

...

20
  1. filterNot

만약 홀수만 남기고 싶을 때 코드를 홀수에 맞게 수정할 수 도 있습니다. 하지만 짝수를 출력하는 코드를 그대로 두고 filterNot을 사용할 수도 있습니다.

fun main() = runBlocking<Unit> {
    (1..20).asFlow().filterNot {
        (it % 2) == 0
    }.collect {
        println(it)
    }
}
  1. transform 연산자

위의 중간 연산자(map, filter)은 요소마다 1개의 변환밖에 하지 못하지만 변환 연산자(transform)은 예시처럼 emit()을 추가하여 요소마다 여러개의 변환이 가능하게 해줍니다.

suspend fun someCalc(i: Int): Int {
    delay(10L)
    return i * 2
}

fun main() = runBlocking<Unit> {
    (1..20).asFlow().transform {
        emit(it)
        emit(someCalc(it))
    }.collect {
        println(it)
    }
}

/*
결과
1
2
2
4
3
6
4
8

...
*/
  1. take 연산자

take 연산자는 몇개의 수행 결과만 취합니다.

suspend fun someCalc(i: Int): Int {
    delay(10L)
    return i * 2
}

fun main() = runBlocking<Unit> {
    (1..20).asFlow().transform {
        emit(it)
        emit(someCalc(it))
    }.take(5)
    .collect {
        println(it)
    }
}

/*
결과

1
2
2
4
3
*/
  1. takeWhile 연산자

takeWhile을 이용해 조건을 만족하는 동안만 값을 가져오게 할 수 있습니다.

suspend fun someCalc(i: Int): Int {
    delay(10L)
    return i * 2
}

fun main() = runBlocking<Unit> {
    (1..20).asFlow().transform {
        emit(it)
        emit(someCalc(it))
    }.takeWhile {
        it < 15
    }.collect {
        println(it)
    }
}
  1. drop 연산자

drop 연산자는 처음 몇개의 결과를 버립니다. take가 takeWhile을 가지듯 dropWhile도 있습니다.

suspend fun someCalc(i: Int): Int {
    delay(10L)
    return i * 2
}

fun main() = runBlocking<Unit> {
    (1..20).asFlow().transform {
        emit(it)
        emit(someCalc(it))
    }.drop(5)
    .collect {
        println(it)
    }
}
profile
안드로이드 개발자

1개의 댓글

comment-user-thumbnail
2023년 1월 30일

글 잘 읽고 가요 :)👍

답글 달기