깨알 개념/Kotlin

[Kotlin] Coroutines Flow - Flow.combine()과 Flow.stateIn()

interfacer_han 2024. 8. 29. 19:35

#1 개요

 

combine

Returns a Flow whose values are generated with transform function by combining the most recently emitted values by each flow. It can be demonstrated with the following example: val flow = flowOf(1, 2).onEach { delay(10) }val flow2 = flowOf("a", "b", "c").o

kotlinlang.org

 

stateIn

Converts a cold Flow into a hot StateFlow that is started in the given coroutine scope, sharing the most recently emitted value from a single running instance of the upstream flow with multiple downstream subscribers. See the StateFlow documentation for th

kotlinlang.org

Flow의 확장 함수인 Flow.combine() 및 Flow.stateIn()을 살펴본다. 
 

#2 Flow.combine()

#2-1 개요

Flow.combine()은 2개 이상의 Flow를 조합(Combine)하여 새로운 Flow를 만드는 확장함수다.

 

#2-2 조합(combine)할 재료가 전부 Cold Flow인 경우

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.combine
import kotlinx.coroutines.flow.flow

// 온도 Flow
fun tempFlow(): Flow<Int> = flow {
    for (i in 33 downTo 26) {
        delay(1000)
        emit(i)
    }
}

// 구름 Flow
fun cloudFlow(): Flow<String> = flow {
    val cloudList = arrayListOf("맑음", "흐림", "먹구름", "맑음")

    for (cloud in cloudList) {
        delay(3000)
        emit(cloud)
    }
}

// 습도 Flow
fun humFlow(): Flow<Int> = flow {
    for (i in 40..70 step 5) {
        delay(2000)
        emit(i)
    }
}

fun main() {
    val combinedFlow = combine(tempFlow(), cloudFlow(), humFlow()) { temp, cloud, hum ->
        "온도: $temp°C, 구름: $cloud, 습도: $hum%"
    }

    CoroutineScope(Dispatchers.Default).launch {
        var printCount = 0

        combinedFlow.collect { result ->
            println("(${++printCount}) $result")
        }
    }

    runBlocking {
        delay(20000)
    }
}

/* ↑ ↑ ↑ 출력 결과
(1) 온도: 32°C, 구름: 맑음, 습도: 40%
(2) 온도: 31°C, 구름: 맑음, 습도: 40%
(3) 온도: 31°C, 구름: 맑음, 습도: 45%
(4) 온도: 30°C, 구름: 맑음, 습도: 45%
(5) 온도: 29°C, 구름: 맑음, 습도: 45%
(6) 온도: 29°C, 구름: 흐림, 습도: 50%
(7) 온도: 28°C, 구름: 흐림, 습도: 50%
(8) 온도: 27°C, 구름: 흐림, 습도: 50%
(9) 온도: 27°C, 구름: 흐림, 습도: 55%
(10) 온도: 26°C, 구름: 흐림, 습도: 55%
(11) 온도: 26°C, 구름: 먹구름, 습도: 55%
(12) 온도: 26°C, 구름: 먹구름, 습도: 60%
(13) 온도: 26°C, 구름: 맑음, 습도: 60%
(14) 온도: 26°C, 구름: 맑음, 습도: 65%
(15) 온도: 26°C, 구름: 맑음, 습도: 70%
*/

tempFlow, cloudFlow, humFlow를 조합해 combinedFlow라는 새로운 Flow를 만들었다. combinedFlow가 emit()하는 부분을 보면, 3개의 Flow를 재료로 만들었기에 3개의 매개변수(temp, cloud, hum)가 사용된 모습을 확인할 수 있다.

 

이때, tempFlow는 총 8개의 값을 emit()하고 cloudFlow는 4개, humFlow는 7개의 값을 emit()한다. 그렇다면,  combinedFlow는 총 8 × 4 × 7 = 224개의 값을 emit()하게 될까? 그렇지 않다. combinedFlow는 재료로 사용한 Flow들의 모든 조합의 경우의 수를 도출하는 목적이 아니라, 최신 값들의 조합을 최대한 빨리 표시하는데 그 목적이 있기 때문이다. 따라서, 재료로 사용된 Flow들 각각의 emit() 시점이 비슷한 경우 emit()된 값이 무시되는 경우도 생긴다. 그래서 위 코드의 출력 결과의 갯수도 15개인 것이다.

