flowOn 연산자

Exception in thread "main" java.lang.IllegalStateException: Flow invariant is violated:
		Flow was collected in [CoroutineId(1), "coroutine#1":BlockingCoroutine{Active}@2b4a1161, BlockingEventLoop@338da300],
		but emission happened in [CoroutineId(1), "coroutine#1":DispatchedCoroutine{Active}@1178e2a5, Dispatchers.Default].
		Please refer to 'flow' documentation or use 'flowOn' instead
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")

fun simple(): Flow<Int> = flow {
    for (i in 1..3) {
        Thread.sleep(100)
        log("Emitting $i")
        emit(i)
    }
}.flowOn(Dispatchers.Default)

fun main() = runBlocking {
    simple().collect { log("Collected $it")}
}

실행결과
[DefaultDispatcher-worker-1 @coroutine#2] Emitting 1
[main @coroutine#1] Collected 1
[DefaultDispatcher-worker-1 @coroutine#2] Emitting 2
[main @coroutine#1] Collected 2
[DefaultDispatcher-worker-1 @coroutine#2] Emitting 3
[main @coroutine#1] Collected 3

Buffering

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.system.measureTimeMillis

fun simple(): Flow<Int> = flow {
    for (i in 1..3) {
        delay(100)
        println("emit 호출")
        emit(i)
    }
}

fun main() = runBlocking {
    val time = measureTimeMillis {
        simple().collect {
            delay(300)
            println(it)
        }
    }
    println("Collected in $time ms")
}

실행결과
emit 호출
1
emit 호출
2
emit 호출
3
Collected in 1242 ms
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.system.measureTimeMillis

fun simple(): Flow<Int> = flow {
    for (i in 1..3) {
        delay(100)
        println("emit 호출")
        emit(i)
    }
}

fun main() = runBlocking {
    val time = measureTimeMillis {
        simple()
            .buffer()
            .collect {
                delay(300)
                println(it)
            }
    }
    println("Collected in $time ms")
}

실행결과
emit 호출
emit 호출
emit 호출
1
2
3
Collected in 1046 ms

Conflation

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.system.measureTimeMillis

fun simple(): Flow<Int> = flow {
    for (i in 1..3) {
        delay(100)
        println("emit 호출")
        emit(i)
    }
}

fun main() = runBlocking {
    val time = measureTimeMillis {
        simple()
            .conflate() 
            .collect { value ->
                delay(300) 
                println(value)
            }
    }
    println("Collected in $time ms")
}

실행결과
emit 호출
emit 호출
emit 호출
1
3
Collected in 741 ms