๊นจ์•Œ ๊ฐœ๋… ๐Ÿ“‘/Kotlin

[Kotlin] Coroutines Flow - Back pressure์™€ ๊ทธ ์ฒ˜๋ฆฌ

interfacer_han 2024. 8. 15. 13:30

#1 ๊ฐœ์š”

Coroutines Flow๋ฅผ ์‚ฌ์šฉํ•  ๋•Œ ์ƒ๊ธธ ์ˆ˜ ์žˆ๋Š” ํ˜„์ƒ์ธ ๋ฐฑ ํ”„๋ ˆ์…”(Back pressure)์— ๋Œ€ํ•ด ์‚ดํŽด๋ณธ๋‹ค. ๋˜, ๋ฐฑ ํ”„๋ ˆ์…”๋ฅผ ์ฒ˜๋ฆฌํ•˜๋Š” ๋ฐฉ๋ฒ•๋„ ์‚ดํŽด๋ณธ๋‹ค. ์ด ๋•Œ, ๋ฐฑ ํ”„๋ ˆ์…”๋Š” ์—๋Ÿฌ๊ฐ€ ์•„๋‹Œ ์ž์—ฐ์Šค๋Ÿฌ์šด ํ˜„์ƒ์ผ ๋ฟ์ด๋‹ค. ๋”ฐ๋ผ์„œ, ๋ฐฑ ํ”„๋ ˆ์…”๋ฅผ ํ•ด๊ฒฐํ•œ๋‹ค๋Š” ํ‘œํ˜„์€ ์ •ํ™•ํ•˜์ง€ ์•Š๋‹ค. ๋ฐฑ ํ”„๋ ˆ์…”์— ๋Œ€์ฒ˜ํ•œ๋‹ค๋Š” ํ‘œํ˜„์ด ์˜ณ๋‹ค.
 

#2 ๋ฐฑ ํ”„๋ ˆ์…”

#2-1 ๋ฐฑ ํ”„๋ ˆ์…”๊ฐ€ ์—†๋Š” ์ฝ”๋“œ

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.flow

// 1์ดˆ๋งˆ๋‹ค 1์”ฉ ์ฆ๊ฐ€ํ•˜๋Š” count
fun startCountUp(count: MutableStateFlow<Int>) {
    CoroutineScope(Dispatchers.Default).launch {
        while (true) {
            delay(1000)
            count.value++
        }
    }
}

fun main() {
    // count ์„ ์–ธ ๋ฐ count๋ฅผ 1์ดˆ๋งˆ๋‹ค 1์”ฉ ์ฆ๊ฐ€์‹œํ‚ค๋Š” ์ฝ”๋ฃจํ‹ด ์‹œ์ž‘
    val count = MutableStateFlow(1)
    startCountUp(count)

    // 1, 2, 3, ...์„ ๋ฐฉ์ถœํ•˜๋Š” Flow
    val numberFlow: Flow<Int> = flow {
        for (i in 1..5) {
            delay(1000)
            println("Flow ${i}: ์ƒ์‚ฐ๋จ")
            emit(i) // ์ˆซ์ž i๋ฅผ '๋ฐฉ์ถœ'
        }
    }

    // Flow๋ฅผ collect
    runBlocking {
        numberFlow.collect { value ->
            for (i: Int in 1..11) {
                when (i) {
                    11 -> println("Flow ${value}: ์†Œ๋น„๋จ (์‹œ๊ฐ: ${count.value}์ดˆ)")
                    else -> {
                        delay(30) // ๋ฐ›์€ ๋ฐ์ดํ„ฐ๋ฅผ ์ฒ˜๋ฆฌํ•˜๋Š” ์‹œ๊ฐ„ ํ‘œํ˜„
                        println("Flow ${value}: ์ฒ˜๋ฆฌ์ค‘... (${i})")
                    }
                }
            }
        }
    }
}

