깨알 개념/Kotlin

[Kotlin] Coroutines Flow - 기초

interfacer_han 2024. 8. 1. 19:09

#1 Coroutines Flow

#1-1 개요

 

Flow

Flow An asynchronous data stream that sequentially emits values and completes normally or with an exception. Intermediate operators on the flow such as map, filter, take, zip, etc are functions that are applied to the upstream flow or flows and return a do

kotlinlang.org

Flow는 내부적으로 Coroutine을 사용해 비동기적으로 데이터 스트림을 처리하는 API다. 이를 반응형 프로그래밍이라고도 한다. 반응형 프로그래밍을 한 마디로 정의하면, "데이터의 변경을 감지하고 선언적으로 프로그래밍하여 View를 업데이트"하는 프로그래밍 방식이다.
 

#1-2 CoroutineScope.produce와의 비교

 

[Kotlin] Coroutines - Coroutine builder

#1 Coroutine builder kotlinx-coroutines-coreCore primitives to work with coroutines. Coroutine builder functions: Coroutine dispatchers implementing CoroutineDispatcher: More context elements: Synchronization primitives for coroutines: Top-level suspendin

kenel.tistory.com

데이터 스트림하면, 위 게시글의 #5에 나와있는 Coroutine builder인 produce()가 생각날 것이다. 그리고 역할도 서로 같다. 물론 완전히 같지는 않은데 그 차이를 비유하자면, produce()가 Sqlite라면 Flow는 Room라고 할 수 있다. 즉, produce()는 Flow에 비해 개발자의 직접 (데이터 스트림 채널 등의) 관리가 요구되며 더 저수준이다. 반면 Flow는 더 선언적인 API로 코드가 구성되어있고, 암시적인 방법으로 코드의 안정성을 올려주는 등 더 고수준의 코드다.

여담으로 Room은 대놓고 Sqlite의 추상화 계층인 반면, Flow는 내부적으로 코루틴을 사용할 뿐 produce()의 직접적인 추상화 계층이라고까지 말하기는 어렵다고 한다.
 

#2 구조 - 기초

#2-1 produce()

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.produce

// ReceiveChannel 생성 함수
@OptIn(ExperimentalCoroutinesApi::class)
fun produceNumbers(): ReceiveChannel<Int> {
    return CoroutineScope(Dispatchers.Default).produce {
        for (x: Int in 1..5) {
            delay(1000)
            send(x) // 값을 채널로 보냄
        }
    }
}

fun main() {
    val channel: ReceiveChannel<Int> = produceNumbers() // ReceiveChannel 생성

    runBlocking {
        for (value in channel) { // value = channel.receive()와 같은 동작
            println("produce() $value")
        }
    }

    println("Done receiving")

    channel.cancel() // ReceiveChannel을 닫음
}

/* ↑ ↑ ↑ 출력 결과
produce() 1
produce() 2
produce() 3
produce() 4
produce() 5
Done receiving
*/

먼저, 이미 배워서 친숙한 CoroutineScope.produce를 사용한 코드다. 코드 속 for문에 대해 설명하자면, ReceiveChannel 인터페이스는 ChannelIterator를 제공한다. 이 ChannelIterator는 채널로부터 더 받을 값이 있는지, 채널이 닫히지 않았는지를 감지하며 순회를 수행한다. 채널로부터 더 받을 값이 있으며 채널이 닫히지 않았다면 무한정으로 순회한다. 위의 코드에서는 더 이상 보낼 값이 없으므로 for문 순회가 종료된다.
 
이 코드와 같은 역할을 하는 코드를 Flow를 이용해 짜보면,
 

#2-2 Flow

import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking

// flow 생성 함수
fun numberFlow(): Flow<Int> = flow {
    for (i in 1..5) {
        delay(1000)
        emit(i) // 숫자 i를 '방출'
    }
}

fun main() {
    val flow: Flow<Int> = numberFlow()

    runBlocking {
        flow.collect { value ->
            println("coroutineFlow $value")
        }
    }

    println("Done collecting")
}

/* ↑ ↑ ↑ 출력 결과
coroutineFlow 1
coroutineFlow 2
coroutineFlow 3
coroutineFlow 4
coroutineFlow 5
Done collecting
*/

#2-1와 같은 역할을 하는 Flow 코드다. #2-1의 receive()나 바로 위에 있는 collect() 모두 suspend 키워드가 붙는 함수이기 때문에, runBlocking { ... } 블록 내에서 모든 작업이 완료된 후에야 runBlocking { ... } 블록 밖의 코드가 실행된다. 출력 결과와 같이 말이다. 아래에서 Flow의 코드를 구석구석 살펴본다.
 
