깨알 개념/Kotlin

[Kotlin] Coroutines Flow - Back pressure와 그 처리

interfacer_han 2024. 8. 15. 13:30

#1 개요

Coroutines Flow를 사용할 때 생길 수 있는 현상인 백 프레셔(Back pressure)에 대해 살펴본다. 또, 백 프레셔를 처리하는 방법도 살펴본다. 이 때, 백 프레셔는 에러가 아닌 자연스러운 현상일 뿐이다. 따라서, 백 프레셔를 해결한다는 표현은 정확하지 않다. 백 프레셔에 대처한다는 표현이 옳다.
 

#2 백 프레셔

#2-1 백 프레셔가 없는 코드

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.flow

// 1초마다 1씩 증가하는 count
fun startCountUp(count: MutableStateFlow<Int>) {
    CoroutineScope(Dispatchers.Default).launch {
        while (true) {
            delay(1000)
            count.value++
        }
    }
}

fun main() {
    // count 선언 및 count를 1초마다 1씩 증가시키는 코루틴 시작
    val count = MutableStateFlow(1)
    startCountUp(count)

    // 1, 2, 3, ...을 방출하는 Flow
    val numberFlow: Flow<Int> = flow {
        for (i in 1..5) {
            delay(1000)
            println("Flow ${i}: 생산됨")
            emit(i) // 숫자 i를 '방출'
        }
    }

    // Flow를 collect
    runBlocking {
        numberFlow.collect { value ->
            for (i: Int in 1..11) {
                when (i) {
                    11 -> println("Flow ${value}: 소비됨 (시각: ${count.value}초)")
                    else -> {
                        delay(30) // 받은 데이터를 처리하는 시간 표현
                        println("Flow ${value}: 처리중... (${i})")
                    }
                }
            }
        }
    }
}

/* ↑ ↑ ↑ 출력 결과
Flow 1: 생산됨
Flow 1: 처리중... (1)
Flow 1: 처리중... (2)
Flow 1: 처리중... (3)
Flow 1: 처리중... (4)
Flow 1: 처리중... (5)
Flow 1: 처리중... (6)
Flow 1: 처리중... (7)
Flow 1: 처리중... (8)
Flow 1: 처리중... (9)
Flow 1: 처리중... (10)
Flow 1: 소비됨 (시각: 2초)
Flow 2: 생산됨
Flow 2: 처리중... (1)
Flow 2: 처리중... (2)
Flow 2: 처리중... (3)
Flow 2: 처리중... (4)
Flow 2: 처리중... (5)
Flow 2: 처리중... (6)
Flow 2: 처리중... (7)
Flow 2: 처리중... (8)
Flow 2: 처리중... (9)
Flow 2: 처리중... (10)
Flow 2: 소비됨 (시각: 3초)
Flow 3: 생산됨
Flow 3: 처리중... (1)
Flow 3: 처리중... (2)
Flow 3: 처리중... (3)
Flow 3: 처리중... (4)
Flow 3: 처리중... (5)
Flow 3: 처리중... (6)
Flow 3: 처리중... (7)
Flow 3: 처리중... (8)
Flow 3: 처리중... (9)
Flow 3: 처리중... (10)
Flow 3: 소비됨 (시각: 4초)
Flow 4: 생산됨
Flow 4: 처리중... (1)
Flow 4: 처리중... (2)
Flow 4: 처리중... (3)
Flow 4: 처리중... (4)
Flow 4: 처리중... (5)
Flow 4: 처리중... (6)
Flow 4: 처리중... (7)
Flow 4: 처리중... (8)
Flow 4: 처리중... (9)
Flow 4: 처리중... (10)
Flow 4: 소비됨 (시각: 6초)
Flow 5: 생산됨
Flow 5: 처리중... (1)
Flow 5: 처리중... (2)
Flow 5: 처리중... (3)
Flow 5: 처리중... (4)
Flow 5: 처리중... (5)
Flow 5: 처리중... (6)
Flow 5: 처리중... (7)
Flow 5: 처리중... (8)
Flow 5: 처리중... (9)
Flow 5: 처리중... (10)
Flow 5: 소비됨 (시각: 7초)
*/

