Kotlin SharedFlow 源码解析

作者: 唠嗑008 | 来源:发表于2023-06-24 15:16 被阅读0次

前言

对于SharedFlow使用可以看之前的文章 Kotlin SharedFlow 使用。在这篇文章中已经通过多个demo实战帮大家总结了SharedFlow的一些特性和使用场景。但是也遗留了一些疑惑,所以本文打算通过轻度阅读源码的方式给大家答疑解惑。放心,不会从头到尾的说。

SharedFlow源码分析的重点

在源码中可以分析的点很多,但我认为最重要的点有2点。emit和collect是如何关联的,缓存机制是怎样的。

从上一篇文章可以了解到emit和collect是挂起函数,但是否被挂起是有一定条件的。而且生产者,消费者出现的时机也会影响数据流的执行,这些问题就是本文要研究的重点。

下面分析emit和collect的小节中还会提出几个问题,带着大家通过解决问题的方式阅读源码。

回顾下之前文章的demo
为什么先发射数据,再collect就收不到数据呢

 runBlocking {
        val sharedFlow = MutableSharedFlow<Int>()
        sharedFlow.emit(1)

        launch {
            sharedFlow.collect {
                println("collect: $it")
            }
        }
    }

为什么设置了replay缓存,超出其缓存数量的时候,会丢失前面的数据,只能收到最新replay数量的数据呢?比如下面demo,只能收到2和3

  runBlocking {
        //默认参数情况,先emit,再collect收不到数据
        val sharedFlow = MutableSharedFlow<Int>(replay = 2, extraBufferCapacity = 0)

        sharedFlow.emit(1)
        sharedFlow.emit(2)
        sharedFlow.emit(3)

        launch {
            sharedFlow.collect {
                println("collect: $it")
            }
        }
    }

emit发射数据

对于初学者来说,读懂这个方法确实有一定难度,不过我换了一个角度来带大家理解,从不同业务场景,使用角度来逐步分析,大致分为4种。

  • 没有收集者,无replay缓存
  • 没有收集者,有replay缓存(extraBufferCapacity缓存无影响)
  • 有收集者,无replay缓存
  • 有收集者,有replay缓存
  override suspend fun emit(value: T) {
//尝试发送数据,这是一个快速路径,可以提高发送数据的效率。 
        if (tryEmit(value)) return // fast-path
//在挂起的协程中发送数据
        emitSuspend(value)
    }

emit方法先用不需要挂起的方式发数据,发失败之后采用挂起的方式发。

以不挂起的方式发射数据

 override fun tryEmit(value: T): Boolean {
        var resumes: Array<Continuation<Unit>?> = EMPTY_RESUMES
        val emitted = synchronized(this) {
//尝试发射数据,发射成功返回true,失败返回false
            if (tryEmitLocked(value)) {
              //找到需要恢复的协程,并将结果保存到  resumes  数组中,
                resumes = findSlotsToResumeLocked(resumes)
                true
            } else {
                false
            }
        }
//上面已经找到了需要恢复的协程,这里只需要恢复协程的执行
        for (cont in resumes) cont?.resume(Unit)
        return emitted
    }
 private fun tryEmitLocked(value: T): Boolean {
        // 没有收集者,一定返回true
        if (nCollectors == 0) return tryEmitNoCollectorsLocked(value) 
        // 有收集者,缓存区已满,超过replay + extraBufferCapacity数量,且消费者没有消费最旧的数据(replayIndex)
        if (bufferSize >= bufferCapacity && minCollectorIndex <= replayIndex) {
//执行下面的缓存溢出策略
            when (onBufferOverflow) {
                BufferOverflow.SUSPEND -> return false // will suspend
                BufferOverflow.DROP_LATEST -> return true // just drop incoming
                BufferOverflow.DROP_OLDEST -> {} //丢弃最旧的数据,这里暂不处理
            }
        }
//将数据加入到缓存数组中,这里不会挂起emit所在的协程
        enqueueLocked(value)
        bufferSize++ //缓存数组长度
        // 上面的缓存溢出策略,丢弃最老数据是没做处理的,实际上延迟在这里处理
        if (bufferSize > bufferCapacity) dropOldestLocked()
        //  如果replayCache中数据的数量超过了最大容量
        if (replaySize > replay) {
// 更新replayIndex的值,replayIndex向前移动一位
            updateBufferLocked(replayIndex + 1, minCollectorIndex, bufferEndIndex, queueEndIndex)
        }
        return true
    }

这个方法已经涉及到shareflow缓存机制,所以有必要先来张图大概了解下缓存机制。

image.png

shareflow缓存是个数组,大小由 bufferSize 控制,而缓存容量由 bufferCapacity 控制。缓存由3部分组成,replay缓存的数量,extraBufferCapacity缓存的数量,这2部分加起来就是buffered values,还有就是挂起时候的Emitter对象。图中还展示了两个慢速收集器的位置,即可能收集缓存队列中的值的最慢速度的收集器的位置。这两个位置分别由 minCollectorIndex 和 replayIndex 控制。