/* ↑ ↑ ↑ ์ถœ๋ ฅ ๊ฒฐ๊ณผ
Flow 1: ์ƒ์‚ฐ๋จ
Flow 1: ์ฒ˜๋ฆฌ์ค‘... (1)
Flow 1: ์ฒ˜๋ฆฌ์ค‘... (2)
Flow 1: ์ฒ˜๋ฆฌ์ค‘... (3)
Flow 1: ์ฒ˜๋ฆฌ์ค‘... (4)
Flow 1: ์ฒ˜๋ฆฌ์ค‘... (5)
Flow 1: ์ฒ˜๋ฆฌ์ค‘... (6)
Flow 1: ์ฒ˜๋ฆฌ์ค‘... (7)
Flow 1: ์ฒ˜๋ฆฌ์ค‘... (8)
Flow 1: ์ฒ˜๋ฆฌ์ค‘... (9)
Flow 1: ์ฒ˜๋ฆฌ์ค‘... (10)
Flow 1: ์†Œ๋น„๋จ (์‹œ๊ฐ: 2์ดˆ)
Flow 2: ์ƒ์‚ฐ๋จ
Flow 2: ์ฒ˜๋ฆฌ์ค‘... (1)
Flow 2: ์ฒ˜๋ฆฌ์ค‘... (2)
Flow 2: ์ฒ˜๋ฆฌ์ค‘... (3)
Flow 2: ์ฒ˜๋ฆฌ์ค‘... (4)
Flow 2: ์ฒ˜๋ฆฌ์ค‘... (5)
Flow 2: ์ฒ˜๋ฆฌ์ค‘... (6)
Flow 2: ์ฒ˜๋ฆฌ์ค‘... (7)
Flow 2: ์ฒ˜๋ฆฌ์ค‘... (8)
Flow 2: ์ฒ˜๋ฆฌ์ค‘... (9)
Flow 2: ์ฒ˜๋ฆฌ์ค‘... (10)
Flow 2: ์†Œ๋น„๋จ (์‹œ๊ฐ: 3์ดˆ)
Flow 3: ์ƒ์‚ฐ๋จ
Flow 3: ์ฒ˜๋ฆฌ์ค‘... (1)
Flow 3: ์ฒ˜๋ฆฌ์ค‘... (2)
Flow 3: ์ฒ˜๋ฆฌ์ค‘... (3)
Flow 3: ์ฒ˜๋ฆฌ์ค‘... (4)
Flow 3: ์ฒ˜๋ฆฌ์ค‘... (5)
Flow 3: ์ฒ˜๋ฆฌ์ค‘... (6)
Flow 3: ์ฒ˜๋ฆฌ์ค‘... (7)
Flow 3: ์ฒ˜๋ฆฌ์ค‘... (8)
Flow 3: ์ฒ˜๋ฆฌ์ค‘... (9)
Flow 3: ์ฒ˜๋ฆฌ์ค‘... (10)
Flow 3: ์†Œ๋น„๋จ (์‹œ๊ฐ: 4์ดˆ)
Flow 4: ์ƒ์‚ฐ๋จ
Flow 4: ์ฒ˜๋ฆฌ์ค‘... (1)
Flow 4: ์ฒ˜๋ฆฌ์ค‘... (2)
Flow 4: ์ฒ˜๋ฆฌ์ค‘... (3)
Flow 4: ์ฒ˜๋ฆฌ์ค‘... (4)
Flow 4: ์ฒ˜๋ฆฌ์ค‘... (5)
Flow 4: ์ฒ˜๋ฆฌ์ค‘... (6)
Flow 4: ์ฒ˜๋ฆฌ์ค‘... (7)
Flow 4: ์ฒ˜๋ฆฌ์ค‘... (8)
Flow 4: ์ฒ˜๋ฆฌ์ค‘... (9)
Flow 4: ์ฒ˜๋ฆฌ์ค‘... (10)
Flow 4: ์†Œ๋น„๋จ (์‹œ๊ฐ: 6์ดˆ)
Flow 5: ์ƒ์‚ฐ๋จ
Flow 5: ์ฒ˜๋ฆฌ์ค‘... (1)
Flow 5: ์ฒ˜๋ฆฌ์ค‘... (2)
Flow 5: ์ฒ˜๋ฆฌ์ค‘... (3)
Flow 5: ์ฒ˜๋ฆฌ์ค‘... (4)
Flow 5: ์ฒ˜๋ฆฌ์ค‘... (5)
Flow 5: ์ฒ˜๋ฆฌ์ค‘... (6)
Flow 5: ์ฒ˜๋ฆฌ์ค‘... (7)
Flow 5: ์ฒ˜๋ฆฌ์ค‘... (8)
Flow 5: ์ฒ˜๋ฆฌ์ค‘... (9)
Flow 5: ์ฒ˜๋ฆฌ์ค‘... (10)
Flow 5: ์†Œ๋น„๋จ (์‹œ๊ฐ: 7์ดˆ)
*/

