Kotlin SharedFlow 源码解析

作者: 唠嗑008 | 来源:发表于2023-06-24 15:16 被阅读0次

    前言

    对于SharedFlow使用可以看之前的文章 Kotlin SharedFlow 使用。在这篇文章中已经通过多个demo实战帮大家总结了SharedFlow的一些特性和使用场景。但是也遗留了一些疑惑,所以本文打算通过轻度阅读源码的方式给大家答疑解惑。放心,不会从头到尾的说。

    SharedFlow源码分析的重点

    在源码中可以分析的点很多,但我认为最重要的点有2点。emit和collect是如何关联的,缓存机制是怎样的。

    从上一篇文章可以了解到emit和collect是挂起函数,但是否被挂起是有一定条件的。而且生产者,消费者出现的时机也会影响数据流的执行,这些问题就是本文要研究的重点。

    下面分析emit和collect的小节中还会提出几个问题,带着大家通过解决问题的方式阅读源码。

    回顾下之前文章的demo
    为什么先发射数据,再collect就收不到数据呢

     runBlocking {
            val sharedFlow = MutableSharedFlow<Int>()
            sharedFlow.emit(1)
    
            launch {
                sharedFlow.collect {
                    println("collect: $it")
                }
            }
        }
    

    为什么设置了replay缓存,超出其缓存数量的时候,会丢失前面的数据,只能收到最新replay数量的数据呢?比如下面demo,只能收到2和3

      runBlocking {
            //默认参数情况,先emit,再collect收不到数据
            val sharedFlow = MutableSharedFlow<Int>(replay = 2, extraBufferCapacity = 0)
    
            sharedFlow.emit(1)
            sharedFlow.emit(2)
            sharedFlow.emit(3)
    
            launch {
                sharedFlow.collect {
                    println("collect: $it")
                }
            }
        }
    

    emit发射数据

    对于初学者来说,读懂这个方法确实有一定难度,不过我换了一个角度来带大家理解,从不同业务场景,使用角度来逐步分析,大致分为4种。

    • 没有收集者,无replay缓存
    • 没有收集者,有replay缓存(extraBufferCapacity缓存无影响)
    • 有收集者,无replay缓存
    • 有收集者,有replay缓存
      override suspend fun emit(value: T) {
    //尝试发送数据,这是一个快速路径,可以提高发送数据的效率。 
            if (tryEmit(value)) return // fast-path
    //在挂起的协程中发送数据
            emitSuspend(value)
        }
    

    emit方法先用不需要挂起的方式发数据,发失败之后采用挂起的方式发。

    以不挂起的方式发射数据

     override fun tryEmit(value: T): Boolean {
            var resumes: Array<Continuation<Unit>?> = EMPTY_RESUMES
            val emitted = synchronized(this) {
    //尝试发射数据,发射成功返回true,失败返回false
                if (tryEmitLocked(value)) {
                  //找到需要恢复的协程,并将结果保存到  resumes  数组中,
                    resumes = findSlotsToResumeLocked(resumes)
                    true
                } else {
                    false
                }
            }
    //上面已经找到了需要恢复的协程,这里只需要恢复协程的执行
            for (cont in resumes) cont?.resume(Unit)
            return emitted
        }
    
     private fun tryEmitLocked(value: T): Boolean {
            // 没有收集者,一定返回true
            if (nCollectors == 0) return tryEmitNoCollectorsLocked(value) 
            // 有收集者,缓存区已满,超过replay + extraBufferCapacity数量,且消费者没有消费最旧的数据(replayIndex)
            if (bufferSize >= bufferCapacity && minCollectorIndex <= replayIndex) {
    //执行下面的缓存溢出策略
                when (onBufferOverflow) {
                    BufferOverflow.SUSPEND -> return false // will suspend
                    BufferOverflow.DROP_LATEST -> return true // just drop incoming
                    BufferOverflow.DROP_OLDEST -> {} //丢弃最旧的数据,这里暂不处理
                }
            }
    //将数据加入到缓存数组中,这里不会挂起emit所在的协程
            enqueueLocked(value)
            bufferSize++ //缓存数组长度
            // 上面的缓存溢出策略,丢弃最老数据是没做处理的,实际上延迟在这里处理
            if (bufferSize > bufferCapacity) dropOldestLocked()
            //  如果replayCache中数据的数量超过了最大容量
            if (replaySize > replay) {
    // 更新replayIndex的值,replayIndex向前移动一位
                updateBufferLocked(replayIndex + 1, minCollectorIndex, bufferEndIndex, queueEndIndex)
            }
            return true
        }
    

    这个方法已经涉及到shareflow缓存机制,所以有必要先来张图大概了解下缓存机制。

    image.png

    shareflow缓存是个数组,大小由 bufferSize 控制,而缓存容量由 bufferCapacity 控制。缓存由3部分组成,replay缓存的数量,extraBufferCapacity缓存的数量,这2部分加起来就是buffered values,还有就是挂起时候的Emitter对象。图中还展示了两个慢速收集器的位置,即可能收集缓存队列中的值的最慢速度的收集器的位置。这两个位置分别由 minCollectorIndex 和 replayIndex 控制。

    这个tryEmitLocked方法很重要,因为它可以解释之前文章中一些业务场景的困惑。 下面我们就来跟大家一起解剖下这个方法

    tryEmitLocked方法没有收集者

      //走到这里,说明没有收集者
     private fun tryEmitNoCollectorsLocked(value: T): Boolean {
            assert { nCollectors == 0 }
    //replay缓存为0,就丢弃数据,emit方法就结束了
            if (replay == 0) return true 
            enqueueLocked(value) // 加入到缓存数组
            bufferSize++ // value was added to buffer
             //若是emit发射的数量超过了重放个数,则丢弃最旧的值
            if (bufferSize > replay) dropOldestLocked()
            minCollectorIndex = head + bufferSize // a default value (max allowed)
            return true
        }
    

    这段代码也解释了之前的2个疑惑

    • 在不配置replay缓存的情况下,先emit发数据再collect是收不到数据的
    • 在配置了replay的情况下,先emit再collect是能收到数据,但是 emit发射的数量超过了replay的话,就只能收到最新的replay个数的数据

    如下2段代码,运行结果都只能收到2和3

     runBlocking {
            //默认参数情况,先emit,再collect收不到数据
            val sharedFlow = MutableSharedFlow<Int>(replay = 2, extraBufferCapacity = 0)
    
            sharedFlow.emit(1)
            sharedFlow.emit(2)
            sharedFlow.emit(3)
    
            launch {
                sharedFlow.collect {
                    println("collect: $it")
                }
            }
        }
    
    
     runBlocking {
            //这里配置了extraBufferCapacity根本不会起到效果
            val sharedFlow = MutableSharedFlow<Int>(replay = 2, extraBufferCapacity = 2)
    
            sharedFlow.emit(1)
            sharedFlow.emit(2)
            sharedFlow.emit(3)
    
            launch {
                sharedFlow.collect {
                    println("collect: $it")
                }
            }
        }
    

    从上面demo和tryEmitNoCollectorsLocked源码分析可以看出:如果emit发射到buffered values的数据数量超过了replay的值,会丢弃最旧的数据,保持buffered values中数据的数量最大为replay。

    当有新的订阅者时,会先从replayCache中获取数据,在buffered values中,replayCache前的数据只对已经订阅的订阅者有用,而此时又没有订阅者,因此缓存超过replayCache最大容量的数据只会占用更多内存,是没有意义的。记住:没有收集者时,extraBufferCapacity是不会起作用的

    tryEmitLocked方法有收集者
    上面已经分析了这个方法没有收集者的情况,接下来就分析下有收集者的情况

     private fun tryEmitLocked(value: T): Boolean {
            // 没有收集者,一定返回true
            if (nCollectors == 0) return tryEmitNoCollectorsLocked(value) 
            // 有收集者,缓存区已满,超过replay + extraBufferCapacity数量,且消费者没有消费最旧的数据(replayIndex)
            if (bufferSize >= bufferCapacity && minCollectorIndex <= replayIndex) {
    //执行下面的缓存溢出策略
                when (onBufferOverflow) {
                    BufferOverflow.SUSPEND -> return false // will suspend
                    BufferOverflow.DROP_LATEST -> return true // just drop incoming
                    BufferOverflow.DROP_OLDEST -> {} //丢弃最旧的数据,这里暂不处理
                }
            }
    //将数据加入到缓存数组中,这里不会挂起emit所在的协程
            enqueueLocked(value)
            bufferSize++ //缓存数组长度
            // 上面的缓存溢出策略,丢弃最老数据是没做处理的,实际上延迟在这里处理
            if (bufferSize > bufferCapacity) dropOldestLocked()
            //  如果replayCache中数据的数量超过了最大容量
            if (replaySize > replay) {
    // 更新replayIndex的值,replayIndex向前移动一位
                updateBufferLocked(replayIndex + 1, minCollectorIndex, bufferEndIndex, queueEndIndex)
            }
            return true
        }
    

    有收集者,但没有配置任何缓存

     runBlocking {
            //默认参数情况,先emit,再collect收不到数据
            val sharedFlow = MutableSharedFlow<Int>(replay = 0, extraBufferCapacity = 0)
    
            launch {
                sharedFlow.collect {
                    println("collect: $it")
                }
            }
    
            delay(200) //确保已经订阅
            sharedFlow.emit(1)
            sharedFlow.emit(2)
            sharedFlow.emit(3)
        }
    

    上面demo,是能收到1,2,3的。
    从源码可以看出,tryEmitLocked方法中,有订阅者的情况,即使没有配置缓存也会执行enqueueLocked(value)方法把数据加入到缓存数组。

    有订阅者,且配置了replay或者extraBufferCapacity缓存,会多了一个缓存溢出策略。有3种策略,挂起协程,丢弃最老的数据,丢弃最新的数据。

    emitSuspend挂起方式发数据

    private suspend fun emitSuspend(value: T) = suspendCancellableCoroutine<Unit> sc@{ cont ->
            var resumes: Array<Continuation<Unit>?> = EMPTY_RESUMES
            val emitter = synchronized(this) lock@{
                //再次检查,确保缓存区满了,因为满了才会执行下面的逻辑
                if (tryEmitLocked(value)) {
                    cont.resume(Unit)
                    resumes = findSlotsToResumeLocked(resumes)
                    return@lock null
                }
                // 创建Emitter,加入到buffer里
                //可以去看下前面的那张图,Emitter是加到缓存区的什么位置的
                Emitter(this, head + totalSize, value, cont).also {
                    //加到之前的缓存数组
                    enqueueLocked(it)
                    queueSize++ // Emitter是挂起的,单独记录下数量
                    // 如果buffered value缓存没有数据,则收集已经挂起的订阅者的续体,保存到局部变量resumes中
                    if (bufferCapacity == 0) resumes = findSlotsToResumeLocked(resumes)
                }
            }
            // outside of the lock: register dispose on cancellation
            emitter?.let { cont.disposeOnCancellation(it) }
            // 恢复挂起的订阅者
            for (r in resumes) r?.resume(Unit)
        }
    

    这个方法比较简单,缓存区满了,创建Emitter加到buffer缓存区。

    小结

    阅读emit发射数据的流程,可以分2部分完成,不挂起发射和挂起发射。

    如果没有收集者,emit永远不会挂起。

    如果有收集者,并且buffered values缓存容量已满并且最旧的数据没有被消费,则emit有机会被挂起,当然这取决于你的溢出策略。

    collect消费数据

     override suspend fun collect(collector: FlowCollector<T>): Nothing {
             //分配槽位
            val slot = allocateSlot()
            try {
                if (collector is SubscribedFlowCollector) collector.onSubscription()
                val collectorJob = currentCoroutineContext()[Job]
                while (true) {
                    var newValue: Any?
                    while (true) {
                        //尝试获取值,获取到了就跳出循环,获取不到就挂起等待
                        newValue = tryTakeValue(slot) // attempt no-suspend fast path first
                        if (newValue !== NO_VALUE) break
                        awaitValue(slot) // await signal that the new value is available
                    }
                    //判断订阅者所在协程是否是存活
                    collectorJob?.ensureActive()
                  //回调到collect方法的lambda
                    collector.emit(newValue as T)
                }
            } finally {
                freeSlot(slot)
            }
        }
    

    关于slot的理解,可以看下图


    slot.png
    • 消费者开始collect,根据index找到buffer下标为0的元素即为可以消费的元素;
    • 拿到0号数据后,slot.index=1,找到buffer下标为1的元素
      index++,重复上诉步骤
     private fun tryTakeValue(slot: SharedFlowSlot): Any? {
            var resumes: Array<Continuation<Unit>?> = EMPTY_RESUMES
            val value = synchronized(this) {
                // 从slot中获取index
               // index表示当前应该从缓存数组的index位置中获取数据
                val index = tryPeekLocked(slot)
                if (index < 0) {
                    //没有数据,返回空数据的标识
                    NO_VALUE
                } else {
                    val oldIndex = slot.index
                    //从缓存数组buffer中获取index对应的数据
                    val newValue = getPeekedValueLockedAt(index)
                  //slot索引加1,表示获取下个数据的位置
                    slot.index = index + 1 // points to the next index after peeked one
                    resumes = updateCollectorIndexLocked(oldIndex)
                    newValue
                }
            }
          //恢复协程
            for (resume in resumes) resume?.resume(Unit)
            return value
        }
    

    tryPeekLocked方法,是判断数据所在的位置是否符合要求。

     private suspend fun awaitValue(slot: SharedFlowSlot): Unit = suspendCancellableCoroutine { cont ->
            synchronized(this) lock@{
                //再次检查index
                val index = tryPeekLocked(slot) // recheck under this lock
                if (index < 0) {
                //保存续体cont到slot
                    slot.cont = cont // Ok -- suspending
                } else {
                    //说明有值,不需要再继续挂起了,通过resume恢复协程
                    cont.resume(Unit) // has value, no need to suspend
                    return@lock
                }
                slot.cont = cont // suspend, waiting
            }
        }
    

    suspendCancellableCoroutine 是一个挂起函数,用于创建可取消的协程。在协程中调用 suspendCancellableCoroutine 函数时,它会创建一个 CancellableContinuation 对象,并将其传递给一个lambda表达式。该lambda表达式中的代码可以使用 suspend 关键字挂起当前协程,并在某个条件满足时或协程被取消时恢复协程。

    collect小结
    collect方法会构造Slot对象,然后开启死循环去不断匹配缓存区的数据。具体是根据Slot 中的index匹配缓存区buffer中的数据,如果匹配到了,执行collect闭包;匹配不到就挂起协程,挂起的协程会在有新数据时被生产者所恢复。无论是否有生产者,只要没拿到数据,collect都会被挂起。

    相关文章

      网友评论

        本文标题:Kotlin SharedFlow 源码解析

        本文链接:https://www.haomeiwen.com/subject/dhzjydtx.html