这个tryEmitLocked方法很重要,因为它可以解释之前文章中一些业务场景的困惑。 下面我们就来跟大家一起解剖下这个方法

tryEmitLocked方法没有收集者

  //走到这里,说明没有收集者
 private fun tryEmitNoCollectorsLocked(value: T): Boolean {
        assert { nCollectors == 0 }
//replay缓存为0,就丢弃数据,emit方法就结束了
        if (replay == 0) return true 
        enqueueLocked(value) // 加入到缓存数组
        bufferSize++ // value was added to buffer
         //若是emit发射的数量超过了重放个数,则丢弃最旧的值
        if (bufferSize > replay) dropOldestLocked()
        minCollectorIndex = head + bufferSize // a default value (max allowed)
        return true
    }

这段代码也解释了之前的2个疑惑

  • 在不配置replay缓存的情况下,先emit发数据再collect是收不到数据的
  • 在配置了replay的情况下,先emit再collect是能收到数据,但是 emit发射的数量超过了replay的话,就只能收到最新的replay个数的数据

如下2段代码,运行结果都只能收到2和3

 runBlocking {
        //默认参数情况,先emit,再collect收不到数据
        val sharedFlow = MutableSharedFlow<Int>(replay = 2, extraBufferCapacity = 0)

        sharedFlow.emit(1)
        sharedFlow.emit(2)
        sharedFlow.emit(3)

        launch {
            sharedFlow.collect {
                println("collect: $it")
            }
        }
    }


 runBlocking {
        //这里配置了extraBufferCapacity根本不会起到效果
        val sharedFlow = MutableSharedFlow<Int>(replay = 2, extraBufferCapacity = 2)

        sharedFlow.emit(1)
        sharedFlow.emit(2)
        sharedFlow.emit(3)

        launch {
            sharedFlow.collect {
                println("collect: $it")
            }
        }
    }

从上面demo和tryEmitNoCollectorsLocked源码分析可以看出:如果emit发射到buffered values的数据数量超过了replay的值,会丢弃最旧的数据,保持buffered values中数据的数量最大为replay。

当有新的订阅者时,会先从replayCache中获取数据,在buffered values中,replayCache前的数据只对已经订阅的订阅者有用,而此时又没有订阅者,因此缓存超过replayCache最大容量的数据只会占用更多内存,是没有意义的。记住:没有收集者时,extraBufferCapacity是不会起作用的

tryEmitLocked方法有收集者
上面已经分析了这个方法没有收集者的情况,接下来就分析下有收集者的情况

 private fun tryEmitLocked(value: T): Boolean {
        // 没有收集者,一定返回true
        if (nCollectors == 0) return tryEmitNoCollectorsLocked(value) 
        // 有收集者,缓存区已满,超过replay + extraBufferCapacity数量,且消费者没有消费最旧的数据(replayIndex)
        if (bufferSize >= bufferCapacity && minCollectorIndex <= replayIndex) {
//执行下面的缓存溢出策略
            when (onBufferOverflow) {
                BufferOverflow.SUSPEND -> return false // will suspend
                BufferOverflow.DROP_LATEST -> return true // just drop incoming
                BufferOverflow.DROP_OLDEST -> {} //丢弃最旧的数据,这里暂不处理
            }
        }
//将数据加入到缓存数组中,这里不会挂起emit所在的协程
        enqueueLocked(value)
        bufferSize++ //缓存数组长度
        // 上面的缓存溢出策略,丢弃最老数据是没做处理的,实际上延迟在这里处理
        if (bufferSize > bufferCapacity) dropOldestLocked()
        //  如果replayCache中数据的数量超过了最大容量
        if (replaySize > replay) {
// 更新replayIndex的值,replayIndex向前移动一位
            updateBufferLocked(replayIndex + 1, minCollectorIndex, bufferEndIndex, queueEndIndex)
        }
        return true
    }

有收集者,但没有配置任何缓存

 runBlocking {
        //默认参数情况,先emit,再collect收不到数据
        val sharedFlow = MutableSharedFlow<Int>(replay = 0, extraBufferCapacity = 0)

        launch {
            sharedFlow.collect {
                println("collect: $it")
            }
        }

        delay(200) //确保已经订阅
        sharedFlow.emit(1)
        sharedFlow.emit(2)
        sharedFlow.emit(3)
    }

上面demo,是能收到1,2,3的。
从源码可以看出,tryEmitLocked方法中,有订阅者的情况,即使没有配置缓存也会执行enqueueLocked(value)方法把数据加入到缓存数组。

有订阅者,且配置了replay或者extraBufferCapacity缓存,会多了一个缓存溢出策略。有3种策略,挂起协程,丢弃最老的数据,丢弃最新的数据。

emitSuspend挂起方式发数据