Flow์—์„œ emit()๋˜๋Š” ์ฆ‰์‹œ ๋ฐ”๋กœ๋ฐ”๋กœ ์ถœ๋ ฅํ•˜๋Š” ํ‰๋ฒ”ํ•œ Cold Flow ์ฝ”๋“œ๋‹ค. ์ด ์ฝ”๋“œ๋ฅผ ์‚ด์ง ์ˆ˜์ •ํ•ด๋ณด๊ฒ ๋‹ค. 
 

#2-2 ๋ฐฑ ํ”„๋ ˆ์…”๊ฐ€ ๋ฐœ์ƒํ•˜๋Š” ์ฝ”๋“œ

...

fun main() {
    ...
    
    runBlocking {
        numberFlow.collect { value ->
            for (i: Int in 1..11) {
                when (i) {
                    11 -> ...
                    else -> {
                        delay(300) // 30์—์„œ 300์œผ๋กœ ๋ณ€๊ฒฝ
                        ...
                    }
                }
            }
        }
    }
}

/* ↑ ↑ ↑ ์ถœ๋ ฅ ๊ฒฐ๊ณผ
Flow 1: ์ƒ์‚ฐ๋จ
Flow 1: ์ฒ˜๋ฆฌ์ค‘... (1)
Flow 1: ์ฒ˜๋ฆฌ์ค‘... (2)
Flow 1: ์ฒ˜๋ฆฌ์ค‘... (3)
Flow 1: ์ฒ˜๋ฆฌ์ค‘... (4)
Flow 1: ์ฒ˜๋ฆฌ์ค‘... (5)
Flow 1: ์ฒ˜๋ฆฌ์ค‘... (6)
Flow 1: ์ฒ˜๋ฆฌ์ค‘... (7)
Flow 1: ์ฒ˜๋ฆฌ์ค‘... (8)
Flow 1: ์ฒ˜๋ฆฌ์ค‘... (9)
Flow 1: ์ฒ˜๋ฆฌ์ค‘... (10)
Flow 1: ์†Œ๋น„๋จ (์‹œ๊ฐ: 5์ดˆ)
Flow 2: ์ƒ์‚ฐ๋จ
Flow 2: ์ฒ˜๋ฆฌ์ค‘... (1)
Flow 2: ์ฒ˜๋ฆฌ์ค‘... (2)
Flow 2: ์ฒ˜๋ฆฌ์ค‘... (3)
Flow 2: ์ฒ˜๋ฆฌ์ค‘... (4)
Flow 2: ์ฒ˜๋ฆฌ์ค‘... (5)
Flow 2: ์ฒ˜๋ฆฌ์ค‘... (6)
Flow 2: ์ฒ˜๋ฆฌ์ค‘... (7)
Flow 2: ์ฒ˜๋ฆฌ์ค‘... (8)
Flow 2: ์ฒ˜๋ฆฌ์ค‘... (9)
Flow 2: ์ฒ˜๋ฆฌ์ค‘... (10)
Flow 2: ์†Œ๋น„๋จ (์‹œ๊ฐ: 9์ดˆ)
Flow 3: ์ƒ์‚ฐ๋จ
Flow 3: ์ฒ˜๋ฆฌ์ค‘... (1)
Flow 3: ์ฒ˜๋ฆฌ์ค‘... (2)
Flow 3: ์ฒ˜๋ฆฌ์ค‘... (3)
Flow 3: ์ฒ˜๋ฆฌ์ค‘... (4)
Flow 3: ์ฒ˜๋ฆฌ์ค‘... (5)
Flow 3: ์ฒ˜๋ฆฌ์ค‘... (6)
Flow 3: ์ฒ˜๋ฆฌ์ค‘... (7)
Flow 3: ์ฒ˜๋ฆฌ์ค‘... (8)
Flow 3: ์ฒ˜๋ฆฌ์ค‘... (9)
Flow 3: ์ฒ˜๋ฆฌ์ค‘... (10)
Flow 3: ์†Œ๋น„๋จ (์‹œ๊ฐ: 13์ดˆ)
Flow 4: ์ƒ์‚ฐ๋จ
Flow 4: ์ฒ˜๋ฆฌ์ค‘... (1)
Flow 4: ์ฒ˜๋ฆฌ์ค‘... (2)
Flow 4: ์ฒ˜๋ฆฌ์ค‘... (3)
Flow 4: ์ฒ˜๋ฆฌ์ค‘... (4)
Flow 4: ์ฒ˜๋ฆฌ์ค‘... (5)
Flow 4: ์ฒ˜๋ฆฌ์ค‘... (6)
Flow 4: ์ฒ˜๋ฆฌ์ค‘... (7)
Flow 4: ์ฒ˜๋ฆฌ์ค‘... (8)
Flow 4: ์ฒ˜๋ฆฌ์ค‘... (9)
Flow 4: ์ฒ˜๋ฆฌ์ค‘... (10)
Flow 4: ์†Œ๋น„๋จ (์‹œ๊ฐ: 17์ดˆ)
Flow 5: ์ƒ์‚ฐ๋จ
Flow 5: ์ฒ˜๋ฆฌ์ค‘... (1)
Flow 5: ์ฒ˜๋ฆฌ์ค‘... (2)
Flow 5: ์ฒ˜๋ฆฌ์ค‘... (3)
Flow 5: ์ฒ˜๋ฆฌ์ค‘... (4)
Flow 5: ์ฒ˜๋ฆฌ์ค‘... (5)
Flow 5: ์ฒ˜๋ฆฌ์ค‘... (6)
Flow 5: ์ฒ˜๋ฆฌ์ค‘... (7)
Flow 5: ์ฒ˜๋ฆฌ์ค‘... (8)
Flow 5: ์ฒ˜๋ฆฌ์ค‘... (9)
Flow 5: ์ฒ˜๋ฆฌ์ค‘... (10)
Flow 5: ์†Œ๋น„๋จ (์‹œ๊ฐ: 21์ดˆ)
*/

