前言
对于SharedFlow使用可以看之前的文章 Kotlin SharedFlow 使用。在这篇文章中已经通过多个demo实战帮大家总结了SharedFlow的一些特性和使用场景。但是也遗留了一些疑惑,所以本文打算通过轻度阅读源码的方式给大家答疑解惑。放心,不会从头到尾的说。
SharedFlow源码分析的重点
在源码中可以分析的点很多,但我认为最重要的点有2点。emit和collect是如何关联的,缓存机制是怎样的。
从上一篇文章可以了解到emit和collect是挂起函数,但是否被挂起是有一定条件的。而且生产者,消费者出现的时机也会影响数据流的执行,这些问题就是本文要研究的重点。
下面分析emit和collect的小节中还会提出几个问题,带着大家通过解决问题的方式阅读源码。
回顾下之前文章的demo
为什么先发射数据,再collect就收不到数据呢
runBlocking {
val sharedFlow = MutableSharedFlow<Int>()
sharedFlow.emit(1)
launch {
sharedFlow.collect {
println("collect: $it")
}
}
}
为什么设置了replay缓存,超出其缓存数量的时候,会丢失前面的数据,只能收到最新replay数量的数据呢?比如下面demo,只能收到2和3
runBlocking {
//默认参数情况,先emit,再collect收不到数据
val sharedFlow = MutableSharedFlow<Int>(replay = 2, extraBufferCapacity = 0)
sharedFlow.emit(1)
sharedFlow.emit(2)
sharedFlow.emit(3)
launch {
sharedFlow.collect {
println("collect: $it")
}
}
}
emit发射数据
对于初学者来说,读懂这个方法确实有一定难度,不过我换了一个角度来带大家理解,从不同业务场景,使用角度来逐步分析,大致分为4种。
- 没有收集者,无replay缓存
- 没有收集者,有replay缓存(extraBufferCapacity缓存无影响)
- 有收集者,无replay缓存
- 有收集者,有replay缓存
override suspend fun emit(value: T) {
//尝试发送数据,这是一个快速路径,可以提高发送数据的效率。
if (tryEmit(value)) return // fast-path
//在挂起的协程中发送数据
emitSuspend(value)
}
emit方法先用不需要挂起的方式发数据,发失败之后采用挂起的方式发。
以不挂起的方式发射数据
override fun tryEmit(value: T): Boolean {
var resumes: Array<Continuation<Unit>?> = EMPTY_RESUMES
val emitted = synchronized(this) {
//尝试发射数据,发射成功返回true,失败返回false
if (tryEmitLocked(value)) {
//找到需要恢复的协程,并将结果保存到 resumes 数组中,
resumes = findSlotsToResumeLocked(resumes)
true
} else {
false
}
}
//上面已经找到了需要恢复的协程,这里只需要恢复协程的执行
for (cont in resumes) cont?.resume(Unit)
return emitted
}
private fun tryEmitLocked(value: T): Boolean {
// 没有收集者,一定返回true
if (nCollectors == 0) return tryEmitNoCollectorsLocked(value)
// 有收集者,缓存区已满,超过replay + extraBufferCapacity数量,且消费者没有消费最旧的数据(replayIndex)
if (bufferSize >= bufferCapacity && minCollectorIndex <= replayIndex) {
//执行下面的缓存溢出策略
when (onBufferOverflow) {
BufferOverflow.SUSPEND -> return false // will suspend
BufferOverflow.DROP_LATEST -> return true // just drop incoming
BufferOverflow.DROP_OLDEST -> {} //丢弃最旧的数据,这里暂不处理
}
}
//将数据加入到缓存数组中,这里不会挂起emit所在的协程
enqueueLocked(value)
bufferSize++ //缓存数组长度
// 上面的缓存溢出策略,丢弃最老数据是没做处理的,实际上延迟在这里处理
if (bufferSize > bufferCapacity) dropOldestLocked()
// 如果replayCache中数据的数量超过了最大容量
if (replaySize > replay) {
// 更新replayIndex的值,replayIndex向前移动一位
updateBufferLocked(replayIndex + 1, minCollectorIndex, bufferEndIndex, queueEndIndex)
}
return true
}
这个方法已经涉及到shareflow缓存机制,所以有必要先来张图大概了解下缓存机制。
image.pngshareflow缓存是个数组,大小由 bufferSize 控制,而缓存容量由 bufferCapacity 控制。缓存由3部分组成,replay缓存的数量,extraBufferCapacity缓存的数量,这2部分加起来就是buffered values,还有就是挂起时候的Emitter对象。图中还展示了两个慢速收集器的位置,即可能收集缓存队列中的值的最慢速度的收集器的位置。这两个位置分别由 minCollectorIndex 和 replayIndex 控制。
这个tryEmitLocked
方法很重要,因为它可以解释之前文章中一些业务场景的困惑。 下面我们就来跟大家一起解剖下这个方法
tryEmitLocked方法没有收集者
//走到这里,说明没有收集者
private fun tryEmitNoCollectorsLocked(value: T): Boolean {
assert { nCollectors == 0 }
//replay缓存为0,就丢弃数据,emit方法就结束了
if (replay == 0) return true
enqueueLocked(value) // 加入到缓存数组
bufferSize++ // value was added to buffer
//若是emit发射的数量超过了重放个数,则丢弃最旧的值
if (bufferSize > replay) dropOldestLocked()
minCollectorIndex = head + bufferSize // a default value (max allowed)
return true
}
这段代码也解释了之前的2个疑惑
- 在不配置replay缓存的情况下,先emit发数据再collect是收不到数据的
- 在配置了replay的情况下,先emit再collect是能收到数据,但是 emit发射的数量超过了replay的话,就只能收到最新的replay个数的数据
如下2段代码,运行结果都只能收到2和3
runBlocking {
//默认参数情况,先emit,再collect收不到数据
val sharedFlow = MutableSharedFlow<Int>(replay = 2, extraBufferCapacity = 0)
sharedFlow.emit(1)
sharedFlow.emit(2)
sharedFlow.emit(3)
launch {
sharedFlow.collect {
println("collect: $it")
}
}
}
runBlocking {
//这里配置了extraBufferCapacity根本不会起到效果
val sharedFlow = MutableSharedFlow<Int>(replay = 2, extraBufferCapacity = 2)
sharedFlow.emit(1)
sharedFlow.emit(2)
sharedFlow.emit(3)
launch {
sharedFlow.collect {
println("collect: $it")
}
}
}
从上面demo和
tryEmitNoCollectorsLocked
源码分析可以看出:如果emit
发射到buffered values的数据数量超过了replay的值,会丢弃最旧的数据,保持buffered values中数据的数量最大为replay。
当有新的订阅者时,会先从replayCache中获取数据,在buffered values中,replayCache前的数据只对已经订阅的订阅者有用,而此时又没有订阅者,因此缓存超过replayCache最大容量的数据只会占用更多内存,是没有意义的。记住:没有收集者时,extraBufferCapacity是不会起作用的
tryEmitLocked方法有收集者
上面已经分析了这个方法没有收集者的情况,接下来就分析下有收集者的情况
private fun tryEmitLocked(value: T): Boolean {
// 没有收集者,一定返回true
if (nCollectors == 0) return tryEmitNoCollectorsLocked(value)
// 有收集者,缓存区已满,超过replay + extraBufferCapacity数量,且消费者没有消费最旧的数据(replayIndex)
if (bufferSize >= bufferCapacity && minCollectorIndex <= replayIndex) {
//执行下面的缓存溢出策略
when (onBufferOverflow) {
BufferOverflow.SUSPEND -> return false // will suspend
BufferOverflow.DROP_LATEST -> return true // just drop incoming
BufferOverflow.DROP_OLDEST -> {} //丢弃最旧的数据,这里暂不处理
}
}
//将数据加入到缓存数组中,这里不会挂起emit所在的协程
enqueueLocked(value)
bufferSize++ //缓存数组长度
// 上面的缓存溢出策略,丢弃最老数据是没做处理的,实际上延迟在这里处理
if (bufferSize > bufferCapacity) dropOldestLocked()
// 如果replayCache中数据的数量超过了最大容量
if (replaySize > replay) {
// 更新replayIndex的值,replayIndex向前移动一位
updateBufferLocked(replayIndex + 1, minCollectorIndex, bufferEndIndex, queueEndIndex)
}
return true
}
有收集者,但没有配置任何缓存
runBlocking {
//默认参数情况,先emit,再collect收不到数据
val sharedFlow = MutableSharedFlow<Int>(replay = 0, extraBufferCapacity = 0)
launch {
sharedFlow.collect {
println("collect: $it")
}
}
delay(200) //确保已经订阅
sharedFlow.emit(1)
sharedFlow.emit(2)
sharedFlow.emit(3)
}
上面demo,是能收到1,2,3的。
从源码可以看出,tryEmitLocked
方法中,有订阅者的情况,即使没有配置缓存也会执行enqueueLocked(value)方法把数据加入到缓存数组。
有订阅者,且配置了replay或者extraBufferCapacity缓存,会多了一个缓存溢出策略。有3种策略,挂起协程,丢弃最老的数据,丢弃最新的数据。
emitSuspend挂起方式发数据
private suspend fun emitSuspend(value: T) = suspendCancellableCoroutine<Unit> sc@{ cont ->
var resumes: Array<Continuation<Unit>?> = EMPTY_RESUMES
val emitter = synchronized(this) lock@{
//再次检查,确保缓存区满了,因为满了才会执行下面的逻辑
if (tryEmitLocked(value)) {
cont.resume(Unit)
resumes = findSlotsToResumeLocked(resumes)
return@lock null
}
// 创建Emitter,加入到buffer里
//可以去看下前面的那张图,Emitter是加到缓存区的什么位置的
Emitter(this, head + totalSize, value, cont).also {
//加到之前的缓存数组
enqueueLocked(it)
queueSize++ // Emitter是挂起的,单独记录下数量
// 如果buffered value缓存没有数据,则收集已经挂起的订阅者的续体,保存到局部变量resumes中
if (bufferCapacity == 0) resumes = findSlotsToResumeLocked(resumes)
}
}
// outside of the lock: register dispose on cancellation
emitter?.let { cont.disposeOnCancellation(it) }
// 恢复挂起的订阅者
for (r in resumes) r?.resume(Unit)
}
这个方法比较简单,缓存区满了,创建Emitter加到buffer缓存区。
小结
阅读emit发射数据的流程,可以分2部分完成,不挂起发射和挂起发射。
如果没有收集者,emit永远不会挂起。
如果有收集者,并且buffered values缓存容量已满并且最旧的数据没有被消费,则emit有机会被挂起,当然这取决于你的溢出策略。
collect消费数据
override suspend fun collect(collector: FlowCollector<T>): Nothing {
//分配槽位
val slot = allocateSlot()
try {
if (collector is SubscribedFlowCollector) collector.onSubscription()
val collectorJob = currentCoroutineContext()[Job]
while (true) {
var newValue: Any?
while (true) {
//尝试获取值,获取到了就跳出循环,获取不到就挂起等待
newValue = tryTakeValue(slot) // attempt no-suspend fast path first
if (newValue !== NO_VALUE) break
awaitValue(slot) // await signal that the new value is available
}
//判断订阅者所在协程是否是存活
collectorJob?.ensureActive()
//回调到collect方法的lambda
collector.emit(newValue as T)
}
} finally {
freeSlot(slot)
}
}
关于slot的理解,可以看下图
slot.png
- 消费者开始collect,根据index找到buffer下标为0的元素即为可以消费的元素;
- 拿到0号数据后,slot.index=1,找到buffer下标为1的元素
index++,重复上诉步骤
private fun tryTakeValue(slot: SharedFlowSlot): Any? {
var resumes: Array<Continuation<Unit>?> = EMPTY_RESUMES
val value = synchronized(this) {
// 从slot中获取index
// index表示当前应该从缓存数组的index位置中获取数据
val index = tryPeekLocked(slot)
if (index < 0) {
//没有数据,返回空数据的标识
NO_VALUE
} else {
val oldIndex = slot.index
//从缓存数组buffer中获取index对应的数据
val newValue = getPeekedValueLockedAt(index)
//slot索引加1,表示获取下个数据的位置
slot.index = index + 1 // points to the next index after peeked one
resumes = updateCollectorIndexLocked(oldIndex)
newValue
}
}
//恢复协程
for (resume in resumes) resume?.resume(Unit)
return value
}
tryPeekLocked
方法,是判断数据所在的位置是否符合要求。
private suspend fun awaitValue(slot: SharedFlowSlot): Unit = suspendCancellableCoroutine { cont ->
synchronized(this) lock@{
//再次检查index
val index = tryPeekLocked(slot) // recheck under this lock
if (index < 0) {
//保存续体cont到slot
slot.cont = cont // Ok -- suspending
} else {
//说明有值,不需要再继续挂起了,通过resume恢复协程
cont.resume(Unit) // has value, no need to suspend
return@lock
}
slot.cont = cont // suspend, waiting
}
}
suspendCancellableCoroutine
是一个挂起函数,用于创建可取消的协程。在协程中调用suspendCancellableCoroutine
函数时,它会创建一个CancellableContinuation
对象,并将其传递给一个lambda表达式。该lambda表达式中的代码可以使用suspend
关键字挂起当前协程,并在某个条件满足时或协程被取消时恢复协程。
collect小结
collect方法会构造Slot对象,然后开启死循环去不断匹配缓存区的数据。具体是根据Slot 中的index匹配缓存区buffer中的数据,如果匹配到了,执行collect闭包;匹配不到就挂起协程,挂起的协程会在有新数据时被生产者所恢复。无论是否有生产者,只要没拿到数据,collect都会被挂起。
网友评论