#2-3 조합(combine)할 재료에 Hot Flow도 있는 경우 (SharedFlow)

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// 온도 Flow
fun tempFlow(): Flow<Int> = flow {
    for (i in 33 downTo 26) {
        delay(1000)
        emit(i)
    }
}

// 구름 Flow
fun cloudFlow() = MutableSharedFlow<String>()

// 습도 Flow
fun humFlow(): Flow<Int> = flow {
    for (i in 40..70 step 5) {
        delay(2000)
        emit(i)
    }
}

fun main() {
    val cloudFlowInstance = cloudFlow()
    val combinedFlow = combine(tempFlow(), cloudFlowInstance, humFlow()) { temp, cloud, hum ->
        "온도: $temp°C, 구름: $cloud, 습도: $hum%"
    }

    CoroutineScope(Dispatchers.Default).launch {
        var printCount = 0

        combinedFlow.collect { result ->
            println("(${++printCount}) $result")
        }
    }

    CoroutineScope(Dispatchers.Default).launch {
        val cloudList = arrayListOf("맑음", "흐림", "먹구름", "맑음")

        for (cloud in cloudList) {
            delay(3000)
            cloudFlowInstance.emit(cloud)
        }
    }

    runBlocking {
        delay(20000)
    }
}

/* ↑ ↑ ↑ 출력 결과
(1) 온도: 32°C, 구름: 맑음, 습도: 40%
(2) 온도: 31°C, 구름: 맑음, 습도: 40%
(3) 온도: 31°C, 구름: 맑음, 습도: 45%
(4) 온도: 30°C, 구름: 맑음, 습도: 45%
(5) 온도: 29°C, 구름: 맑음, 습도: 45%
(6) 온도: 29°C, 구름: 흐림, 습도: 45%
(7) 온도: 29°C, 구름: 흐림, 습도: 50%
(8) 온도: 28°C, 구름: 흐림, 습도: 50%
(9) 온도: 27°C, 구름: 흐림, 습도: 50%
(10) 온도: 27°C, 구름: 흐림, 습도: 55%
(11) 온도: 26°C, 구름: 흐림, 습도: 55%
(12) 온도: 26°C, 구름: 먹구름, 습도: 55%
(13) 온도: 26°C, 구름: 먹구름, 습도: 60%
(14) 온도: 26°C, 구름: 맑음, 습도: 60%
(15) 온도: 26°C, 구름: 맑음, 습도: 65%
(16) 온도: 26°C, 구름: 맑음, 습도: 70%
*/

#2-2의 코드 속 cloudFlow를 Hot Flow로 변경했다. 출력 결과에 크게 달라진 점은 없다. 출력 결과의 갯수가 15개 에서 16개로 변경되긴 했지만, 이는 애초에 같은 코드를 반복해서 실행해도 1 ~ 2개씩 다르다.

 

#2-4 Hot Flow에 초깃값을 늦게 준다면? (SharedFlow)

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// 온도 Flow
fun tempFlow(): Flow<Int> = flow {
    for (i in 33 downTo 26) {
        delay(1000)
        emit(i)
    }
}

// 구름 Flow
fun cloudFlow() = MutableSharedFlow<String>()

// 습도 Flow
fun humFlow(): Flow<Int> = flow {
    for (i in 40..70 step 5) {
        delay(2000)
        emit(i)
    }
}

fun main() {
    val cloudFlowInstance = cloudFlow()
    val combinedFlow = combine(tempFlow(), cloudFlowInstance, humFlow()) { temp, cloud, hum ->
        "온도: $temp°C, 구름: $cloud, 습도: $hum%"
    }

    CoroutineScope(Dispatchers.Default).launch {
        var printCount = 0

        combinedFlow.collect { result ->
            println("(${++printCount}) $result")
        }
    }

    runBlocking {
        delay(10000) // 10초
        println("초깃값을 보유하지 않은 Flow를 재료로 combine()하면 해당 Flow에 값이 들어올 때까지 collect()가 수행되지 않음")
    }

    CoroutineScope(Dispatchers.Default).launch {
        val cloudList = arrayListOf("맑음", "흐림", "먹구름", "맑음")

        for (cloud in cloudList) {
            delay(3000)
            cloudFlowInstance.emit(cloud)
        }
    }

    runBlocking {
        delay(20000)
    }
}