#2-1 ์ฝ”๋“œ๋ฅผ ์ˆ˜์ •ํ•œ ๋ฒ„์ „์ด๋‹ค. numberFlow ์ž์ฒด๋Š” ์•„๋ฌด๋Ÿฐ ๋ณ€ํ™”๊ฐ€ ์—†๋‹ค. ์ฆ‰ ๋ฐ์ดํ„ฐ๋ฅผ ๋ฐœ์ƒ์‹œํ‚ค๋Š” ๊ณณ์—์„œ๋Š” ์•„๋ฌด๋Ÿฐ ๋ณ€ํ™”๊ฐ€ ์—†๋‹ค. ๊ทธ๋Ÿฌ๋‚˜ ๋ฐ์ดํ„ฐ๋ฅผ ์ฒ˜๋ฆฌํ•˜๋Š” ๊ณณ์˜ ๋”œ๋ ˆ์ด๋ฅผ 10๋ฐฐ ์ฆ๊ฐ€์‹œํ‚ค์ž, ๋ฐ์ดํ„ฐ ์ถœ๋ ฅ์— ์ง€์—ฐ์ด ๋ฐœ์ƒํ–ˆ๋‹ค.
 
์ด์™€ ๊ฐ™์ด, ๋ฐ์ดํ„ฐ์˜ ์ƒ์‚ฐ ์†๋„๋ฅผ ๋ฐ์ดํ„ฐ์˜ ์†Œ๋น„ ์†๋„๊ฐ€ ๋”ฐ๋ผ๊ฐ€์ง€ ๋ชปํ•˜๋Š” ํ˜„์ƒ์„ ๋ฐฑ ํ”„๋ ˆ์…”๋ผ๊ณ  ์ผ์ปซ๋Š”๋‹ค. ์ด๋ฅธ๋ฐ” ๊ณต๊ธ‰ ๊ณผ์ž‰์ด๋‹ค.

#3 ๋ฐฑ ํ”„๋ ˆ์…” ๋Œ€์ฒ˜๋ฒ•

#3-1 buffer()๋ฅผ ์ด์šฉํ•ด collect() ๋ณ‘๋ ฌํ™”

...
import kotlinx.coroutines.flow.buffer

...

fun main() {
    ...

    runBlocking {
        numberFlow.buffer().collect { value -> // Flow ๋Œ€์‹  Flow.buffer()๋ฅผ collect()
            ...
        }
    }
}