return과 emit()
예를 들어, 반환 타입이 Int인 '일반적인' 함수를 호출하면 한 번의 함수 호출로 단 하나의 반환 값(Int)을 얻게 된다. 하지만, Int 타입의 Flow를 사용하면 여러 시간 동안 여러 반환 값(Int)을 얻게 된다. 일반적인 함수가 반환을 위해 return을 쓰듯, Flow에서는 반환을 위해 emit()을 사용한다. 위 코드에서도 emit()을 이용한 '반환'을 총 5번 수행했다.
 
interface Flow<out T>
produce()는 ReceiveChannel<T>라는 반환형을 지닌다. 즉, T를 ReceiveChannel로 감싸 캡슐화하여 반환한다. 반면, Flow는 캡슐화없이 값을 그대로 반환한다. 이로인해 Flow는 코드가 더 간결하고 선언이다. 하지만, 동시에 데이터의 생산 및 소비를 더 명시적으로 세밀하게 조정하기는 어렵다라는 (캡슐화를 안해서 생기는) 단점도 있다. 이 단점을 극복할 수 있는 channelFlow()라는 Flow도 있다. 이 함수는 Flow와 channel의 장점을 결합해 채널 기반의 데이터 흐름을 선언적으로 처리할 수 있게 만들어준다고 한다.
 
suspend fun collect()
Flow로부터, 즉 데이터 스트림으로부터 반환된 값을 '수집(collect)'한다. ReceiveChannel.receive()에 대응되는 함수다. collect()는 수집하는 대상 Flow가 '완료'될 때까지 프로그래머가 지정한 작업을 반복해서 실행한다.
 
명시적 종료 불가능 (지원 안 함)
어떤 flow { ... }에서 모든 emit()이 수행되고 flow { ... } 블록이 끝나면 Flow가 '완료' 처리되고, 해당 Flow를 collect()하는 작업 또한 자동으로 종료된다. 이 외에 직접적으로 Flow를 중지시키는 방법은 없다. ReceiveChannel이 close()라는 멤버 함수를 제공해 명시적 종료가 가능한 것과 대조된다 (여담으로, 위에서 나온 channelFlow { ... } 내에서는 close() 사용이 가능하다). Flow를 어떻게든 중지시키려면 해당 Flow를 별도의 코루틴에 넣고 그 코루틴을 cancel()시키는 간접적인 방식을 사용해야 한다. 또는, 명시적으로 종료 가능한 collect()를 대신 중지함으로서 Flow를 중지하는 것과 같은 효과를 볼 수도 있다.
 

#3 Flow가 '완료'되었다는 것의 의미

import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking

suspend fun main() {
    val myFlow = flow {
        emit("1")
        delay(1000)
        emit("2")
        delay(1000)
        emit("3")
        // 더 이상 수행될 emit()이 없으므로 여기에서 flow는 종료된다.
    }

    runBlocking {
        myFlow.collect { value ->
            println("Collected value: $value")
        }
        println("\'수집\' 완료")
    }

    // 한번 더 collect()
    runBlocking {
        myFlow.collect { value ->
            println("Collected value: $value")
        }
        println("\'수집\' 완료")
    }
}

/* ↑ ↑ ↑ 출력 결과
Collected value: 1
Collected value: 2
Collected value: 3
'수집' 완료
Collected value: 1
Collected value: 2
Collected value: 3
'수집' 완료
*/

#2에서 설명한 바에 따르면, Flow는 명시적 종료가 불가능하다. 물론, 위 코드는 명시적으로 종료할 것도 없이 더 이상 출력할 emit()이 없으니 myFlow가 암시적으로 완료되고, collect() 또한 알아서 종료된다. 하지만, "myFlow라는 프로퍼티에 담긴 flow { ... } 인스턴스 자체가 소멸한 것은 아니지 않은가?" 라는 궁금증이 들어서 myFlow를 한번 더 collect()해봤다.
 
결과는 첫번째 collect()와 동일했다. myFlow는 분명 완료되었으므로 아무것도 출력되지 않아야 정상이 아닌가? myFlow의 완료란 대체 어떤 의미일까? 이에 대한 답은 바로 코틀린의 기본 Flow가 Cold Flow 방식의 데이터 스트림이라는 데에 있다. 아래 게시글에서 자세히 설명해본다.

 

Cold Flow와 Hot Flow

#1 Cold Flow와 Hot Flow#1-1 개요데이터 스트림은 크게 Cold Flow와 Hot Flow로 나눌 수 있다. 이 분류 기준에 대해 알아본다. 또, Kotlin의 Coroutines Flow를 활용한 예시 코드로 Cold Flow 및 Hot Flow를 구현해본다. #

kenel.tistory.com

 

#4 요약

Flow는 데이터 스트림 처리를 위해 고안된 고수준 API다.