Flow에서 emit()되는 즉시 바로바로 출력하는 평범한 Cold Flow 코드다. 이 코드를 살짝 수정해보겠다. 
 

#2-2 백 프레셔가 발생하는 코드

...

fun main() {
    ...
    
    runBlocking {
        numberFlow.collect { value ->
            for (i: Int in 1..11) {
                when (i) {
                    11 -> ...
                    else -> {
                        delay(300) // 30에서 300으로 변경
                        ...
                    }
                }
            }
        }
    }
}

/* ↑ ↑ ↑ 출력 결과
Flow 1: 생산됨
Flow 1: 처리중... (1)
Flow 1: 처리중... (2)
Flow 1: 처리중... (3)
Flow 1: 처리중... (4)
Flow 1: 처리중... (5)
Flow 1: 처리중... (6)
Flow 1: 처리중... (7)
Flow 1: 처리중... (8)
Flow 1: 처리중... (9)
Flow 1: 처리중... (10)
Flow 1: 소비됨 (시각: 5초)
Flow 2: 생산됨
Flow 2: 처리중... (1)
Flow 2: 처리중... (2)
Flow 2: 처리중... (3)
Flow 2: 처리중... (4)
Flow 2: 처리중... (5)
Flow 2: 처리중... (6)
Flow 2: 처리중... (7)
Flow 2: 처리중... (8)
Flow 2: 처리중... (9)
Flow 2: 처리중... (10)
Flow 2: 소비됨 (시각: 9초)
Flow 3: 생산됨
Flow 3: 처리중... (1)
Flow 3: 처리중... (2)
Flow 3: 처리중... (3)
Flow 3: 처리중... (4)
Flow 3: 처리중... (5)
Flow 3: 처리중... (6)
Flow 3: 처리중... (7)
Flow 3: 처리중... (8)
Flow 3: 처리중... (9)
Flow 3: 처리중... (10)
Flow 3: 소비됨 (시각: 13초)
Flow 4: 생산됨
Flow 4: 처리중... (1)
Flow 4: 처리중... (2)
Flow 4: 처리중... (3)
Flow 4: 처리중... (4)
Flow 4: 처리중... (5)
Flow 4: 처리중... (6)
Flow 4: 처리중... (7)
Flow 4: 처리중... (8)
Flow 4: 처리중... (9)
Flow 4: 처리중... (10)
Flow 4: 소비됨 (시각: 17초)
Flow 5: 생산됨
Flow 5: 처리중... (1)
Flow 5: 처리중... (2)
Flow 5: 처리중... (3)
Flow 5: 처리중... (4)
Flow 5: 처리중... (5)
Flow 5: 처리중... (6)
Flow 5: 처리중... (7)
Flow 5: 처리중... (8)
Flow 5: 처리중... (9)
Flow 5: 처리중... (10)
Flow 5: 소비됨 (시각: 21초)
*/

#2-1 코드를 수정한 버전이다. numberFlow 자체는 아무런 변화가 없다. 즉 데이터를 발생시키는 곳에서는 아무런 변화가 없다. 그러나 데이터를 처리하는 곳의 딜레이를 10배 증가시키자, 데이터 출력에 지연이 발생했다.
 
이와 같이, 데이터의 생산 속도를 데이터의 소비 속도가 따라가지 못하는 현상을 백 프레셔라고 일컫는다. 이른바 공급 과잉이다.

#3 백 프레셔 대처법

#3-1 buffer()를 이용해 collect() 병렬화

...
import kotlinx.coroutines.flow.buffer

...

fun main() {
    ...

    runBlocking {
        numberFlow.buffer().collect { value -> // Flow 대신 Flow.buffer()를 collect()
            ...
        }
    }
}