/* ↑ ↑ ↑ ์ถœ๋ ฅ ๊ฒฐ๊ณผ
Flow 1: ์ƒ์‚ฐ๋จ
Flow 1: ์ฒ˜๋ฆฌ์ค‘... (1)
Flow 1: ์ฒ˜๋ฆฌ์ค‘... (2)
Flow 1: ์ฒ˜๋ฆฌ์ค‘... (3)
Flow 2: ์ƒ์‚ฐ๋จ
Flow 1: ์ฒ˜๋ฆฌ์ค‘... (4)
Flow 1: ์ฒ˜๋ฆฌ์ค‘... (5)
Flow 1: ์ฒ˜๋ฆฌ์ค‘... (6)
Flow 3: ์ƒ์‚ฐ๋จ
Flow 1: ์ฒ˜๋ฆฌ์ค‘... (7)
Flow 1: ์ฒ˜๋ฆฌ์ค‘... (8)
Flow 1: ์ฒ˜๋ฆฌ์ค‘... (9)
Flow 4: ์ƒ์‚ฐ๋จ
Flow 1: ์ฒ˜๋ฆฌ์ค‘... (10)
Flow 1: ์†Œ๋น„๋จ (์‹œ๊ฐ: 5์ดˆ)
Flow 2: ์ฒ˜๋ฆฌ์ค‘... (1)
Flow 2: ์ฒ˜๋ฆฌ์ค‘... (2)
Flow 5: ์ƒ์‚ฐ๋จ
Flow 2: ์ฒ˜๋ฆฌ์ค‘... (3)
Flow 2: ์ฒ˜๋ฆฌ์ค‘... (4)
Flow 2: ์ฒ˜๋ฆฌ์ค‘... (5)
Flow 2: ์ฒ˜๋ฆฌ์ค‘... (6)
Flow 2: ์ฒ˜๋ฆฌ์ค‘... (7)
Flow 2: ์ฒ˜๋ฆฌ์ค‘... (8)
Flow 2: ์ฒ˜๋ฆฌ์ค‘... (9)
Flow 2: ์ฒ˜๋ฆฌ์ค‘... (10)
Flow 2: ์†Œ๋น„๋จ (์‹œ๊ฐ: 8์ดˆ)
Flow 3: ์ฒ˜๋ฆฌ์ค‘... (1)
Flow 3: ์ฒ˜๋ฆฌ์ค‘... (2)
Flow 3: ์ฒ˜๋ฆฌ์ค‘... (3)
Flow 3: ์ฒ˜๋ฆฌ์ค‘... (4)
Flow 3: ์ฒ˜๋ฆฌ์ค‘... (5)
Flow 3: ์ฒ˜๋ฆฌ์ค‘... (6)
Flow 3: ์ฒ˜๋ฆฌ์ค‘... (7)
Flow 3: ์ฒ˜๋ฆฌ์ค‘... (8)
Flow 3: ์ฒ˜๋ฆฌ์ค‘... (9)
Flow 3: ์ฒ˜๋ฆฌ์ค‘... (10)
Flow 3: ์†Œ๋น„๋จ (์‹œ๊ฐ: 11์ดˆ)
Flow 4: ์ฒ˜๋ฆฌ์ค‘... (1)
Flow 4: ์ฒ˜๋ฆฌ์ค‘... (2)
Flow 4: ์ฒ˜๋ฆฌ์ค‘... (3)
Flow 4: ์ฒ˜๋ฆฌ์ค‘... (4)
Flow 4: ์ฒ˜๋ฆฌ์ค‘... (5)
Flow 4: ์ฒ˜๋ฆฌ์ค‘... (6)
Flow 4: ์ฒ˜๋ฆฌ์ค‘... (7)
Flow 4: ์ฒ˜๋ฆฌ์ค‘... (8)
Flow 4: ์ฒ˜๋ฆฌ์ค‘... (9)
Flow 4: ์ฒ˜๋ฆฌ์ค‘... (10)
Flow 4: ์†Œ๋น„๋จ (์‹œ๊ฐ: 14์ดˆ)
Flow 5: ์ฒ˜๋ฆฌ์ค‘... (1)
Flow 5: ์ฒ˜๋ฆฌ์ค‘... (2)
Flow 5: ์ฒ˜๋ฆฌ์ค‘... (3)
Flow 5: ์ฒ˜๋ฆฌ์ค‘... (4)
Flow 5: ์ฒ˜๋ฆฌ์ค‘... (5)
Flow 5: ์ฒ˜๋ฆฌ์ค‘... (6)
Flow 5: ์ฒ˜๋ฆฌ์ค‘... (7)
Flow 5: ์ฒ˜๋ฆฌ์ค‘... (8)
Flow 5: ์ฒ˜๋ฆฌ์ค‘... (9)
Flow 5: ์ฒ˜๋ฆฌ์ค‘... (10)
Flow 5: ์†Œ๋น„๋จ (์‹œ๊ฐ: 17์ดˆ)
*/

