깨알 개념/기타

Cold Flow와 Hot Flow (SharedFlow)

interfacer_han 2024. 8. 7. 08:57

#1 Cold Flow와 Hot Flow

#1-1 개요

데이터 스트림은 크게 Cold Flow와 Hot Flow로 나눌 수 있다. 이 분류 기준에 대해 알아본다. 또, Kotlin의 Coroutines Flow를 활용해 간단한 Cold Flow 및 Hot Flow를 구현해본다. 
 

#1-2 훌륭한 비유

 

What is the hot flow and cold flow in coroutines and the difference between them?

I am mastering Kotlin coroutines and trying to figure out 1- what is hot flow and cold flow ? 2- what is the main difference between them? 3- when to use each one?

stackoverflow.com

위 링크는 Cold Flow와 Hot Flow의 차이를 묻는 질문글이다. 이 질문에 대한 답변 중 아주 훌륭한 비유를 한 답변이 있다. 그 답변을 번역하면,
 

콜드 플로우(Cold flow): 리스너(Listener)가 존재해야만 데이터가 방출됨.

실생활 예시
당신은 거실 TV로 스파이더맨: 어크로스 더 유니버스를 보려고 한다. 거실 TV는 IPTV라서, 이 영화는 내가 원할 때 즉 상품을 결제해서 재생 버튼을 누를 때 상영된다.

핫 플로우(Hot flow): 데이터가 리스너(Listener)의 존재 여부와 관계없이 방출됨.

실생활 예시
당신은 영화 바벤하이머를 보러 가려고 영화관으로 차를 몰았다. 그러나 가는 도중 교통 체증에 걸려, 제 시간에 도착하지 못했다. 그렇다고 지각한 당신을 위해 영화 시작이 미뤄지는 일은 없을 것이다.

Cold Flow는 VOD와 같고, Hot Flow는 영화관과 같다. 중요한 키워드는 리스너의 존재다. 내가 관찰해야만 진행되는 데이터 스트림은 콜드 플로우다. 반면, 내가 없어도 알아서 착착 진행되는 데이터 스트림은 핫 플로우다.
 

#1-3 각각의 용도

Cold Flow 및 Hot Flow는 그 특성에 맞게, 서로 사용되는 용도가 다르다. Cold Flow는 리스너(Listener, 또는 Subcriber) 개개인이 보내는 데이터베이스 Query 요청이 대표적이다. Hot Flow는 온라인 RPG 게임에서 필드에 생성(리젠)되는 몬스터를 예로 들 수 있다. 내가 필드로 나가지 않고 마을에만 서있을지라도, 필드에는 몬스터가 차 있어야 한다. 다른 사람들과 동시 접속하는 온라인 RPG니까 말이다
 
여담

더보기
더보기
더보기

요즘에는 서버의 자원 낭비를 막기 위해서 필드에 플레이어가 한 명 이상 존재해야 몬스터를 생성하는 게임도 많다. 이 경우 몬스터의 생성 트리거는 Cold Flow와 관련이 있게 되지만, 생성이 완료된 몬스터 데이터는 여전히 Hot Flow다. 만약 생성된 몬스터 데이터가 Cold Flow라면, 몬스터와 싸우는 나의 모습이 다른 플레이어 입장에서는 허공에 칼질을 하는 모습으로 보일테니 말이다.

 

#2 코드

#2-1 build.gradle.kts의 dependencies에 Coroutines 라이브러리 추가

plugins {
    ...
}

...

repositories {
    ...
}

dependencies {
    ...

    // Coroutines
    implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.7.3")
}

tasks.test {
    ...
}
kotlin {
    ...
}

#2-2 및 #2-3에서 사용할 라이브러리를 다운로드한다.

 

#2-2 Cold Flow의 예시

import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking

fun main() {
    val coldFlow = flow {
        emit("1")
        delay(1000)
        emit("2")
        delay(1000)
        emit("3")
        // 더 이상 수행될 emit()이 없으므로 여기에서 flow는 종료된다.
    }

    runBlocking {
        coldFlow.collect { value ->
            println("(리스너 1) Collected value: $value")
        }
        println("(리스너 1) \'수집\' 완료")
    }

    // 한번 더 collect()
    runBlocking {
        coldFlow.collect { value ->
            println("(리스너 2) Collected value: $value")
        }
        println("(리스너 2) \'수집\' 완료")
    }
}