/* ↑ ↑ ↑ 출력 결과
Flow 1: 생산됨
Flow 1: 처리중... (1)
Flow 1: 처리중... (2)
Flow 1: 처리중... (3)
Flow 2: 생산됨
Flow 1: 처리중... (4)
Flow 1: 처리중... (5)
Flow 1: 처리중... (6)
Flow 3: 생산됨
Flow 1: 처리중... (7)
Flow 1: 처리중... (8)
Flow 1: 처리중... (9)
Flow 4: 생산됨
Flow 1: 처리중... (10)
Flow 1: 소비됨 (시각: 5초)
Flow 2: 처리중... (1)
Flow 2: 처리중... (2)
Flow 5: 생산됨
Flow 2: 처리중... (3)
Flow 2: 처리중... (4)
Flow 2: 처리중... (5)
Flow 2: 처리중... (6)
Flow 2: 처리중... (7)
Flow 2: 처리중... (8)
Flow 2: 처리중... (9)
Flow 2: 처리중... (10)
Flow 2: 소비됨 (시각: 8초)
Flow 3: 처리중... (1)
Flow 3: 처리중... (2)
Flow 3: 처리중... (3)
Flow 3: 처리중... (4)
Flow 3: 처리중... (5)
Flow 3: 처리중... (6)
Flow 3: 처리중... (7)
Flow 3: 처리중... (8)
Flow 3: 처리중... (9)
Flow 3: 처리중... (10)
Flow 3: 소비됨 (시각: 11초)
Flow 4: 처리중... (1)
Flow 4: 처리중... (2)
Flow 4: 처리중... (3)
Flow 4: 처리중... (4)
Flow 4: 처리중... (5)
Flow 4: 처리중... (6)
Flow 4: 처리중... (7)
Flow 4: 처리중... (8)
Flow 4: 처리중... (9)
Flow 4: 처리중... (10)
Flow 4: 소비됨 (시각: 14초)
Flow 5: 처리중... (1)
Flow 5: 처리중... (2)
Flow 5: 처리중... (3)
Flow 5: 처리중... (4)
Flow 5: 처리중... (5)
Flow 5: 처리중... (6)
Flow 5: 처리중... (7)
Flow 5: 처리중... (8)
Flow 5: 처리중... (9)
Flow 5: 처리중... (10)
Flow 5: 소비됨 (시각: 17초)
*/

먼저, Flow 대신 Flow.buffer()를 collect()한다. 출력 결과를 보면, emit된 데이터를 소비 완료하지 않은 상태임에도 최신 데이터를 생산하고 있음을 알 수 있다. 생산분은 생산되는대로 일단 buffer에 저장하고, 소비는 나중에 알아서 하는 식이다.

하지만, Flow 5가 소비 완료된 시각은 17초로 #2-2의 코드와 크게 다르지 않다. buffer가 있든 없든, collect의 동작은 여전히 단일 스레드에서 이뤄지므로 데이터 처리 시간에 유의미한 차이가 생기지 읺았다. 이 때 buffer에는 생산되는 족족 데이터가 쌓인다는 점에 착안하여, 소비하는 측에서도 buffer에 데이터가 들어오는대로 Coroutine을 수행한다면 소비 시간을 줄일 수 있을 것이다. 아래 코드를 보자.
 

...

fun main() {
    ...
    
    runBlocking {
        numberFlow.buffer().collect { value ->
            launch { // 자식 코루틴 생성
                ...
            }
        }
    }
}