๋จผ์ €, Flow ๋Œ€์‹  Flow.buffer()๋ฅผ collect()ํ•œ๋‹ค. ์ถœ๋ ฅ ๊ฒฐ๊ณผ๋ฅผ ๋ณด๋ฉด, emit๋œ ๋ฐ์ดํ„ฐ๋ฅผ ์†Œ๋น„ ์™„๋ฃŒํ•˜์ง€ ์•Š์€ ์ƒํƒœ์ž„์—๋„ ์ตœ์‹  ๋ฐ์ดํ„ฐ๋ฅผ ์ƒ์‚ฐํ•˜๊ณ  ์žˆ์Œ์„ ์•Œ ์ˆ˜ ์žˆ๋‹ค. ์ƒ์‚ฐ๋ถ„์€ ์ƒ์‚ฐ๋˜๋Š”๋Œ€๋กœ ์ผ๋‹จ buffer์— ์ €์žฅํ•˜๊ณ , ์†Œ๋น„๋Š” ๋‚˜์ค‘์— ์•Œ์•„์„œ ํ•˜๋Š” ์‹์ด๋‹ค.

ํ•˜์ง€๋งŒ, Flow 5๊ฐ€ ์†Œ๋น„ ์™„๋ฃŒ๋œ ์‹œ๊ฐ์€ 17์ดˆ๋กœ #2-2์˜ ์ฝ”๋“œ์™€ ํฌ๊ฒŒ ๋‹ค๋ฅด์ง€ ์•Š๋‹ค. buffer๊ฐ€ ์žˆ๋“  ์—†๋“ , collect์˜ ๋™์ž‘์€ ์—ฌ์ „ํžˆ ๋‹จ์ผ ์Šค๋ ˆ๋“œ์—์„œ ์ด๋ค„์ง€๋ฏ€๋กœ ๋ฐ์ดํ„ฐ ์ฒ˜๋ฆฌ ์‹œ๊ฐ„์— ์œ ์˜๋ฏธํ•œ ์ฐจ์ด๊ฐ€ ์ƒ๊ธฐ์ง€ ์บ์•˜๋‹ค. ์ด ๋•Œ buffer์—๋Š” ์ƒ์‚ฐ๋˜๋Š” ์กฑ์กฑ ๋ฐ์ดํ„ฐ๊ฐ€ ์Œ“์ธ๋‹ค๋Š” ์ ์— ์ฐฉ์•ˆํ•˜์—ฌ, ์†Œ๋น„ํ•˜๋Š” ์ธก์—์„œ๋„ buffer์— ๋ฐ์ดํ„ฐ๊ฐ€ ๋“ค์–ด์˜ค๋Š”๋Œ€๋กœ Coroutine์„ ์ˆ˜ํ–‰ํ•œ๋‹ค๋ฉด ์†Œ๋น„ ์‹œ๊ฐ„์„ ์ค„์ผ ์ˆ˜ ์žˆ์„ ๊ฒƒ์ด๋‹ค. ์•„๋ž˜ ์ฝ”๋“œ๋ฅผ ๋ณด์ž.
 

...

fun main() {
    ...
    
    runBlocking {
        numberFlow.buffer().collect { value ->
            launch { // ์ž์‹ ์ฝ”๋ฃจํ‹ด ์ƒ์„ฑ
                ...
            }
        }
    }
}

