本文分析示例代码如下:
launch(Dispatchers.Main) {
flow {
emit(1)
emit(2)
}.collect {
delay(1000)
withContext(Dispatchers.IO) {
Log.d("liduo", "$it")
}
Log.d("liduo", "$it")
}
}
一.Flow的创建
在协程中,可以通过flow方法创建一个Flow对象,一个Flow对象代表一个冷流。其中参数block是FlowCollector的扩展方法,并且可挂起。代码入下:
public fun <T> flow(@BuilderInference block: suspend FlowCollector<T>.() -> Unit): Flow<T> = SafeFlow(block)
FlowCollector是一个接口,用于收集上游的流发出的值,代码如下:
public interface FlowCollector<in T> {
// 可挂起,非线程安全
public suspend fun emit(value: T)
}
调用flow方法,会返回一个Flow接口指向的对象,代码如下:
public interface Flow<out T> {
@InternalCoroutinesApi
public suspend fun collect(collector: FlowCollector<T>)
}
这里flow方法的返回对象是一个SafeFlow类型的对象。至此Flow就创建完毕了。
二.Flow的消费
在协程中,当需要消费流时,会调用collect方法,触发流的消费,代码如下:
public suspend inline fun <T> Flow<T>.collect(crossinline action: suspend (value: T) -> Unit): Unit =
collect(object : FlowCollector<T> {
override suspend fun emit(value: T) = action(value)
})
这里的collect方法不是Flow接口定义的方法,而是Flow的扩展方法,内部创建了一个匿名的FlowCollector
对象,并且把action封装到了FlowCollector对象的emit方法中,最后将FlowCollector对象作为参数传入到了另一个collect方法,这个collect方法才是Flow接口定义的方法。
1.SafeFlow类
根据上面的分析,Flow对象最后返回的是一个SafeFlow类型的对象。因此,这里调用的另一个collect方法,就是SafeFlow类中的collect方法,代码如下:
private class SafeFlow<T>(private val block: suspend FlowCollector<T>.() -> Unit) : AbstractFlow<T>() {
override suspend fun collectSafely(collector: FlowCollector<T>) {
collector.block()
}
}
SafeFlow类继承自AbstractFlow类,类中重写了collectSafely方法。因此调用的collect方法实际上是AbstractFlow类的方法。
2.AbstractFlow类
AbstractFlow类是一个抽象类,实现了Flow接口和CancellableFlow接口。实际上CancellableFlow接口继承自Flow接口,因此AbstractFlow类只重写了collect方法,代码如下:
@FlowPreview
public abstract class AbstractFlow<T> : Flow<T>, CancellableFlow<T> {
// 核心方法
@InternalCoroutinesApi
public final override suspend fun collect(collector: FlowCollector<T>) {
// 创建SafeCollector对象,对collector进行包裹
val safeCollector = SafeCollector(collector, coroutineContext)
try {
// 调用collectSafely方法
collectSafely(safeCollector)
} finally {
// 释放拦截的续体
safeCollector.releaseIntercepted()
}
}
public abstract suspend fun collectSafely(collector: FlowCollector<T>)
}
collect方法内部调用了collectSafely方法,collectSafely方法在SafeFlow中被重写。collectSafely方法中会调用flow中的block,并提供一个SafeCollector类的环境。
3. SafeCollector类
当flow方法中的代码在执行时,会调用emit方法发射数据,这时由于block执行在SafeCollector类的环境中,因此调用的emit方法是SafeCollector类的方法。
SafeCollector类实现了FlowCollector接口并且继承自ContinuationImpl类,代码如下:
internal actual class SafeCollector<T> actual constructor(
@JvmField internal actual val collector: FlowCollector<T>,
@JvmField internal actual val collectContext: CoroutineContext
) : FlowCollector<T>, ContinuationImpl(NoOpContinuation, EmptyCoroutineContext), CoroutineStackFrame {
...
// 保存上下文中元素数量,用于检查上下文是否变化
@JvmField
internal actual val collectContextSize = collectContext.fold(0) { count, _ -> count + 1 }
// 保存上一次的上下文
private var lastEmissionContext: CoroutineContext? = null
// 执行结束后的续体
private var completion: Continuation<Unit>? = null
// 协程上下文
override val context: CoroutineContext
get() = completion?.context ?: EmptyCoroutineContext
// 挂起的核心方法
override fun invokeSuspend(result: Result<Any?>): Any? {
result.onFailure { lastEmissionContext = DownstreamExceptionElement(it) }
completion?.resumeWith(result as Result<Unit>)
return COROUTINE_SUSPENDED
}
// 释放拦截的续体
public actual override fun releaseIntercepted() {
super.releaseIntercepted()
}
// 发射数据
override suspend fun emit(value: T) {
// 获取当前suspend方法续体
return suspendCoroutineUninterceptedOrReturn sc@{ uCont ->
try {
// 调用重载的方法
emit(uCont, value)
} catch (e: Throwable) {
// 出现异常时,将异常封装成上下文,保存到lastEmissionContext
lastEmissionContext = DownstreamExceptionElement(e)
// 抛出异常
throw e
}
}
}
// 重载的emit方法
private fun emit(uCont: Continuation<Unit>, value: T): Any? {
// 从续体中获取上下文
val currentContext = uCont.context
// 保证当前协程的Job是active的
currentContext.ensureActive()
// 获取上次的上下文
val previousContext = lastEmissionContext
// 如果前后上下文发生变化
if (previousContext !== currentContext) {
// 检查上下文是否发生异常
checkContext(currentContext, previousContext, value)
}
// 保存续体
completion = uCont
// 调用emitFun方法,传入collector,value,continuation
return emitFun(collector as FlowCollector<Any?>, value, this as Continuation<Unit>)
}
// 检查上下文变化,防止并发
private fun checkContext(
currentContext: CoroutineContext,
previousContext: CoroutineContext?,
value: T
) {
// 如果上次执行过程中发生了异常
if (previousContext is DownstreamExceptionElement) {
// 抛出异常
exceptionTransparencyViolated(previousContext, value)
}
// 检查上下文是否发生变化,如果变化,则抛出异常
checkContext(currentContext)
lastEmissionContext = currentContext
}
// 用于抛出异常
private fun exceptionTransparencyViolated(exception: DownstreamExceptionElement, value: Any?) {
error("""
Flow exception transparency is violated:
Previous 'emit' call has thrown exception ${exception.e}, but then emission attempt of value '$value' has been detected.
Emissions from 'catch' blocks are prohibited in order to avoid unspecified behaviour, 'Flow.catch' operator can be used instead.
For a more detailed explanation, please refer to Flow documentation.
""".trimIndent())
}
}
emit方法最终会调用emitFun方法方法,代码如下:
private val emitFun =
FlowCollector<Any?>::emit as Function3<FlowCollector<Any?>, Any?, Continuation<Unit>, Any?>
emitFun是一个lambda表达式,它将只有一个参数的emit方法转换成三个参数的方法。emitFun方法在编译时会被编译器处理,反编译后的代码逻辑大致如下:
@Nullable
public final Object invoke(@NotNull FlowCollector p1, @Nullable Object p2, @NotNull Continuation continuation) {
InlineMarker.mark(0);
// 核心执行
Object var10000 = p1.emit(p2, continuation);
InlineMarker.mark(2);
InlineMarker.mark(1);
return var10000;
}
可以看到,emitFun方法内部会调用FlowCollector类对象的emit方法,同时传入value和continuation作为参数。
而这个FlowCollector类对象就是一开始的collect方法封装的匿名类对象,代码如下:
public suspend inline fun <T> Flow<T>.collect(crossinline action: suspend (value: T) -> Unit): Unit =
collect(object : FlowCollector<T> {
override suspend fun emit(value: T) = action(value)
})
调用它的emit方法,会直接调用action的invoke方法,并传入发射的数据,流在这里被最终消费。
通过上面的分析,可以知道消费的过程是在emit方法中被调用的,如果在消费的过程,没有发生挂起,那么emit方法执行完毕后,会继续执行flow方法里剩下的代码,而如果在消费的过程中发生了挂起,情况会稍有不同。
4.消费过程中的挂起
如果消费过程中发生挂起,那么emit方法会返回一个COROUTINE_SUSPENDED
对象,suspendCoroutineUninterceptedOrReturn
方法在收到COROUTINE_SUSPENDED对象后,会挂起当前协程。代码如下:
override suspend fun emit(value: T) {
// 获取当前suspend方法续体
return suspendCoroutineUninterceptedOrReturn sc@{ uCont ->
try {
// 调用重载的方法
emit(uCont, value)
} catch (e: Throwable) {
// 出现异常时,将异常封装成上下文,保存到lastEmissionContext
lastEmissionContext = DownstreamExceptionElement(e)
// 抛出异常
throw e
}
}
}
当消费过程执行完毕时,会通过传入的续体唤起外部协程恢复挂起状态。根据emitFun
可以知道,这里传入的续体为this,也就是当前的SafeCollector
类对象,代码如下:
emitFun(collector as FlowCollector<Any?>, value, this as Continuation<Unit>)
恢复挂起需要调用续体的resumeWith方法,上面提到SafeCollector类继承自ContinuationImpl类,SafeCollector类中没有重写resumeWith方法,而ContinuationImpl类中也没有重写resumeWith方法,因此实际调用的是ContinuationImpl类的父类BaseContinuationImpl类的resumeWith方法。如下图所示:
在Kotlin协程:创建、启动、挂起、恢复中提到过,调用BaseContinuationImpl类的resumeWith方法,内部会调用invokeSuspend方法,而SafeCollector类重写了invokeSuspend方法,代码如下:
override fun invokeSuspend(result: Result<Any?>): Any? {
// 尝试获取异常
result.onFailure { lastEmissionContext = DownstreamExceptionElement(it) }
// 如果没有异常,则恢复flow方法续体的执行
completion?.resumeWith(result as Result<Unit>)
// 返回挂起标识,这里挂起的是消费过程
return COROUTINE_SUSPENDED
}
在invokeSuspend方法中,会调用resumeWith方法恢复生产过程——flow方法的执行,同时挂起消费过程的执行。全部过程如下图所示:
网友评论