/* ↑ ↑ ↑ 출력 결과
Flow 1: 생산됨
Flow 1: 처리중... (1)
Flow 1: 처리중... (2)
Flow 1: 처리중... (3)
Flow 2: 생산됨
Flow 1: 처리중... (4)
Flow 2: 처리중... (1)
Flow 1: 처리중... (5)
Flow 2: 처리중... (2)
Flow 1: 처리중... (6)
Flow 2: 처리중... (3)
Flow 3: 생산됨
Flow 1: 처리중... (7)
Flow 2: 처리중... (4)
Flow 3: 처리중... (1)
Flow 1: 처리중... (8)
Flow 2: 처리중... (5)
Flow 3: 처리중... (2)
Flow 1: 처리중... (9)
Flow 2: 처리중... (6)
Flow 3: 처리중... (3)
Flow 4: 생산됨
Flow 1: 처리중... (10)
Flow 1: 소비됨 (시각: 5초)
Flow 2: 처리중... (7)
Flow 3: 처리중... (4)
Flow 4: 처리중... (1)
Flow 2: 처리중... (8)
Flow 3: 처리중... (5)
Flow 4: 처리중... (2)
Flow 2: 처리중... (9)
Flow 3: 처리중... (6)
Flow 4: 처리중... (3)
Flow 5: 생산됨
Flow 2: 처리중... (10)
Flow 2: 소비됨 (시각: 6초)
Flow 3: 처리중... (7)
Flow 4: 처리중... (4)
Flow 5: 처리중... (1)
Flow 3: 처리중... (8)
Flow 4: 처리중... (5)
Flow 5: 처리중... (2)
Flow 3: 처리중... (9)
Flow 4: 처리중... (6)
Flow 5: 처리중... (3)
Flow 3: 처리중... (10)
Flow 3: 소비됨 (시각: 7초)
Flow 4: 처리중... (7)
Flow 5: 처리중... (4)
Flow 4: 처리중... (8)
Flow 5: 처리중... (5)
Flow 4: 처리중... (9)
Flow 5: 처리중... (6)
Flow 4: 처리중... (10)
Flow 4: 소비됨 (시각: 8초)
Flow 5: 처리중... (7)
Flow 5: 처리중... (8)
Flow 5: 처리중... (9)
Flow 5: 처리중... (10)
Flow 5: 소비됨 (시각: 9초)
*/

collect() 내부에 자식 코루틴을 만들어 데이터 소비를 병렬화했다.
 

#3-2 collectLatest()를 이용해 데이터 선별

...

fun main() {
    ...
    
    runBlocking {
        numberFlow.collectLatest { value -> // collect() 대신 collectLatest() 사용
            ...
        }
    }
}

/* ↑ ↑ ↑ 출력 결과
Flow 1: 생산됨
Flow 1: 처리중... (1)
Flow 1: 처리중... (2)
Flow 1: 처리중... (3)
Flow 2: 생산됨
Flow 2: 처리중... (1)
Flow 2: 처리중... (2)
Flow 2: 처리중... (3)
Flow 3: 생산됨
Flow 3: 처리중... (1)
Flow 3: 처리중... (2)
Flow 3: 처리중... (3)
Flow 4: 생산됨
Flow 4: 처리중... (1)
Flow 4: 처리중... (2)
Flow 4: 처리중... (3)
Flow 5: 생산됨
Flow 5: 처리중... (1)
Flow 5: 처리중... (2)
Flow 5: 처리중... (3)
Flow 5: 처리중... (4)
Flow 5: 처리중... (5)
Flow 5: 처리중... (6)
Flow 5: 처리중... (7)
Flow 5: 처리중... (8)
Flow 5: 처리중... (9)
Flow 5: 처리중... (10)
Flow 5: 소비됨 (시각: 9초)
*/

collectLatest()는 최신 데이터을 선별한다. 출력 결과를 보면, 이전 데이터를 소비하는 도중 최신 데이터가 생산되면 이전 데이터를 그냥 버린다는 걸 알 수 있다. 예를 들어 현재 날씨 데이터의 경우, 최신 데이터가 생산된 이상 이전 데이터는 가치가 0에 수렴한다. 이런 경우 사용할만하다.
 

#4 요약

백 프레셔는 공급 과잉이다. 공급을 병렬화(buffer + coroutine)하거나, 선별(collectLatest)하여 대처한다.