美文网首页
Kotlin协程-Flow

Kotlin协程-Flow

作者: 头秃到底 | 来源:发表于2024-02-17 18:22 被阅读0次

    前言

    Flow是kotlin协程中的流。RxJava就是流式编程的库。Flow属于冷流对应RxJava中的Observable Flowable Single MayBe和Completable等。Kotlin协程中的热流实现MutableSharedFlow和MutableStateFlow等,对应RxJava中热流PublisherSubject和BehaviorSubject。

    • 冷流:较少的访问和修改
    • 热流:频繁地读取和更新

    Flow使用

    fun main() {
        runBlocking (Dispatchers.Default){
            // 发送10个元素,从0到9
            val myFlow = flow {
                repeat(10){
                    emit(it)
                }
            }
            launch {
                myFlow.collect{
                    println("Coroutine1:$it")
                }
            }
            launch {
                myFlow.collect{
                    println("Coroutine2:$it")
                }
            }
        }
    }
    
    

    协程1和2通过Flow.collect订阅Flow。

    fun main() {
        runBlocking (Dispatchers.Default){
            // 发送10个元素,从0到9
            val myFlow = flow {
                repeat(10){
                    // 修改原来的CoroutineContext,会异常
                    withContext(Dispatchers.IO){
                        emit(it)
                    }
                }
            }
            launch {
                myFlow.collect{
                    println("Coroutine1:$it")
                }
            }
        }
    }
    
    

    Flow限制,不能修改原来的CoroutineContext。可以使用ChannelFlow就能正常使用。

    fun main() {
        runBlocking (Dispatchers.Default){
            // 发送10个元素,从0到9
            val myFlow = channelFlow {
                repeat(10){
                    // 可以修改原来的CoroutineContext
                    withContext(Dispatchers.IO){
                        channel.send(it)
                    }
                }
            }
            launch {
                myFlow.collect{
                    println("Coroutine1:$it")
                }
            }
        }
    }
    
    
    fun main() {
        runBlocking(Dispatchers.Default) {
            // 发送10个元素,从0到9
            val myFlow = flow {
                repeat(10) {
                    try {
                        emit(it)
                    } catch (e: Throwable) {
                        emit(22)
                    }
                }
            }
            launch {
                myFlow.collect {
                    if (it == 2) {
                        // 这里出现异常后,collect订阅就结束了(只打印2次,第三次就异常了)
                        error("Error")
                    }
                    println("Coroutine1:$it")
                }
            }
        }
    }
    
    

    Flow中collect异常,那么订阅就结束了。

    Flow工作原理

    public fun <T> flow(@BuilderInference block: suspend FlowCollector<T>.() -> Unit): Flow<T> = SafeFlow(block)
    
    private class SafeFlow<T>(private val block: suspend FlowCollector<T>.() -> Unit) : AbstractFlow<T>() {
        override suspend fun collectSafely(collector: FlowCollector<T>) {
            collector.block()
        }
    }
    
    public final override suspend fun collect(collector: FlowCollector<T>) {
        val safeCollector = SafeCollector(collector, coroutineContext)
        try {
            collectSafely(safeCollector)
        } finally {
            safeCollector.releaseIntercepted()
        }
    }
    
    

    创建SafeFlow,继承于AbstractFlow,订阅调用的是collect。检查当前CoroutineContext和调用的collect方法传入的是否一致,不一致就抛出异常。 Flow被SafeCollector代理去检查异常。 转换前的流称上游Upstream,处理后再发送到下游Downstream

    flatMap操作符

    类似于RxJava中的concatMap操作符

    fun main() {
        runBlocking(Dispatchers.Default) {
            val myFlow = flow {
                repeat(3) {
                    emit(it)
                }
            }
            launch {
                // 将原来的流元素构建成一个新的流(按照原来的流元素输出)
                myFlow.flatMapConcat { upstreamValue ->
                    flow {
                        delay(1000L - upstreamValue * 100)
                        repeat(2) {
                            emit(upstreamValue * 10 + it)
                        }
                    }
                }.collect {
                    println("collect $it")
                }
            }
        }
    }
    输出:
    collect 0
    collect 1
    collect 10
    collect 11
    collect 20
    collect 21
    
    

    将原来发送3个元素,通过flatMapConcat()发送两个元素。越是先发送的元素延迟时间越长,然后按顺序输出6个元素。

    flatMapMerge

    类似于RxJava中的flatMap

    fun main() {
        runBlocking(Dispatchers.Default) {
            val myFlow = flow {
                repeat(3) {
                    emit(it)
                }
            }
            launch {
                // 将原来的流元素构建成一个新的流(默认并发16个)谁先执行完就发送谁
                myFlow.flatMapMerge { upstreamValue ->
                    flow {
                        delay(1000L - upstreamValue * 100)
                        repeat(2) {
                            emit(upstreamValue * 10 + it)
                        }
                    }
                }.collect {
                    println("collect $it")
                }
            }
        }
    }
    输出:
    collect 20
    collect 21
    collect 10
    collect 11
    collect 0
    collect 1
    
    

    不会保证原来的顺序,哪个流先处理完就先发送数据。concurrency默认值16,并行执行的数量。当concurrency为1时和flatMapConcat一样。

    fun main() {
        runBlocking(Dispatchers.Default) {
            val myFlow = flow {
                repeat(3) {
                    emit(it)
                }
            }
            launch {
                // 将原来的流元素构建成一个新的流(并发数是2,达到2个的时候等待然后再执行下一个)
                myFlow.flatMapMerge(2) { upstreamValue ->
                    flow {
                        delay(1000L - upstreamValue * 100)
                        repeat(2) {
                            emit(upstreamValue * 10 + it)
                        }
                    }
                }.collect {
                    println("collect $it")
                }
            }
        }
    }
    输出:
    collect 10
    collect 11
    collect 0
    collect 1
    collect 20
    collect 21
    
    

    flatMapLatest

    类似于RxJava中的switchMap

    fun main() {
        runBlocking(Dispatchers.Default) {
            val myFlow = flow {
                repeat(3) {
                    emit(it)
                }
            }
            launch {
                // 前面没执行完的Flow会被取消,然后被后续的Flow替换
                myFlow.flatMapLatest  { upstreamValue ->
                    flow {
                        delay(1000L - upstreamValue * 100)
                        repeat(2) {
                            emit(upstreamValue * 10 + it)
                        }
                    }
                }.collect {
                    println("collect $it")
                }
            }
        }
    }
    输出:
    collect 20
    collect 21
    
    

    总结

    介绍了Flow常用的操作符map flatMap(串式) flatMapMerge(并发) flatmapLatest(取代旧的)等简单使用。

    相关文章

      网友评论

          本文标题:Kotlin协程-Flow

          本文链接:https://www.haomeiwen.com/subject/lchpadtx.html