/* ↑ ↑ ↑ 출력 결과
(리스너 1) Collected value: 1
(리스너 1) Collected value: 2
(리스너 1) Collected value: 3
(리스너 1) '수집' 완료
(리스너 2) Collected value: 1
(리스너 2) Collected value: 2
(리스너 2) Collected value: 3
(리스너 2) '수집' 완료
*/

collect()할 때마다 리스너가 새로 등록된다. 즉, #1-2의 예시에 있는 VOD 서비스가 2번 요청된 셈이다.
 

#2-3 Hot Flow의 예시 (SharedFlow)

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.MutableSharedFlow

fun main() {
    /* MutableSharedFlow<Int>(replay = 3)과 같이 매개변수를 넣어줄 수도 있다.
     * replay에 인수를 전달하면,
     * 리스너가 새로 생겨날 때,
     * replay 값만큼 최근에 방출(emit)된 데이터를 다시 받을 수 있다.
     * 말 그대로 'replay'다.
     */
    val hotFlow = MutableSharedFlow<Int>()

    CoroutineScope(Dispatchers.Default).launch {
        hotFlow.collect { value ->
            println("(리스너 1) Collected value: $value")
        }
    }

    runBlocking {
        delay(1000) // emit(1) 전까지 위에 있는 collect() 구문이 넉넉히 완료되도록, delay를 넣어줌.
        hotFlow.emit(1)
        delay(1000)
        hotFlow.emit(2)
        delay(1000)
        hotFlow.emit(3)
        delay(1000)
        /* 더 이상 수행될 emit()이 없지만,
         * Hot Flow는 종료되지 않는다.
         * 왜냐하면 미래에 다시 emit()될 가능성이 있기 때문이다.
         */
    }

    // 한번 더 collect()
    CoroutineScope(Dispatchers.Default).launch {
        hotFlow.collect { value ->
            println("(리스너 2) Collected value: $value")
        }
    }

    runBlocking {
        delay(1000)
        hotFlow.emit(4)
        delay(1000)
        hotFlow.emit(5)
        delay(1000)
    }
}

/* ↑ ↑ ↑ 출력 결과
(리스너 1) Collected value: 1
(리스너 1) Collected value: 2
(리스너 1) Collected value: 3
(리스너 1) Collected value: 4
(리스너 2) Collected value: 4
(리스너 1) Collected value: 5
(리스너 2) Collected value: 5
*/

여기서 사용된 Flow는 SharedFlow로, 코틀린의 대표적인 Hot Flow 인터페이스다. #2-2의 코드와는 다르게 '수집' 완료라는 메시지를 출력하는 코드가 없다. Hot Flow는 무한한 탄성을 지닌 고무줄처럼 그 끝이 없기 때문이다. 예를 들어, 위 코드의 마지막 부분에 hotFlow.emit(6)를 넣으면 두 리스너의 수명은 그만큼 더 늘어난다.
 
그렇기 때문에 #2-2의 코드와는 다르게 collect()를 runBlocking { ... }이 아니라 CoroutineScope.launch { ... } 블록에 넣은 것이다. #2-2에 있는 runBlocking { ... } 속에 들어있는 것은 Cold Flow의 collect()라서 언젠간 끝나며, 따라서 runBlocking { ... } 블록을 탈출할 것을 기대할 수 있다. 하지만 그 안에 Cold Flow가 아닌 Hot Flow의 collect()가 들어있게 된다면, 그 runBlocking { ... }은 영원히 끝나지 않는다.
 
실제로 runBlocking { ... } 블록에 Hot Flow의 collect()를 넣으면 그 runBlocking { ... } 블록 뒤에 있는 모든 코드에 "Unreachable code"라는 경고 메시지가 뜬다.
 

#3 요약

Cold Flow는 VOD와 같고, Hot Flow는 영화관과 같다.