코루틴 flow는 코루틴 상에서 리액티브 프로그래밍을 지원하기 위해 만들어진 구현체
코루틴에서 데이터 스트림을 구현하기 위해서는 Flow를 사용해야 한다.
순차적으로 값을 배출해서, 정상적으로 완료하거나 에러를 던지는 비동기 데이터 스트림 입니다. 일시중단할 수 있는 함수는(Suspending function) 비동기적으로 하나의 값을 반환합니다. 하지만 비동기적으로 계산되어진 값을 여러개를 반환할 때 Flow를 사용합니다.
Flow의 사용을 알아보기 위해 데이터 스트림을 살펴보자. 위 데이터 스트림은 아래 세가지 구성요소를 가진다.
Producer (생산자)
Intermediary (중간 연산자)
Consumer (소비자)
데이터 스트림의 구성요소가 어떻게 작동하는 지 알아보면서 Flow의 사용법을 알아보자
DataStream : Producer
생산자는 데이터를 발행한다. 주로 Local 또는 Remote의 DataSource에서 데이터를 가져온다.
Flow에서의 Producer는 emit()을 통해 데이터를 생성한다.
class RemoteDataSource(
private val remoteApi: RemoteApi
) {
// 먼저 flow scope를 선언
fun getObjectFlow(): Flow<List<Object>> = flow {
while(true) {
val objs = remoteApi.fetchLastedObject() // remote 서버로 부터 데이터를 받아옴
emit(objs) // emit으로 데이터를 발행
delay(60000) // 60초 마다 반복
}
}
}
DataStream : Intermediary
생산자가 데이터를 생성했으면 중간 연산자는 생성된 데이터를 수정한다.
여기서 생성자가 A라는 객체의 데이터를 발행했지만 B라는 객체 데이터가 필요한 경우 Flow에서 지원하는 중간 연산자를 이용해 A객체를 B객체로 바꿀 수 있다.
map, filter ,onEach
class ObjectRepository(
private val objectRemoteDataSorce: ObjectRemoteDataSorce
) {
fun getObjectOfViewItem(locale : Locale) =
objectRemoteDataSorce.getObjectFlow().map{ it.filter (this.prop == prop)
}
View에 모든 처리가 완료된 가공된 데이터만을 전달하는 것이 좋다.
이를 위해 Intermediary에서 전달하기 위해 데이터를 가공한다.
DataStream : Consumer
중간연산자가 생산자가 생성한 데이터를 가공하여 소비자로 데이터를 전달한다.
안드로이드에서 소비자라 함은 UI 구성요소를 생각하면 된다.
Flow에서는 collect를 이용해 전달된 데이터를 소비할 수 있다.
class ObjectViewModel(
private val objectRepository: ObjectRepository
) : ViewModel() {
fun collectObjectOf(prop: Prop) =
viewModelScope.launch {
dustRepository.getObjectFlow().collect { obj ->
text = obj.prop ...
}
}
}
}
받은 object 데이터를 이용하여 viewModel 에서 필요한 처리를 하고, View에서 사용하면 된다.
코루틴에서 데이터 스트림을 구현하기 위해서의 구성요소는 다음과 같습니다.
Producer(생산자)
Intermediary(중간 연산자) - 선택사항
Consumer(소비자)
flow {}를 사용하는 것이 가장 기본적인 플로우 빌더입니다. emit() 이외에 asFlow()를 통하여 Collection 및 Sequence를 Flow로 변환 할 수 있습니다.
(1..10).asFlow().collect { value ->
println(value)
}
먼저 생산자에서는 데이터를 발행하기 위하여 flow {} 코루틴 블록(빌더)을 생성한 후 내부에서 emit()을 통하여 데이터를 생성합니다. 또한 flow {} 블록은 suspend 함수이므로 delay를 호출할 수 있습니다.
fun flowSomething(): Flow<Int> = flow {
repeat(10) {
emit(it) // 0 1 2 3..9
delay(100L) // 100ms
}
}
생산자에서 데이터를 발행을 하였다면 중간 연산자는 생성된 데이터를 수정 할 수 있습니다. 코틀린의 컬렉션의 함수와 같이 대표적으로 map(데이터를 원하는 형태로 변환), filter(데이터 필터링), onEach(데이터를 변경후 수행한 결과를 반환) 등이 있습니다.
flowSomethings().filter {
it % 2 == 0 // 짝수만 필터링
}
flowSomethings().map {
it * 2 // 값을 2배씩
}
생산자에서 데이터를 발행하고, 중간 연산자(선택)에서 데이터를 가공하였다면 소비자에서는 collect()를 이용하여 전달된 데이터를 소비할 수 있습니다.
fun main() = runBlocking {
flowSomething().map {
it * 2
}.collect { value ->
println(value)
}
}
// 0 2 4 6 8 .. 18
Flow는 값 시퀀스를 생성하는 Iterator와 비슷하지만 suspend 함수를 사용하여 값을 비동기적으로 생성하고 사용합니다. 예를 들어 Flow는 기본 스레드를 차단하지 않고 다음 값을 생성할 네트워크 요청을 안전하게 만들 수 있습니다.
fun flowSomething(): Flow<Int> = flow {
repeat(10) {
emit(it) //0 1 2 3 ... 9
delay(10L)
}
}
fun main() = runBlocking {
flowSomething().collect { value ->
println(value)
}
}
/*
결과
0
1
2
3
...
9
*/
flow 플로우 빌더 함수를 이용해서 코드블록을 구성하고 emit을 호출해서 스트림에 데이터를 흘려 보냅니다. 플로우는 콜드 스트림이기 때문에 요청 측에서 collect를 호출해야 값을 발생하기 시작합니다.
콜드 스트림 - 요청이 있는 경우에 보통 1:1로 값을 전달하기 시작.
핫 스트림 - 0개 이상의 상대를 향해 지속적으로 값을 전달.
fun flowSomething(): Flow<Int> = flow {
repeat(10) {
emit(it) //0 1 2 3 ... 9
delay(100L)
}
}
fun main() = runBlocking<Unit> {
val result = withTimeoutOrNull(500L) {
flowSomething().collect { value ->
println(value)
}
true
} ?: false
if (!result) {
println("취소되었습니다.")
}
}
withTimeoutOrNull을 이용해 취소할 수 있습니다. withTimeoutOrNull(500L)라고 하게 되면 500ms 가 되었을 때 withTimeoutOrNull이 null을 리턴하기 때문에 결과적으로 false를 반환하게 됩니다.
flow 이외에도 몇가지 flowOf, asFlow등의 플로우 빌더가 있습니다. flowOf의 경우 여러 값을 인자로 전달해 플로우를 만듭니다.
fun main() = runBlocking<Unit> {
flowOf(1, 2, 3, 4, 5).collect { value ->
println(value)
}
}
/*
결과
1
2
3
4
5
*/
asFlow는 컬렉션이나 시퀀스를 전달해 플로우를 만들 수 있습니다.
fun main() = runBlocking<Unit> {
listOf(1, 2, 3, 4, 5).asFlow().collect { value ->
println(value)
}
(6..10).asFlow().collect {
println(it)
}
}
/*
결과
1
2
3
4
5
6
7
8
9
10
*/
플로우에서 map 연산을 통해 데이터를 가공할 수 있습니다.
fun flowSomething(): Flow<Int> = flow {
repeat(10) {
emit(it)
delay(10L)
}
}
fun main() = runBlocking {
flowSomething().map {
"$it $it"
}.collect { value ->
println(value)
}
}
/*
결과
0 0
1 1
2 2
...
9 9
*/
fun main() = runBlocking<Unit> {
(1..20).asFlow().filter {
(it % 2) == 0 //짝수만 필터링
}.collect {
println(it)
}
}
결과
2
4
6
...
20
만약 홀수만 남기고 싶을 때 코드를 홀수에 맞게 수정할 수 도 있습니다. 하지만 짝수를 출력하는 코드를 그대로 두고 filterNot을 사용할 수도 있습니다.
fun main() = runBlocking<Unit> {
(1..20).asFlow().filterNot {
(it % 2) == 0
}.collect {
println(it)
}
}
위의 중간 연산자(map, filter)은 요소마다 1개의 변환밖에 하지 못하지만 변환 연산자(transform)은 예시처럼 emit()을 추가하여 요소마다 여러개의 변환이 가능하게 해줍니다.
suspend fun someCalc(i: Int): Int {
delay(10L)
return i * 2
}
fun main() = runBlocking<Unit> {
(1..20).asFlow().transform {
emit(it)
emit(someCalc(it))
}.collect {
println(it)
}
}
/*
결과
1
2
2
4
3
6
4
8
...
*/
take 연산자는 몇개의 수행 결과만 취합니다.
suspend fun someCalc(i: Int): Int {
delay(10L)
return i * 2
}
fun main() = runBlocking<Unit> {
(1..20).asFlow().transform {
emit(it)
emit(someCalc(it))
}.take(5)
.collect {
println(it)
}
}
/*
결과
1
2
2
4
3
*/
takeWhile을 이용해 조건을 만족하는 동안만 값을 가져오게 할 수 있습니다.
suspend fun someCalc(i: Int): Int {
delay(10L)
return i * 2
}
fun main() = runBlocking<Unit> {
(1..20).asFlow().transform {
emit(it)
emit(someCalc(it))
}.takeWhile {
it < 15
}.collect {
println(it)
}
}
drop 연산자는 처음 몇개의 결과를 버립니다. take가 takeWhile을 가지듯 dropWhile도 있습니다.
suspend fun someCalc(i: Int): Int {
delay(10L)
return i * 2
}
fun main() = runBlocking<Unit> {
(1..20).asFlow().transform {
emit(it)
emit(someCalc(it))
}.drop(5)
.collect {
println(it)
}
}
글 잘 읽고 가요 :)👍