#1 Coroutines Flow
#1-1 ๊ฐ์
Flow
Flow An asynchronous data stream that sequentially emits values and completes normally or with an exception. Intermediate operators on the flow such as map, filter, take, zip, etc are functions that are applied to the upstream flow or flows and return a do
kotlinlang.org
Flow๋ ๋ด๋ถ์ ์ผ๋ก Coroutine์ ์ฌ์ฉํด ๋น๋๊ธฐ์ ์ผ๋ก ๋ฐ์ดํฐ ์คํธ๋ฆผ์ ์ฒ๋ฆฌํ๋ API๋ค. ์ด๋ฅผ ๋ฐ์ํ ํ๋ก๊ทธ๋๋ฐ์ด๋ผ๊ณ ๋ ํ๋ค. ๋ฐ์ํ ํ๋ก๊ทธ๋๋ฐ์ ํ ๋ง๋๋ก ์ ์ํ๋ฉด, "๋ฐ์ดํฐ์ ๋ณ๊ฒฝ์ ๊ฐ์งํ๊ณ ์ ์ธ์ ์ผ๋ก ํ๋ก๊ทธ๋๋ฐํ์ฌ View๋ฅผ ์
๋ฐ์ดํธ"ํ๋ ํ๋ก๊ทธ๋๋ฐ ๋ฐฉ์์ด๋ค.
#1-2 CoroutineScope.produce์์ ๋น๊ต
[Kotlin] Coroutines - Coroutine builder
#1 Coroutine builder kotlinx-coroutines-coreCore primitives to work with coroutines. Coroutine builder functions: Coroutine dispatchers implementing CoroutineDispatcher: More context elements: Synchronization primitives for coroutines: Top-level suspendin
kenel.tistory.com
๋ฐ์ดํฐ ์คํธ๋ฆผํ๋ฉด, ์ ๊ฒ์๊ธ์ #5์ ๋์์๋ Coroutine builder์ธ produce()๊ฐ ์๊ฐ๋ ๊ฒ์ด๋ค. ๊ทธ๋ฆฌ๊ณ ์ญํ ๋ ์๋ก ๊ฐ๋ค. ๋ฌผ๋ก ์์ ํ ๊ฐ์ง๋ ์์๋ฐ ๊ทธ ์ฐจ์ด๋ฅผ ๋น์ ํ์๋ฉด, produce()๊ฐ Sqlite๋ผ๋ฉด Flow๋ Room๋ผ๊ณ ํ ์ ์๋ค. ์ฆ, produce()๋ Flow์ ๋นํด ๊ฐ๋ฐ์์ ์ง์ (๋ฐ์ดํฐ ์คํธ๋ฆผ ์ฑ๋ ๋ฑ์) ๊ด๋ฆฌ๊ฐ ์๊ตฌ๋๋ฉฐ ๋ ์ ์์ค์ด๋ค. ๋ฐ๋ฉด Flow๋ ๋ ์ ์ธ์ ์ธ API๋ก ์ฝ๋๊ฐ ๊ตฌ์ฑ๋์ด์๊ณ , ์์์ ์ธ ๋ฐฉ๋ฒ์ผ๋ก ์ฝ๋์ ์์ ์ฑ์ ์ฌ๋ ค์ฃผ๋ ๋ฑ ๋ ๊ณ ์์ค์ ์ฝ๋๋ค.
์ฌ๋ด์ผ๋ก Room์ ๋๋๊ณ Sqlite์ ์ถ์ํ ๊ณ์ธต์ธ ๋ฐ๋ฉด, Flow๋ ๋ด๋ถ์ ์ผ๋ก ์ฝ๋ฃจํด์ ์ฌ์ฉํ ๋ฟ produce()์ ์ง์ ์ ์ธ ์ถ์ํ ๊ณ์ธต์ด๋ผ๊ณ ๊น์ง ๋งํ๊ธฐ๋ ์ด๋ ต๋ค๊ณ ํ๋ค.
#2 ๊ตฌ์กฐ - ๊ธฐ์ด
#2-1 produce()
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.produce
// ReceiveChannel ์์ฑ ํจ์
@OptIn(ExperimentalCoroutinesApi::class)
fun produceNumbers(): ReceiveChannel<Int> {
return CoroutineScope(Dispatchers.Default).produce {
for (x: Int in 1..5) {
delay(1000)
send(x) // ๊ฐ์ ์ฑ๋๋ก ๋ณด๋
}
}
}
fun main() {
val channel: ReceiveChannel<Int> = produceNumbers() // ReceiveChannel ์์ฑ
runBlocking {
for (value in channel) { // value = channel.receive()์ ๊ฐ์ ๋์
println("produce() $value")
}
}
println("Done receiving")
channel.cancel() // ReceiveChannel์ ๋ซ์
}
/* ↑ ↑ ↑ ์ถ๋ ฅ ๊ฒฐ๊ณผ
produce() 1
produce() 2
produce() 3
produce() 4
produce() 5
Done receiving
*/
๋จผ์ , ์ด๋ฏธ ๋ฐฐ์์ ์น์ํ CoroutineScope.produce๋ฅผ ์ฌ์ฉํ ์ฝ๋๋ค. ์ฝ๋ ์ for๋ฌธ์ ๋ํด ์ค๋ช
ํ์๋ฉด, ReceiveChannel ์ธํฐํ์ด์ค๋ ChannelIterator๋ฅผ ์ ๊ณตํ๋ค. ์ด ChannelIterator๋ ์ฑ๋๋ก๋ถํฐ ๋ ๋ฐ์ ๊ฐ์ด ์๋์ง, ์ฑ๋์ด ๋ซํ์ง ์์๋์ง๋ฅผ ๊ฐ์งํ๋ฉฐ ์ํ๋ฅผ ์ํํ๋ค. ์ฑ๋๋ก๋ถํฐ ๋ ๋ฐ์ ๊ฐ์ด ์์ผ๋ฉฐ ์ฑ๋์ด ๋ซํ์ง ์์๋ค๋ฉด ๋ฌดํ์ ์ผ๋ก ์ํํ๋ค. ์์ ์ฝ๋์์๋ ๋ ์ด์ ๋ณด๋ผ ๊ฐ์ด ์์ผ๋ฏ๋ก for๋ฌธ ์ํ๊ฐ ์ข
๋ฃ๋๋ค.
์ด ์ฝ๋์ ๊ฐ์ ์ญํ ์ ํ๋ ์ฝ๋๋ฅผ Flow๋ฅผ ์ด์ฉํด ์ง๋ณด๋ฉด,
#2-2 Flow
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking
// flow ์์ฑ ํจ์
fun numberFlow(): Flow<Int> = flow {
for (i in 1..5) {
delay(1000)
emit(i) // ์ซ์ i๋ฅผ '๋ฐฉ์ถ'
}
}
fun main() {
val flow: Flow<Int> = numberFlow()
runBlocking {
flow.collect { value ->
println("coroutineFlow $value")
}
}
println("Done collecting")
}
/* ↑ ↑ ↑ ์ถ๋ ฅ ๊ฒฐ๊ณผ
coroutineFlow 1
coroutineFlow 2
coroutineFlow 3
coroutineFlow 4
coroutineFlow 5
Done collecting
*/
#2-1์ ๊ฐ์ ์ญํ ์ ํ๋ Flow ์ฝ๋๋ค. #2-1์ receive()๋ ๋ฐ๋ก ์์ ์๋ collect() ๋ชจ๋ suspend ํค์๋๊ฐ ๋ถ๋ ํจ์์ด๊ธฐ ๋๋ฌธ์, runBlocking { ... } ๋ธ๋ก ๋ด์์ ๋ชจ๋ ์์
์ด ์๋ฃ๋ ํ์์ผ runBlocking { ... } ๋ธ๋ก ๋ฐ์ ์ฝ๋๊ฐ ์คํ๋๋ค. ์ถ๋ ฅ ๊ฒฐ๊ณผ์ ๊ฐ์ด ๋ง์ด๋ค. ์๋์์ Flow์ ์ฝ๋๋ฅผ ๊ตฌ์๊ตฌ์ ์ดํด๋ณธ๋ค.
return๊ณผ emit()
์๋ฅผ ๋ค์ด, ๋ฐํ ํ์
์ด Int์ธ '์ผ๋ฐ์ ์ธ' ํจ์๋ฅผ ํธ์ถํ๋ฉด ํ ๋ฒ์ ํจ์ ํธ์ถ๋ก ๋จ ํ๋์ ๋ฐํ ๊ฐ(Int)์ ์ป๊ฒ ๋๋ค. ํ์ง๋ง, Int ํ์
์ Flow๋ฅผ ์ฌ์ฉํ๋ฉด ์ฌ๋ฌ ์๊ฐ ๋์ ์ฌ๋ฌ ๋ฐํ ๊ฐ(Int)์ ์ป๊ฒ ๋๋ค. ์ผ๋ฐ์ ์ธ ํจ์๊ฐ ๋ฐํ์ ์ํด return์ ์ฐ๋ฏ, Flow์์๋ ๋ฐํ์ ์ํด emit()์ ์ฌ์ฉํ๋ค. ์ ์ฝ๋์์๋ emit()์ ์ด์ฉํ '๋ฐํ'์ ์ด 5๋ฒ ์ํํ๋ค.
interface Flow<out T>
produce()๋ ReceiveChannel<T>๋ผ๋ ๋ฐํํ์ ์ง๋๋ค. ์ฆ, T๋ฅผ ReceiveChannel๋ก ๊ฐ์ธ ์บก์ํํ์ฌ ๋ฐํํ๋ค. ๋ฐ๋ฉด, Flow๋ ์บก์ํ์์ด ๊ฐ์ ๊ทธ๋๋ก ๋ฐํํ๋ค. ์ด๋ก์ธํด Flow๋ ์ฝ๋๊ฐ ๋ ๊ฐ๊ฒฐํ๊ณ ์ ์ธ์ ์ด๋ค. ํ์ง๋ง, ๋์์ ๋ฐ์ดํฐ์ ์์ฐ ๋ฐ ์๋น๋ฅผ ๋ ๋ช
์์ ์ผ๋ก ์ธ๋ฐํ๊ฒ ์กฐ์ ํ๊ธฐ๋ ์ด๋ ต๋ค๋ผ๋ (์บก์ํ๋ฅผ ์ํด์ ์๊ธฐ๋) ๋จ์ ๋ ์๋ค. ์ด ๋จ์ ์ ๊ทน๋ณตํ ์ ์๋ channelFlow()๋ผ๋ Flow๋ ์๋ค. ์ด ํจ์๋ Flow์ channel์ ์ฅ์ ์ ๊ฒฐํฉํด ์ฑ๋ ๊ธฐ๋ฐ์ ๋ฐ์ดํฐ ํ๋ฆ์ ์ ์ธ์ ์ผ๋ก ์ฒ๋ฆฌํ ์ ์๊ฒ ๋ง๋ค์ด์ค๋ค๊ณ ํ๋ค.
suspend fun collect()
Flow๋ก๋ถํฐ, ์ฆ ๋ฐ์ดํฐ ์คํธ๋ฆผ์ผ๋ก๋ถํฐ ๋ฐํ๋ ๊ฐ์ '์์ง(collect)'ํ๋ค. ReceiveChannel.receive()์ ๋์๋๋ ํจ์๋ค. collect()๋ ์์งํ๋ ๋์ Flow๊ฐ '์๋ฃ'๋ ๋๊น์ง ํ๋ก๊ทธ๋๋จธ๊ฐ ์ง์ ํ ์์
์ ๋ฐ๋ณตํด์ ์คํํ๋ค.
๋ช
์์ ์ข
๋ฃ ๋ถ๊ฐ๋ฅ (์ง์ ์ ํจ)
์ด๋ค flow { ... }์์ ๋ชจ๋ emit()์ด ์ํ๋๊ณ flow { ... } ๋ธ๋ก์ด ๋๋๋ฉด Flow๊ฐ '์๋ฃ' ์ฒ๋ฆฌ๋๊ณ , ํด๋น Flow๋ฅผ collect()ํ๋ ์์
๋ํ ์๋์ผ๋ก ์ข
๋ฃ๋๋ค. ์ด ์ธ์ ์ง์ ์ ์ผ๋ก Flow๋ฅผ ์ค์ง์ํค๋ ๋ฐฉ๋ฒ์ ์๋ค. ReceiveChannel์ด close()๋ผ๋ ๋ฉค๋ฒ ํจ์๋ฅผ ์ ๊ณตํด ๋ช
์์ ์ข
๋ฃ๊ฐ ๊ฐ๋ฅํ ๊ฒ๊ณผ ๋์กฐ๋๋ค (์ฌ๋ด์ผ๋ก, ์์์ ๋์จ channelFlow { ... } ๋ด์์๋ close() ์ฌ์ฉ์ด ๊ฐ๋ฅํ๋ค). Flow๋ฅผ ์ด๋ป๊ฒ๋ ์ค์ง์ํค๋ ค๋ฉด ํด๋น Flow๋ฅผ ๋ณ๋์ ์ฝ๋ฃจํด์ ๋ฃ๊ณ ๊ทธ ์ฝ๋ฃจํด์ cancel()์ํค๋ ๊ฐ์ ์ ์ธ ๋ฐฉ์์ ์ฌ์ฉํด์ผ ํ๋ค. ๋๋, ๋ช
์์ ์ผ๋ก ์ข
๋ฃ ๊ฐ๋ฅํ collect()๋ฅผ ๋์ ์ค์งํจ์ผ๋ก์ Flow๋ฅผ ์ค์งํ๋ ๊ฒ๊ณผ ๊ฐ์ ํจ๊ณผ๋ฅผ ๋ณผ ์๋ ์๋ค.
#3 Flow๊ฐ '์๋ฃ'๋์๋ค๋ ๊ฒ์ ์๋ฏธ
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking
suspend fun main() {
val myFlow = flow {
emit("1")
delay(1000)
emit("2")
delay(1000)
emit("3")
// ๋ ์ด์ ์ํ๋ emit()์ด ์์ผ๋ฏ๋ก ์ฌ๊ธฐ์์ flow๋ ์ข
๋ฃ๋๋ค.
}
runBlocking {
myFlow.collect { value ->
println("Collected value: $value")
}
println("\'์์ง\' ์๋ฃ")
}
// ํ๋ฒ ๋ collect()
runBlocking {
myFlow.collect { value ->
println("Collected value: $value")
}
println("\'์์ง\' ์๋ฃ")
}
}
/* ↑ ↑ ↑ ์ถ๋ ฅ ๊ฒฐ๊ณผ
Collected value: 1
Collected value: 2
Collected value: 3
'์์ง' ์๋ฃ
Collected value: 1
Collected value: 2
Collected value: 3
'์์ง' ์๋ฃ
*/
#2์์ ์ค๋ช
ํ ๋ฐ์ ๋ฐ๋ฅด๋ฉด, Flow๋ ๋ช
์์ ์ข
๋ฃ๊ฐ ๋ถ๊ฐ๋ฅํ๋ค. ๋ฌผ๋ก , ์ ์ฝ๋๋ ๋ช
์์ ์ผ๋ก ์ข
๋ฃํ ๊ฒ๋ ์์ด ๋ ์ด์ ์ถ๋ ฅํ emit()์ด ์์ผ๋ myFlow๊ฐ ์์์ ์ผ๋ก ์๋ฃ๋๊ณ , collect() ๋ํ ์์์ ์ข
๋ฃ๋๋ค. ํ์ง๋ง, "myFlow๋ผ๋ ํ๋กํผํฐ์ ๋ด๊ธด flow { ... } ์ธ์คํด์ค ์์ฒด๊ฐ ์๋ฉธํ ๊ฒ์ ์๋์ง ์์๊ฐ?" ๋ผ๋ ๊ถ๊ธ์ฆ์ด ๋ค์ด์ myFlow๋ฅผ ํ๋ฒ ๋ collect()ํด๋ดค๋ค.
๊ฒฐ๊ณผ๋ ์ฒซ๋ฒ์งธ collect()์ ๋์ผํ๋ค. myFlow๋ ๋ถ๋ช
์๋ฃ๋์์ผ๋ฏ๋ก ์๋ฌด๊ฒ๋ ์ถ๋ ฅ๋์ง ์์์ผ ์ ์์ด ์๋๊ฐ? myFlow์ ์๋ฃ๋ ๋์ฒด ์ด๋ค ์๋ฏธ์ผ๊น? ์ด์ ๋ํ ๋ต์ ๋ฐ๋ก ์ฝํ๋ฆฐ์ ๊ธฐ๋ณธ Flow๊ฐ Cold Flow ๋ฐฉ์์ ๋ฐ์ดํฐ ์คํธ๋ฆผ์ด๋ผ๋ ๋ฐ์ ์๋ค. ์๋ ๊ฒ์๊ธ์์ ์์ธํ ์ค๋ช
ํด๋ณธ๋ค.
Cold Flow์ Hot Flow
#1 Cold Flow์ Hot Flow#1-1 ๊ฐ์๋ฐ์ดํฐ ์คํธ๋ฆผ์ ํฌ๊ฒ Cold Flow์ Hot Flow๋ก ๋๋ ์ ์๋ค. ์ด ๋ถ๋ฅ ๊ธฐ์ค์ ๋ํด ์์๋ณธ๋ค. ๋, Kotlin์ Coroutines Flow๋ฅผ ํ์ฉํ ์์ ์ฝ๋๋ก Cold Flow ๋ฐ Hot Flow๋ฅผ ๊ตฌํํด๋ณธ๋ค. #
kenel.tistory.com
#4 ์์ฝ
Flow๋ ๋ฐ์ดํฐ ์คํธ๋ฆผ ์ฒ๋ฆฌ๋ฅผ ์ํด ๊ณ ์๋ ๊ณ ์์ค API๋ค.
'๊นจ์ ๊ฐ๋ ๐ > Kotlin' ์นดํ ๊ณ ๋ฆฌ์ ๋ค๋ฅธ ๊ธ
[Kotlin] Coroutines Flow - Intermediate operator (0) | 2024.08.16 |
---|---|
[Kotlin] Coroutines Flow - Back pressure์ ๊ทธ ์ฒ๋ฆฌ (0) | 2024.08.15 |
[Kotlin] Coroutines - ํ Scope ๋ด์์์ ๊ณ์ธต ๊ด๊ณ (0) | 2024.07.31 |
[Kotlin] ์์ ํ๋กํผํฐ (Delegated properties) (0) | 2024.07.22 |
[Kotlin] Coroutines - ViewModelScope (0) | 2024.02.20 |