/* ↑ ↑ ↑ 출력 결과
초깃값을 보유하지 않은 Flow를 재료로 combine()하면 해당 Flow에 값이 들어올 때까지 collect()가 수행되지 않음
(1) 온도: 26°C, 구름: 맑음, 습도: 65%
(2) 온도: 26°C, 구름: 맑음, 습도: 70%
(3) 온도: 26°C, 구름: 흐림, 습도: 70%
(4) 온도: 26°C, 구름: 먹구름, 습도: 70%
(5) 온도: 26°C, 구름: 맑음, 습도: 70%
*/

#2-3와 같은 코드지만, collecter를 붙이고 나서 10초 뒤에 Hot Flow가 값을 emit()하게 만들었다. 이러면 combinedFlow는 어떻게 작동할까? 즉, combinedFlow는 그 재료 Flow인 cloudFlow가 10초 동안 초깃값을 보유하지 않은 상태로 방치되는 것이다. SharedFlow인 cloudFlow가 값을 방출하기 전까지 combinedFlow.collect()가 실행되지 않다가, cloudFlow가 값을 출력하면 비로소 실행된다.

 

출력 결과를 보면, collector를 붙인 시점에, cloudFlow를 제외한 나머지 Flow들은 이미 값을 emit()하고 있었음을 알 수 있다. 왜냐하면 tempFlow는 32°C부터 값을 emit()하는데, 출력 결과에는 해당 온도가 보이지 않기 때문이다. cloudFlow가 10초 동안 값을 emit()하지 않는 동안 온도가 26°C까지 내려간 것이다.

 

#2-5 초깃값이 항상 존재하는 Hot Flow라면? (StateFlow)

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// 온도 Flow
fun tempFlow(): Flow<Int> = flow {
    for (i in 33 downTo 26) {
        delay(1000)
        emit(i)
    }
}

// 구름 Flow
fun cloudFlow() = MutableStateFlow("구름 정보 없음 (초깃값)")

// 습도 Flow
fun humFlow(): Flow<Int> = flow {
    for (i in 40..70 step 5) {
        delay(2000)
        emit(i)
    }
}

fun main() {
    val cloudFlowInstance = cloudFlow()
    val combinedFlow = combine(tempFlow(), cloudFlowInstance, humFlow()) { temp, cloud, hum ->
        "온도: $temp°C, 구름: $cloud, 습도: $hum%"
    }

    CoroutineScope(Dispatchers.Default).launch {
        var printCount = 0

        combinedFlow.collect { result ->
            println("(${++printCount}) $result")
        }
    }

    CoroutineScope(Dispatchers.Default).launch {
        val cloudList = arrayListOf("맑음", "흐림", "먹구름", "맑음")

        for (cloud in cloudList) {
            delay(3000)
            cloudFlowInstance.emit(cloud)
        }
    }

    runBlocking {
        delay(20000)
    }
}

/* ↑ ↑ ↑ 출력 결과
(1) 온도: 33°C, 구름: 구름 정보 없음 (초깃값), 습도: 40%
(2) 온도: 32°C, 구름: 구름 정보 없음 (초깃값), 습도: 40%
(3) 온도: 32°C, 구름: 맑음, 습도: 40%
(4) 온도: 31°C, 구름: 맑음, 습도: 40%
(5) 온도: 31°C, 구름: 맑음, 습도: 45%
(6) 온도: 30°C, 구름: 맑음, 습도: 45%
(7) 온도: 29°C, 구름: 맑음, 습도: 45%
(8) 온도: 29°C, 구름: 흐림, 습도: 45%
(9) 온도: 29°C, 구름: 흐림, 습도: 50%
(10) 온도: 28°C, 구름: 흐림, 습도: 50%
(11) 온도: 27°C, 구름: 흐림, 습도: 50%
(12) 온도: 27°C, 구름: 흐림, 습도: 55%
(13) 온도: 26°C, 구름: 흐림, 습도: 55%
(14) 온도: 26°C, 구름: 먹구름, 습도: 55%
(15) 온도: 26°C, 구름: 먹구름, 습도: 60%
(16) 온도: 26°C, 구름: 맑음, 습도: 60%
(17) 온도: 26°C, 구름: 맑음, 습도: 65%
(18) 온도: 26°C, 구름: 맑음, 습도: 70%
*/

