#1 개요
Coroutines Flow를 사용할 때 생길 수 있는 현상인 백 프레셔(Back pressure)에 대해 살펴본다. 또, 백 프레셔를 처리하는 방법도 살펴본다. 이 때, 백 프레셔는 에러가 아닌 자연스러운 현상일 뿐이다. 따라서, 백 프레셔를 해결한다는 표현은 정확하지 않다. 백 프레셔에 대처한다는 표현이 옳다.
#2 백 프레셔
#2-1 백 프레셔가 없는 코드
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.flow
// 1초마다 1씩 증가하는 count
fun startCountUp(count: MutableStateFlow<Int>) {
CoroutineScope(Dispatchers.Default).launch {
while (true) {
delay(1000)
count.value++
}
}
}
fun main() {
// count 선언 및 count를 1초마다 1씩 증가시키는 코루틴 시작
val count = MutableStateFlow(1)
startCountUp(count)
// 1, 2, 3, ...을 방출하는 Flow
val numberFlow: Flow<Int> = flow {
for (i in 1..5) {
delay(1000)
println("Flow ${i}: 생산됨")
emit(i) // 숫자 i를 '방출'
}
}
// Flow를 collect
runBlocking {
numberFlow.collect { value ->
for (i: Int in 1..11) {
when (i) {
11 -> println("Flow ${value}: 소비됨 (시각: ${count.value}초)")
else -> {
delay(30) // 받은 데이터를 처리하는 시간 표현
println("Flow ${value}: 처리중... (${i})")
}
}
}
}
}
}
/* ↑ ↑ ↑ 출력 결과
Flow 1: 생산됨
Flow 1: 처리중... (1)
Flow 1: 처리중... (2)
Flow 1: 처리중... (3)
Flow 1: 처리중... (4)
Flow 1: 처리중... (5)
Flow 1: 처리중... (6)
Flow 1: 처리중... (7)
Flow 1: 처리중... (8)
Flow 1: 처리중... (9)
Flow 1: 처리중... (10)
Flow 1: 소비됨 (시각: 2초)
Flow 2: 생산됨
Flow 2: 처리중... (1)
Flow 2: 처리중... (2)
Flow 2: 처리중... (3)
Flow 2: 처리중... (4)
Flow 2: 처리중... (5)
Flow 2: 처리중... (6)
Flow 2: 처리중... (7)
Flow 2: 처리중... (8)
Flow 2: 처리중... (9)
Flow 2: 처리중... (10)
Flow 2: 소비됨 (시각: 3초)
Flow 3: 생산됨
Flow 3: 처리중... (1)
Flow 3: 처리중... (2)
Flow 3: 처리중... (3)
Flow 3: 처리중... (4)
Flow 3: 처리중... (5)
Flow 3: 처리중... (6)
Flow 3: 처리중... (7)
Flow 3: 처리중... (8)
Flow 3: 처리중... (9)
Flow 3: 처리중... (10)
Flow 3: 소비됨 (시각: 4초)
Flow 4: 생산됨
Flow 4: 처리중... (1)
Flow 4: 처리중... (2)
Flow 4: 처리중... (3)
Flow 4: 처리중... (4)
Flow 4: 처리중... (5)
Flow 4: 처리중... (6)
Flow 4: 처리중... (7)
Flow 4: 처리중... (8)
Flow 4: 처리중... (9)
Flow 4: 처리중... (10)
Flow 4: 소비됨 (시각: 6초)
Flow 5: 생산됨
Flow 5: 처리중... (1)
Flow 5: 처리중... (2)
Flow 5: 처리중... (3)
Flow 5: 처리중... (4)
Flow 5: 처리중... (5)
Flow 5: 처리중... (6)
Flow 5: 처리중... (7)
Flow 5: 처리중... (8)
Flow 5: 처리중... (9)
Flow 5: 처리중... (10)
Flow 5: 소비됨 (시각: 7초)
*/
Flow에서 emit()되는 즉시 바로바로 출력하는 평범한 Cold Flow 코드다. 이 코드를 살짝 수정해보겠다.
#2-2 백 프레셔가 발생하는 코드
...
fun main() {
...
runBlocking {
numberFlow.collect { value ->
for (i: Int in 1..11) {
when (i) {
11 -> ...
else -> {
delay(300) // 30에서 300으로 변경
...
}
}
}
}
}
}
/* ↑ ↑ ↑ 출력 결과
Flow 1: 생산됨
Flow 1: 처리중... (1)
Flow 1: 처리중... (2)
Flow 1: 처리중... (3)
Flow 1: 처리중... (4)
Flow 1: 처리중... (5)
Flow 1: 처리중... (6)
Flow 1: 처리중... (7)
Flow 1: 처리중... (8)
Flow 1: 처리중... (9)
Flow 1: 처리중... (10)
Flow 1: 소비됨 (시각: 5초)
Flow 2: 생산됨
Flow 2: 처리중... (1)
Flow 2: 처리중... (2)
Flow 2: 처리중... (3)
Flow 2: 처리중... (4)
Flow 2: 처리중... (5)
Flow 2: 처리중... (6)
Flow 2: 처리중... (7)
Flow 2: 처리중... (8)
Flow 2: 처리중... (9)
Flow 2: 처리중... (10)
Flow 2: 소비됨 (시각: 9초)
Flow 3: 생산됨
Flow 3: 처리중... (1)
Flow 3: 처리중... (2)
Flow 3: 처리중... (3)
Flow 3: 처리중... (4)
Flow 3: 처리중... (5)
Flow 3: 처리중... (6)
Flow 3: 처리중... (7)
Flow 3: 처리중... (8)
Flow 3: 처리중... (9)
Flow 3: 처리중... (10)
Flow 3: 소비됨 (시각: 13초)
Flow 4: 생산됨
Flow 4: 처리중... (1)
Flow 4: 처리중... (2)
Flow 4: 처리중... (3)
Flow 4: 처리중... (4)
Flow 4: 처리중... (5)
Flow 4: 처리중... (6)
Flow 4: 처리중... (7)
Flow 4: 처리중... (8)
Flow 4: 처리중... (9)
Flow 4: 처리중... (10)
Flow 4: 소비됨 (시각: 17초)
Flow 5: 생산됨
Flow 5: 처리중... (1)
Flow 5: 처리중... (2)
Flow 5: 처리중... (3)
Flow 5: 처리중... (4)
Flow 5: 처리중... (5)
Flow 5: 처리중... (6)
Flow 5: 처리중... (7)
Flow 5: 처리중... (8)
Flow 5: 처리중... (9)
Flow 5: 처리중... (10)
Flow 5: 소비됨 (시각: 21초)
*/
#2-1 코드를 수정한 버전이다. numberFlow 자체는 아무런 변화가 없다. 즉 데이터를 발생시키는 곳에서는 아무런 변화가 없다. 그러나 데이터를 처리하는 곳의 딜레이를 10배 증가시키자, 데이터 출력에 지연이 발생했다.
이와 같이, 데이터의 생산 속도를 데이터의 소비 속도가 따라가지 못하는 현상을 백 프레셔라고 일컫는다. 이른바 공급 과잉이다.
#3 백 프레셔 대처법
#3-1 buffer()를 이용해 collect() 병렬화
...
import kotlinx.coroutines.flow.buffer
...
fun main() {
...
runBlocking {
numberFlow.buffer().collect { value -> // Flow 대신 Flow.buffer()를 collect()
...
}
}
}
/* ↑ ↑ ↑ 출력 결과
Flow 1: 생산됨
Flow 1: 처리중... (1)
Flow 1: 처리중... (2)
Flow 1: 처리중... (3)
Flow 2: 생산됨
Flow 1: 처리중... (4)
Flow 1: 처리중... (5)
Flow 1: 처리중... (6)
Flow 3: 생산됨
Flow 1: 처리중... (7)
Flow 1: 처리중... (8)
Flow 1: 처리중... (9)
Flow 4: 생산됨
Flow 1: 처리중... (10)
Flow 1: 소비됨 (시각: 5초)
Flow 2: 처리중... (1)
Flow 2: 처리중... (2)
Flow 5: 생산됨
Flow 2: 처리중... (3)
Flow 2: 처리중... (4)
Flow 2: 처리중... (5)
Flow 2: 처리중... (6)
Flow 2: 처리중... (7)
Flow 2: 처리중... (8)
Flow 2: 처리중... (9)
Flow 2: 처리중... (10)
Flow 2: 소비됨 (시각: 8초)
Flow 3: 처리중... (1)
Flow 3: 처리중... (2)
Flow 3: 처리중... (3)
Flow 3: 처리중... (4)
Flow 3: 처리중... (5)
Flow 3: 처리중... (6)
Flow 3: 처리중... (7)
Flow 3: 처리중... (8)
Flow 3: 처리중... (9)
Flow 3: 처리중... (10)
Flow 3: 소비됨 (시각: 11초)
Flow 4: 처리중... (1)
Flow 4: 처리중... (2)
Flow 4: 처리중... (3)
Flow 4: 처리중... (4)
Flow 4: 처리중... (5)
Flow 4: 처리중... (6)
Flow 4: 처리중... (7)
Flow 4: 처리중... (8)
Flow 4: 처리중... (9)
Flow 4: 처리중... (10)
Flow 4: 소비됨 (시각: 14초)
Flow 5: 처리중... (1)
Flow 5: 처리중... (2)
Flow 5: 처리중... (3)
Flow 5: 처리중... (4)
Flow 5: 처리중... (5)
Flow 5: 처리중... (6)
Flow 5: 처리중... (7)
Flow 5: 처리중... (8)
Flow 5: 처리중... (9)
Flow 5: 처리중... (10)
Flow 5: 소비됨 (시각: 17초)
*/
먼저, Flow 대신 Flow.buffer()를 collect()한다. 출력 결과를 보면, emit된 데이터를 소비 완료하지 않은 상태임에도 최신 데이터를 생산하고 있음을 알 수 있다. 생산분은 생산되는대로 일단 buffer에 저장하고, 소비는 나중에 알아서 하는 식이다.
하지만, Flow 5가 소비 완료된 시각은 17초로 #2-2의 코드와 크게 다르지 않다. buffer가 있든 없든, collect의 동작은 여전히 단일 스레드에서 이뤄지므로 데이터 처리 시간에 유의미한 차이가 생기지 읺았다. 이 때 buffer에는 생산되는 족족 데이터가 쌓인다는 점에 착안하여, 소비하는 측에서도 buffer에 데이터가 들어오는대로 Coroutine을 수행한다면 소비 시간을 줄일 수 있을 것이다. 아래 코드를 보자.
...
fun main() {
...
runBlocking {
numberFlow.buffer().collect { value ->
launch { // 자식 코루틴 생성
...
}
}
}
}
/* ↑ ↑ ↑ 출력 결과
Flow 1: 생산됨
Flow 1: 처리중... (1)
Flow 1: 처리중... (2)
Flow 1: 처리중... (3)
Flow 2: 생산됨
Flow 1: 처리중... (4)
Flow 2: 처리중... (1)
Flow 1: 처리중... (5)
Flow 2: 처리중... (2)
Flow 1: 처리중... (6)
Flow 2: 처리중... (3)
Flow 3: 생산됨
Flow 1: 처리중... (7)
Flow 2: 처리중... (4)
Flow 3: 처리중... (1)
Flow 1: 처리중... (8)
Flow 2: 처리중... (5)
Flow 3: 처리중... (2)
Flow 1: 처리중... (9)
Flow 2: 처리중... (6)
Flow 3: 처리중... (3)
Flow 4: 생산됨
Flow 1: 처리중... (10)
Flow 1: 소비됨 (시각: 5초)
Flow 2: 처리중... (7)
Flow 3: 처리중... (4)
Flow 4: 처리중... (1)
Flow 2: 처리중... (8)
Flow 3: 처리중... (5)
Flow 4: 처리중... (2)
Flow 2: 처리중... (9)
Flow 3: 처리중... (6)
Flow 4: 처리중... (3)
Flow 5: 생산됨
Flow 2: 처리중... (10)
Flow 2: 소비됨 (시각: 6초)
Flow 3: 처리중... (7)
Flow 4: 처리중... (4)
Flow 5: 처리중... (1)
Flow 3: 처리중... (8)
Flow 4: 처리중... (5)
Flow 5: 처리중... (2)
Flow 3: 처리중... (9)
Flow 4: 처리중... (6)
Flow 5: 처리중... (3)
Flow 3: 처리중... (10)
Flow 3: 소비됨 (시각: 7초)
Flow 4: 처리중... (7)
Flow 5: 처리중... (4)
Flow 4: 처리중... (8)
Flow 5: 처리중... (5)
Flow 4: 처리중... (9)
Flow 5: 처리중... (6)
Flow 4: 처리중... (10)
Flow 4: 소비됨 (시각: 8초)
Flow 5: 처리중... (7)
Flow 5: 처리중... (8)
Flow 5: 처리중... (9)
Flow 5: 처리중... (10)
Flow 5: 소비됨 (시각: 9초)
*/
collect() 내부에 자식 코루틴을 만들어 데이터 소비를 병렬화했다.
#3-2 collectLatest()를 이용해 데이터 선별
...
fun main() {
...
runBlocking {
numberFlow.collectLatest { value -> // collect() 대신 collectLatest() 사용
...
}
}
}
/* ↑ ↑ ↑ 출력 결과
Flow 1: 생산됨
Flow 1: 처리중... (1)
Flow 1: 처리중... (2)
Flow 1: 처리중... (3)
Flow 2: 생산됨
Flow 2: 처리중... (1)
Flow 2: 처리중... (2)
Flow 2: 처리중... (3)
Flow 3: 생산됨
Flow 3: 처리중... (1)
Flow 3: 처리중... (2)
Flow 3: 처리중... (3)
Flow 4: 생산됨
Flow 4: 처리중... (1)
Flow 4: 처리중... (2)
Flow 4: 처리중... (3)
Flow 5: 생산됨
Flow 5: 처리중... (1)
Flow 5: 처리중... (2)
Flow 5: 처리중... (3)
Flow 5: 처리중... (4)
Flow 5: 처리중... (5)
Flow 5: 처리중... (6)
Flow 5: 처리중... (7)
Flow 5: 처리중... (8)
Flow 5: 처리중... (9)
Flow 5: 처리중... (10)
Flow 5: 소비됨 (시각: 9초)
*/
collectLatest()는 최신 데이터만을 선별한다. 출력 결과를 보면, 이전 데이터를 소비하는 도중 최신 데이터가 생산되면 이전 데이터를 그냥 버린다는 걸 알 수 있다. 예를 들어 현재 날씨 데이터의 경우, 최신 데이터가 생산된 이상 이전 데이터는 가치가 0에 수렴한다. 이런 경우 사용할만하다.
#4 요약
백 프레셔는 공급 과잉이다. 공급을 병렬화(buffer + coroutine)하거나, 선별(collectLatest)하여 대처한다.
'깨알 개념 > Kotlin' 카테고리의 다른 글
[Kotlin] Coroutines Flow - StateFlow (0) | 2024.08.17 |
---|---|
[Kotlin] Coroutines Flow - Intermediate operator (0) | 2024.08.16 |
[Kotlin] Coroutines Flow - 기초 (0) | 2024.08.01 |
[Kotlin] Coroutines - 한 Scope 내에서의 계층 관계 (0) | 2024.07.31 |
[Kotlin] 위임 프로퍼티 (Delegated properties) (0) | 2024.07.22 |