/* ↑ ↑ ↑ ์ถœ๋ ฅ ๊ฒฐ๊ณผ
Flow 1: ์ƒ์‚ฐ๋จ
Flow 1: ์ฒ˜๋ฆฌ์ค‘... (1)
Flow 1: ์ฒ˜๋ฆฌ์ค‘... (2)
Flow 1: ์ฒ˜๋ฆฌ์ค‘... (3)
Flow 2: ์ƒ์‚ฐ๋จ
Flow 1: ์ฒ˜๋ฆฌ์ค‘... (4)
Flow 2: ์ฒ˜๋ฆฌ์ค‘... (1)
Flow 1: ์ฒ˜๋ฆฌ์ค‘... (5)
Flow 2: ์ฒ˜๋ฆฌ์ค‘... (2)
Flow 1: ์ฒ˜๋ฆฌ์ค‘... (6)
Flow 2: ์ฒ˜๋ฆฌ์ค‘... (3)
Flow 3: ์ƒ์‚ฐ๋จ
Flow 1: ์ฒ˜๋ฆฌ์ค‘... (7)
Flow 2: ์ฒ˜๋ฆฌ์ค‘... (4)
Flow 3: ์ฒ˜๋ฆฌ์ค‘... (1)
Flow 1: ์ฒ˜๋ฆฌ์ค‘... (8)
Flow 2: ์ฒ˜๋ฆฌ์ค‘... (5)
Flow 3: ์ฒ˜๋ฆฌ์ค‘... (2)
Flow 1: ์ฒ˜๋ฆฌ์ค‘... (9)
Flow 2: ์ฒ˜๋ฆฌ์ค‘... (6)
Flow 3: ์ฒ˜๋ฆฌ์ค‘... (3)
Flow 4: ์ƒ์‚ฐ๋จ
Flow 1: ์ฒ˜๋ฆฌ์ค‘... (10)
Flow 1: ์†Œ๋น„๋จ (์‹œ๊ฐ: 5์ดˆ)
Flow 2: ์ฒ˜๋ฆฌ์ค‘... (7)
Flow 3: ์ฒ˜๋ฆฌ์ค‘... (4)
Flow 4: ์ฒ˜๋ฆฌ์ค‘... (1)
Flow 2: ์ฒ˜๋ฆฌ์ค‘... (8)
Flow 3: ์ฒ˜๋ฆฌ์ค‘... (5)
Flow 4: ์ฒ˜๋ฆฌ์ค‘... (2)
Flow 2: ์ฒ˜๋ฆฌ์ค‘... (9)
Flow 3: ์ฒ˜๋ฆฌ์ค‘... (6)
Flow 4: ์ฒ˜๋ฆฌ์ค‘... (3)
Flow 5: ์ƒ์‚ฐ๋จ
Flow 2: ์ฒ˜๋ฆฌ์ค‘... (10)
Flow 2: ์†Œ๋น„๋จ (์‹œ๊ฐ: 6์ดˆ)
Flow 3: ์ฒ˜๋ฆฌ์ค‘... (7)
Flow 4: ์ฒ˜๋ฆฌ์ค‘... (4)
Flow 5: ์ฒ˜๋ฆฌ์ค‘... (1)
Flow 3: ์ฒ˜๋ฆฌ์ค‘... (8)
Flow 4: ์ฒ˜๋ฆฌ์ค‘... (5)
Flow 5: ์ฒ˜๋ฆฌ์ค‘... (2)
Flow 3: ์ฒ˜๋ฆฌ์ค‘... (9)
Flow 4: ์ฒ˜๋ฆฌ์ค‘... (6)
Flow 5: ์ฒ˜๋ฆฌ์ค‘... (3)
Flow 3: ์ฒ˜๋ฆฌ์ค‘... (10)
Flow 3: ์†Œ๋น„๋จ (์‹œ๊ฐ: 7์ดˆ)
Flow 4: ์ฒ˜๋ฆฌ์ค‘... (7)
Flow 5: ์ฒ˜๋ฆฌ์ค‘... (4)
Flow 4: ์ฒ˜๋ฆฌ์ค‘... (8)
Flow 5: ์ฒ˜๋ฆฌ์ค‘... (5)
Flow 4: ์ฒ˜๋ฆฌ์ค‘... (9)
Flow 5: ์ฒ˜๋ฆฌ์ค‘... (6)
Flow 4: ์ฒ˜๋ฆฌ์ค‘... (10)
Flow 4: ์†Œ๋น„๋จ (์‹œ๊ฐ: 8์ดˆ)
Flow 5: ์ฒ˜๋ฆฌ์ค‘... (7)
Flow 5: ์ฒ˜๋ฆฌ์ค‘... (8)
Flow 5: ์ฒ˜๋ฆฌ์ค‘... (9)
Flow 5: ์ฒ˜๋ฆฌ์ค‘... (10)
Flow 5: ์†Œ๋น„๋จ (์‹œ๊ฐ: 9์ดˆ)
*/

collect() ๋‚ด๋ถ€์— ์ž์‹ ์ฝ”๋ฃจํ‹ด์„ ๋งŒ๋“ค์–ด ๋ฐ์ดํ„ฐ ์†Œ๋น„๋ฅผ ๋ณ‘๋ ฌํ™”ํ–ˆ๋‹ค.
 

#3-2 collectLatest()๋ฅผ ์ด์šฉํ•ด ๋ฐ์ดํ„ฐ ์„ ๋ณ„

...

fun main() {
    ...
    
    runBlocking {
        numberFlow.collectLatest { value -> // collect() ๋Œ€์‹  collectLatest() ์‚ฌ์šฉ
            ...
        }
    }
}