그렇다면, cloudFlow가 SharedFlow가 아닌, Hot Flow의 또다른 형태인 StateFlow였다면? 이 경우는 combindFlow가 대기하는 일이 없다. 왜냐하면, StateFlow가 지닌 초깃값을 사용하면 되기 때문이다. 실제로, StateFlow는 리스너(collector)가 생기는 즉시 자신이 보유한 값(StateFlow.value)를 해당 리스너에게 전달한다. 즉, 이미 누구보다 빠르게 emit()이 된다는 얘기다.

 

#3 Flow.stateIn()

#3-1 개요

fun <T> Flow<T>.stateIn(
    scope: CoroutineScope, 
    started: SharingStarted, 
    initialValue: T
): StateFlow<T>

Flow.stateIn()은 Flow를 StateFlow로 변경한다. StateFlow는 초깃값이 존재하는 구조이므로 매개변수에 initialValue가 존재하는 것도 확인할 수 있다. 안드로이드 Jetpack Compose에서 stateIn()은 combine()과 단짝이다. combine()된 Flow를 StateFlow로 만들어, Jetpack Compose에서 사용할 수 있는 형태로 바꿀 수 있기 때문이다.
 

#3-2 기본적인 코드

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// 온도 Flow
fun tempFlow(): Flow<Int> = flow {
    for (i in 33 downTo 26) {
        delay(1000)
        emit(i)
    }
}

// 구름 Flow
fun cloudFlow(): Flow<String> = flow {
    val cloudList = arrayListOf("맑음", "흐림", "먹구름", "맑음")

    for (cloud in cloudList) {
        delay(3000)
        emit(cloud)
    }
}

// 습도 Flow
fun humFlow(): Flow<Int> = flow {
    for (i in 40..70 step 5) {
        delay(2000)
        emit(i)
    }
}

fun main() {
    val combinedFlow = combine(tempFlow(), cloudFlow(), humFlow()) { temp, cloud, hum ->
        "온도: $temp°C, 구름: $cloud, 습도: $hum%"
    }

    val stateFlow = combinedFlow.stateIn(
        scope = CoroutineScope(Dispatchers.Default),
        started = SharingStarted.Lazily,
        initialValue = "초기화 중..."
    )

    CoroutineScope(Dispatchers.Default).launch {
        var printCount = 0

        stateFlow.collect { result ->
            println("(${++printCount}) $result")
        }
    }

    runBlocking {
        delay(20000)
    }
}

/* ↑ ↑ ↑ 출력 결과
(1) 초기화 중...
(2) 온도: 32°C, 구름: 맑음, 습도: 40%
(3) 온도: 31°C, 구름: 맑음, 습도: 40%
(4) 온도: 31°C, 구름: 맑음, 습도: 45%
(5) 온도: 30°C, 구름: 맑음, 습도: 45%
(6) 온도: 29°C, 구름: 맑음, 습도: 45%
(7) 온도: 29°C, 구름: 흐림, 습도: 45%
(8) 온도: 29°C, 구름: 흐림, 습도: 50%
(9) 온도: 28°C, 구름: 흐림, 습도: 50%
(10) 온도: 27°C, 구름: 흐림, 습도: 50%
(11) 온도: 27°C, 구름: 흐림, 습도: 55%
(12) 온도: 26°C, 구름: 흐림, 습도: 55%
(13) 온도: 26°C, 구름: 먹구름, 습도: 55%
(14) 온도: 26°C, 구름: 먹구름, 습도: 60%
(15) 온도: 26°C, 구름: 맑음, 습도: 65%
(16) 온도: 26°C, 구름: 맑음, 습도: 70%
*/

맨 먼저 초깃값이 출력된다. 나머지는 출력 결과는 이전과 같다.

#4 요약

Flow를 하나로 합쳐 리스너(collector)를 붙일 수 있다. 합쳐진 Flow를 Jetpack Compose에서 사용하기 좋은 StateFlow로 변환할 수도 있다.