美文网首页Android开发经验谈Android开发Android技术知识
Kotlin Flow 背压和线程切换竟然如此相似

Kotlin Flow 背压和线程切换竟然如此相似

作者: 小鱼人爱编程 | 来源:发表于2022-12-04 00:01 被阅读0次

    前言

    上篇分析了Kotlin Flow原理,大部分操作符实现比较简单,相较而言背压和线程切换比较复杂,遗憾的是,纵观网上大部分文章,关于Flow背压和协程切换这块的原理说得比较少,语焉不详,鉴于此,本篇重点分析两者的原理及使用。
    通过本篇文章,你将了解到:

    1. 什么是背压?
    2. 如何处理背压?
    3. Flow buffer的原理
    4. Flow 线程切换的使用
    5. Flow 线程切换的原理

    1. 什么是背压?

    先看自然界的水流:


    image.png

    为了充分利用水资源,人类建立了大坝,以大坝为分界点将水流分为上游和下游。

    当上游的流速大于下游的流速,日积月累,最终导致大坝溢出,此种现象称为背压的出现

    而对于Kotlin里的Flow,也有上游(生产者)、下游(消费者)的概念,如:

        suspend fun testBuffer1() {
            var flow = flow {
                //生产者
                (1..3).forEach {
                    println("emit $it")
                    emit(it)
                }
            }
    
            flow.collect {
                //消费者
                println("collect:$it")
            }
        }
    

    通过collect操作符触发了流,从生产者生产数据(flow闭包),到消费者接收并处理数据(collect闭包),这就完成了流从上游到下游的一次流动过程。

    2. 如何处理背压?

    模拟一个生产者消费者速度不一致的场景:

        suspend fun testBuffer3() {
            var flow = flow {
                (1..3).forEach {
                    delay(1000)
                    println("emit $it")
                    emit(it)
                }
            }
    
            var time = measureTimeMillis {
                flow.collect {
                    delay(2000)
                    println("collect:$it")
                }
            }
            println("use time:${time} ms")
        }
    

    计算流从生产到消费的整个时间:


    image.png

    生产者的速度比消费者的速度快,而它俩都是在同一个线程里顺序执行的,生产者必须等待消费者消费完毕后才会进行下一次生产。
    因此,整个流的耗时=生产者耗时(3 * 1000ms)+消费者耗时(3 * 2000ms)=9s。

    显而易见,消费者影响了生产者的速度,这种情况下该怎么优化呢?
    最简单的解决方案:

    生产者和消费者分别在不同的线程执行

    如:

        suspend fun testBuffer4() {
            var flow = flow {
                (1..3).forEach {
                    delay(1000)
                    println("emit $it in thread:${Thread.currentThread()}")
                    emit(it)
                }
            }.flowOn(Dispatchers.IO)
    
            var time = measureTimeMillis {
                flow.collect {
                    delay(2000)
                    println("collect:$it in thread:${Thread.currentThread()}")
                }
            }
            println("use time:${time} ms")
        }
    

    添加了flowOn()函数,它的存在使得它前面的代码在指定的线程里执行,如flow闭包了的代码都在IO线程执行,也就是生产者在IO线程执行。
    而消费者在当前线程执行,因此两者无需相互等待,节省了总时间:


    image.png

    确实是减少了时间,提升了效率。但我们知道开启线程代价还是挺大的,既然都在协程里运行了,能否借助协程的特性:协程挂起不阻塞线程 来完成此事呢?
    此时,Buffer出场了,先看看它是如何表演的:

        suspend fun testBuffer5() {
            var flow = flow {
                (1..3).forEach {
                    delay(1000)
                    println("emit $it in thread:${Thread.currentThread()}")
                    emit(it)
                }
            }.buffer(5)
    
            var time = measureTimeMillis {
                flow.collect {
                    delay(2000)
                    println("collect:$it in thread:${Thread.currentThread()}")
                }
            }
            println("use time:${time} ms")
        }
    

    这次没有使用flowOn,取而代之的是buffer。
    运行结果如下:


    image.png

    可以看出,生产者消费者都是在同一线程执行,但总耗时却和不在同一线程运行时相差无几。
    那么它是如何做到的呢?这就得从buffer的源码说起。

    3. Flow buffer的原理

    无buffer

    先看看没有buffer时的耗时:

        suspend fun testBuffer3() {
            var flow = flow {
                (1..3).forEach {
                    delay(1000)
                    println("emit $it")
                    emit(it)
                }
            }
    
            var time = measureTimeMillis {
                flow.collect {
                    delay(2000)
                    println("collect:$it")
                }
            }
            println("use time:${time} ms")
        }
    
    image.png

    从collect开始,依次执行flow闭包,通过emit调用到collect闭包,因为flow闭包里包含了几次emit,因此整个流程会有几次发射。
    如上图,从步骤1到步骤8,因为是在同一个线程里,因此是串行执行的,整个流的耗时即为生产者到消费者(步骤1~步骤8)的耗时。

    有buffer

    在没看源码之前,我们先猜测一下它的流程:


    image.png

    每次emit都发送到buffer里,然后立刻回来继续发送,如此一来生产者没有被消费者的速度拖累。
    而消费者会检测Buffer里是否有数据,有则取出来。

    根据之前的经验我们知道:collect调用到emit最后到buffer是线性调用的,放入buffer后继续循环emit,那么问题来了:

    是谁触发了collect闭包的调用呢?

    接下来深入源码,探究答案。

    buffer源码流程分析

    创建Flow

    public fun <T> Flow<T>.buffer(capacity: Int = Channel.BUFFERED, onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND): Flow<T> {
        var capacity = capacity//buffer容量
        var onBufferOverflow = onBufferOverflow//buffer满之后的处理策略
        if (capacity == Channel.CONFLATED) {
            capacity = 0
            onBufferOverflow = BufferOverflow.DROP_OLDEST
        }
        // create a flow
        return when (this) {
            is FusibleFlow -> fuse(capacity = capacity, onBufferOverflow = onBufferOverflow)
            //走else 分支,构造ChannelFlowOperatorImpl
            else -> ChannelFlowOperatorImpl(this, capacity = capacity, onBufferOverflow = onBufferOverflow)
        }
    }
    

    buffer 返回Flow实例,其间涉及几个重要的类和函数:


    image.png

    调用collect
    当调用Flow.collect时:

    public suspend inline fun <T> Flow<T>.collect(crossinline action: suspend (value: T) -> Unit): Unit =
        collect(object : FlowCollector<T> {
            override suspend fun emit(value: T) = action(value)
        })
    

    构造了匿名内部类FlowCollector,并实现了emit方法,它的实现为collect的闭包。

    调用ChannelFlowOperatorImpl.collect最终会调用ChannelFlow.collect:

        override suspend fun collect(collector: FlowCollector<T>): Unit =
            coroutineScope {
                collector.emitAll(produceImpl(this))
            }
    
        public open fun produceImpl(scope: CoroutineScope): ReceiveChannel<T> =
            scope.produce(context, produceCapacity, onBufferOverflow, start = CoroutineStart.ATOMIC, block = collectToFun)
    
    

    produceImpl 创建了Channel,内部开启了协程,返回ReceiveChannel。

    再来看emitAll函数:

    private suspend fun <T> FlowCollector<T>.emitAllImpl(channel: ReceiveChannel<T>, consume: Boolean) {
        ensureActive()
        var cause: Throwable? = null
        try {
            while (true) {
                //挂起等待Channel数据
                val result = run { channel.receiveCatching() }
                if (result.isClosed) {
                    //Channel关闭后才会退出循环
                    result.exceptionOrNull()?.let { throw it }
                    break // returns normally when result.closeCause == null
                }
                //发送数据
                emit(result.getOrThrow())
            }
        } catch (e: Throwable) {
            cause = e
            throw e
        } finally {
            if (consume) channel.cancelConsumed(cause)
        }
    }
    

    Channel此时并没有数据,因此协程会挂起等待。

    Channel发送
    Channel什么时候有数据呢?当然是在调用了Channel.send()函数后。
    前面提到过collect之后开启了协程:

      public open fun produceImpl(scope: CoroutineScope): ReceiveChannel<T> =
            scope.produce(context, produceCapacity, onBufferOverflow, start = CoroutineStart.ATOMIC, block = collectToFun)
    
      internal val collectToFun: suspend (ProducerScope<T>) -> Unit
            get() = { collectTo(it) }
    
      protected override suspend fun collectTo(scope: ProducerScope<T>) =
            flowCollect(SendingCollector(scope))
    

    此时传入的参数为:collectToFun,最后构造了:

    public class SendingCollector<T>(
        private val channel: SendChannel<T>
    ) : FlowCollector<T> {
        override suspend fun emit(value: T): Unit = channel.send(value)
    }
    

    当协程得到执行时,会调用collectToFun-->collectTo(it)-->flowCollect(SendingCollector(scope)),最终调用到:

    #ChannelFlowOperatorImpl
        override suspend fun flowCollect(collector: FlowCollector<T>) =
            flow.collect(collector)
    

    而该flow为最开始的flow,collector为SendingCollector。
    flow.collect后会调用到flow的闭包,进而调用到emit函数:

        private fun emit(uCont: Continuation<Unit>, value: T): Any? {
            val currentContext = uCont.context
            currentContext.ensureActive()
            //...
            completion = uCont
            return emitFun(collector as FlowCollector<Any?>, value, this as Continuation<Unit>)
        }
    

    emitFun本质上会调用collector里的emit函数,而此时的collector即为SendingCollector,最后调用channel.send(value)

    如此一来,Channel就将数据发送出去了,此时channel.receiveCatching()被唤醒,接下来执行emit(result.getOrThrow()),这函数最后会流转到最初始的collect的闭包里。
    上面的分析即为生产者到消费者的流转过程,单看源码可能比较乱,看图解惑:

    image.png

    红色部分和绿色部分分别为不同的协程,它俩的关联点即是蓝色部分。

    Flow buffer的本质上是利用了Channel进行数据的发送和接收

    buffer为啥能提升效率

    前面分析过无buffer时生产者消费者的流程图,作为对比,我们也将加入buffer后生产者消费者的流程图。


    image.png

    还是以相同的demo,阐述其流程:

    1. 生产者挂起1s,当1s结束后调用emit发射数据,此时数据放入buffer里,生产者调用delay继续挂起
    2. 此时消费者被唤醒,然后挂起 2s等待
    3. 第2s到来之时,生产者调用emit发送数据到buffer里,继续挂起
    4. 第2s到来之时,消费者结束挂起,消费数据,然后继续挂起2s
    5. 第3s到来之时,生产者继续生产数据,而后生产者退出生产
    6. 第5s到来之时,消费者挂起结束,消费数据,然后继续挂起2s
    7. 第7s到来之时,消费者挂起结束,消费结束,此时因为channel里已经没有数据了,退出循环,最终消费者退出

    由此可见,总共花费了7s。

    image.png
    ps:协程调度时机不同,打印顺序可能略有差异,但总体耗时不变。

    至此,我们找到了buffer能够提高效率的原因:

    生产者、消费者运行在不同的协程,挂起操作不阻塞对方

    抛出一个比较有意思的问题:以下代码加buffer之后效率会有提升吗?

        suspend fun testBuffer6() {
            var flow = flow {
                (1..3).forEach {
                    println("emit $it")
                    emit(it)
                }
            }
            var time = measureTimeMillis {
                flow.collect {
                    delay(2000)
                    println("collect:$it")
                }
            }
            println("use time:${time} ms")
        }
    

    在未实验之前,如果你已经有答案,恭喜你已经弄懂了buffer的本质。

    4. Flow 线程切换的使用

        suspend fun testBuffer4() {
            var flow = flow {
                (1..3).forEach {
                    delay(1000)
                    println("emit $it in thread:${Thread.currentThread()}")
                    emit(it)
                }
            }.flowOn(Dispatchers.IO)
    
            var time = measureTimeMillis {
                flow.collect {
                    delay(2000)
                    println("collect:$it in thread:${Thread.currentThread()}")
                }
            }
            println("use time:${time} ms")
        }
    

    flowOn(Dispatchers.IO)表示其之前的操作符(函数)都在IO线程执行,如这里的意思是flow闭包里的代码在IO线程执行。
    而其之后的操作符(函数)在当前的线程执行。
    通常用在子线程里获取网络数据(flow闭包),然后再collect闭包里(主线程)更新UI。

    5. Flow 线程切换的原理

    public fun <T> Flow<T>.flowOn(context: CoroutineContext): Flow<T> {
        checkFlowContext(context)
        return when {
            context == EmptyCoroutineContext -> this
            this is FusibleFlow -> fuse(context = context)
            else -> ChannelFlowOperatorImpl(this, context = context)
        }
    }
    

    看到这你可能已经有答案了:这不就和buffer一样的方式吗?
    但仔细看,此处多了个上下文:CoroutineContext。
    CoroutineContext的作用就是用来决定协程运行在哪个线程。

    前面分析的buffer时,我们的协程的作用域是runBlocking,即使生产者、消费者在不同的协程,但是它们始终在同一个线程里执行。
    而使用了flowOn指定线程,此时生产者、消费者在不同的线程运行协程。
    因此,只要弄懂了buffer原理,flowOn原理自然而然就懂了。


    image.png

    以上为Flow背压和线程切换的全部内容,下篇将分析Flow的热流。
    本文基于Kotlin 1.5.3,文中完整Demo请点击

    您若喜欢,请点赞、关注、收藏,您的鼓励是我前进的动力

    持续更新中,和我一起步步为营系统、深入学习Android/Kotlin

    1、Android各种Context的前世今生
    2、Android DecorView 必知必会
    3、Window/WindowManager 不可不知之事
    4、View Measure/Layout/Draw 真明白了
    5、Android事件分发全套服务
    6、Android invalidate/postInvalidate/requestLayout 彻底厘清
    7、Android Window 如何确定大小/onMeasure()多次执行原因
    8、Android事件驱动Handler-Message-Looper解析
    9、Android 键盘一招搞定
    10、Android 各种坐标彻底明了
    11、Android Activity/Window/View 的background
    12、Android Activity创建到View的显示过
    13、Android IPC 系列
    14、Android 存储系列
    15、Java 并发系列不再疑惑
    16、Java 线程池系列
    17、Android Jetpack 前置基础系列
    18、Android Jetpack 易懂易学系列
    19、Kotlin 轻松入门系列
    20、Kotlin 协程系列全面解读

    相关文章

      网友评论

        本文标题:Kotlin Flow 背压和线程切换竟然如此相似

        本文链接:https://www.haomeiwen.com/subject/lghoxdtx.html