/* ↑ ↑ ↑ ์ถœ๋ ฅ ๊ฒฐ๊ณผ
Flow 1: ์ƒ์‚ฐ๋จ
Flow 1: ์ฒ˜๋ฆฌ์ค‘... (1)
Flow 1: ์ฒ˜๋ฆฌ์ค‘... (2)
Flow 1: ์ฒ˜๋ฆฌ์ค‘... (3)
Flow 2: ์ƒ์‚ฐ๋จ
Flow 2: ์ฒ˜๋ฆฌ์ค‘... (1)
Flow 2: ์ฒ˜๋ฆฌ์ค‘... (2)
Flow 2: ์ฒ˜๋ฆฌ์ค‘... (3)
Flow 3: ์ƒ์‚ฐ๋จ
Flow 3: ์ฒ˜๋ฆฌ์ค‘... (1)
Flow 3: ์ฒ˜๋ฆฌ์ค‘... (2)
Flow 3: ์ฒ˜๋ฆฌ์ค‘... (3)
Flow 4: ์ƒ์‚ฐ๋จ
Flow 4: ์ฒ˜๋ฆฌ์ค‘... (1)
Flow 4: ์ฒ˜๋ฆฌ์ค‘... (2)
Flow 4: ์ฒ˜๋ฆฌ์ค‘... (3)
Flow 5: ์ƒ์‚ฐ๋จ
Flow 5: ์ฒ˜๋ฆฌ์ค‘... (1)
Flow 5: ์ฒ˜๋ฆฌ์ค‘... (2)
Flow 5: ์ฒ˜๋ฆฌ์ค‘... (3)
Flow 5: ์ฒ˜๋ฆฌ์ค‘... (4)
Flow 5: ์ฒ˜๋ฆฌ์ค‘... (5)
Flow 5: ์ฒ˜๋ฆฌ์ค‘... (6)
Flow 5: ์ฒ˜๋ฆฌ์ค‘... (7)
Flow 5: ์ฒ˜๋ฆฌ์ค‘... (8)
Flow 5: ์ฒ˜๋ฆฌ์ค‘... (9)
Flow 5: ์ฒ˜๋ฆฌ์ค‘... (10)
Flow 5: ์†Œ๋น„๋จ (์‹œ๊ฐ: 9์ดˆ)
*/

collectLatest()๋Š” ์ตœ์‹  ๋ฐ์ดํ„ฐ๋งŒ์„ ์„ ๋ณ„ํ•œ๋‹ค. ์ถœ๋ ฅ ๊ฒฐ๊ณผ๋ฅผ ๋ณด๋ฉด, ์ด์ „ ๋ฐ์ดํ„ฐ๋ฅผ ์†Œ๋น„ํ•˜๋Š” ๋„์ค‘ ์ตœ์‹  ๋ฐ์ดํ„ฐ๊ฐ€ ์ƒ์‚ฐ๋˜๋ฉด ์ด์ „ ๋ฐ์ดํ„ฐ๋ฅผ ๊ทธ๋ƒฅ ๋ฒ„๋ฆฐ๋‹ค๋Š” ๊ฑธ ์•Œ ์ˆ˜ ์žˆ๋‹ค. ์˜ˆ๋ฅผ ๋“ค์–ด ํ˜„์žฌ ๋‚ ์”จ ๋ฐ์ดํ„ฐ์˜ ๊ฒฝ์šฐ, ์ตœ์‹  ๋ฐ์ดํ„ฐ๊ฐ€ ์ƒ์‚ฐ๋œ ์ด์ƒ ์ด์ „ ๋ฐ์ดํ„ฐ๋Š” ๊ฐ€์น˜๊ฐ€ 0์— ์ˆ˜๋ ดํ•œ๋‹ค. ์ด๋Ÿฐ ๊ฒฝ์šฐ ์‚ฌ์šฉํ• ๋งŒํ•˜๋‹ค.
 

#4 ์š”์•ฝ

๋ฐฑ ํ”„๋ ˆ์…”๋Š” ๊ณต๊ธ‰ ๊ณผ์ž‰์ด๋‹ค. ๊ณต๊ธ‰์„ ๋ณ‘๋ ฌํ™”(buffer + coroutine)ํ•˜๊ฑฐ๋‚˜, ์„ ๋ณ„(collectLatest)ํ•˜์—ฌ ๋Œ€์ฒ˜ํ•œ๋‹ค.