private suspend fun emitSuspend(value: T) = suspendCancellableCoroutine<Unit> sc@{ cont ->
        var resumes: Array<Continuation<Unit>?> = EMPTY_RESUMES
        val emitter = synchronized(this) lock@{
            //再次检查,确保缓存区满了,因为满了才会执行下面的逻辑
            if (tryEmitLocked(value)) {
                cont.resume(Unit)
                resumes = findSlotsToResumeLocked(resumes)
                return@lock null
            }
            // 创建Emitter,加入到buffer里
            //可以去看下前面的那张图,Emitter是加到缓存区的什么位置的
            Emitter(this, head + totalSize, value, cont).also {
                //加到之前的缓存数组
                enqueueLocked(it)
                queueSize++ // Emitter是挂起的,单独记录下数量
                // 如果buffered value缓存没有数据,则收集已经挂起的订阅者的续体,保存到局部变量resumes中
                if (bufferCapacity == 0) resumes = findSlotsToResumeLocked(resumes)
            }
        }
        // outside of the lock: register dispose on cancellation
        emitter?.let { cont.disposeOnCancellation(it) }
        // 恢复挂起的订阅者
        for (r in resumes) r?.resume(Unit)
    }

这个方法比较简单,缓存区满了,创建Emitter加到buffer缓存区。

小结

阅读emit发射数据的流程,可以分2部分完成,不挂起发射和挂起发射。

如果没有收集者,emit永远不会挂起。

如果有收集者,并且buffered values缓存容量已满并且最旧的数据没有被消费,则emit有机会被挂起,当然这取决于你的溢出策略。

collect消费数据

 override suspend fun collect(collector: FlowCollector<T>): Nothing {
         //分配槽位
        val slot = allocateSlot()
        try {
            if (collector is SubscribedFlowCollector) collector.onSubscription()
            val collectorJob = currentCoroutineContext()[Job]
            while (true) {
                var newValue: Any?
                while (true) {
                    //尝试获取值,获取到了就跳出循环,获取不到就挂起等待
                    newValue = tryTakeValue(slot) // attempt no-suspend fast path first
                    if (newValue !== NO_VALUE) break
                    awaitValue(slot) // await signal that the new value is available
                }
                //判断订阅者所在协程是否是存活
                collectorJob?.ensureActive()
              //回调到collect方法的lambda
                collector.emit(newValue as T)
            }
        } finally {
            freeSlot(slot)
        }
    }

关于slot的理解,可以看下图


slot.png
  • 消费者开始collect,根据index找到buffer下标为0的元素即为可以消费的元素;
  • 拿到0号数据后,slot.index=1,找到buffer下标为1的元素
    index++,重复上诉步骤
 private fun tryTakeValue(slot: SharedFlowSlot): Any? {
        var resumes: Array<Continuation<Unit>?> = EMPTY_RESUMES
        val value = synchronized(this) {
            // 从slot中获取index
           // index表示当前应该从缓存数组的index位置中获取数据
            val index = tryPeekLocked(slot)
            if (index < 0) {
                //没有数据,返回空数据的标识
                NO_VALUE
            } else {
                val oldIndex = slot.index
                //从缓存数组buffer中获取index对应的数据
                val newValue = getPeekedValueLockedAt(index)
              //slot索引加1,表示获取下个数据的位置
                slot.index = index + 1 // points to the next index after peeked one
                resumes = updateCollectorIndexLocked(oldIndex)
                newValue
            }
        }
      //恢复协程
        for (resume in resumes) resume?.resume(Unit)
        return value
    }

tryPeekLocked方法,是判断数据所在的位置是否符合要求。

 private suspend fun awaitValue(slot: SharedFlowSlot): Unit = suspendCancellableCoroutine { cont ->
        synchronized(this) lock@{
            //再次检查index
            val index = tryPeekLocked(slot) // recheck under this lock
            if (index < 0) {
            //保存续体cont到slot
                slot.cont = cont // Ok -- suspending
            } else {
                //说明有值,不需要再继续挂起了,通过resume恢复协程
                cont.resume(Unit) // has value, no need to suspend
                return@lock
            }
            slot.cont = cont // suspend, waiting
        }
    }

suspendCancellableCoroutine 是一个挂起函数,用于创建可取消的协程。在协程中调用 suspendCancellableCoroutine 函数时,它会创建一个 CancellableContinuation 对象,并将其传递给一个lambda表达式。该lambda表达式中的代码可以使用 suspend 关键字挂起当前协程,并在某个条件满足时或协程被取消时恢复协程。

collect小结
collect方法会构造Slot对象,然后开启死循环去不断匹配缓存区的数据。具体是根据Slot 中的index匹配缓存区buffer中的数据,如果匹配到了,执行collect闭包;匹配不到就挂起协程,挂起的协程会在有新数据时被生产者所恢复。无论是否有生产者,只要没拿到数据,collect都会被挂起。

相关文章

网友评论

    本文标题:Kotlin SharedFlow 源码解析

    本文链接:https://www.haomeiwen.com/subject/dhzjydtx.html