美文网首页
Kotlin协程:Flow基础原理

Kotlin协程:Flow基础原理

作者: _Jun | 来源:发表于2022-09-06 11:03 被阅读0次

    本文分析示例代码如下:

    launch(Dispatchers.Main) {
        flow {
            emit(1)
            emit(2)
        }.collect {
            delay(1000)
    
            withContext(Dispatchers.IO) {
                Log.d("liduo", "$it")
            }
    
            Log.d("liduo", "$it")
        }
    }
    

    一.Flow的创建

    在协程中,可以通过flow方法创建一个Flow对象,一个Flow对象代表一个冷流。其中参数block是FlowCollector的扩展方法,并且可挂起。代码入下:

    public fun <T> flow(@BuilderInference block: suspend FlowCollector<T>.() -> Unit): Flow<T> = SafeFlow(block)
    

    FlowCollector是一个接口,用于收集上游的流发出的值,代码如下:

    public interface FlowCollector<in T> {
        // 可挂起,非线程安全
        public suspend fun emit(value: T)
    }
    

    调用flow方法,会返回一个Flow接口指向的对象,代码如下:

    public interface Flow<out T> {
    
        @InternalCoroutinesApi
        public suspend fun collect(collector: FlowCollector<T>)
    }
    

    这里flow方法的返回对象是一个SafeFlow类型的对象。至此Flow就创建完毕了。

    二.Flow的消费

    在协程中,当需要消费流时,会调用collect方法,触发流的消费,代码如下:

    public suspend inline fun <T> Flow<T>.collect(crossinline action: suspend (value: T) -> Unit): Unit =
        collect(object : FlowCollector<T> {
            override suspend fun emit(value: T) = action(value)
        })
    

    这里的collect方法不是Flow接口定义的方法,而是Flow的扩展方法,内部创建了一个匿名的FlowCollector对象,并且把action封装到了FlowCollector对象的emit方法中,最后将FlowCollector对象作为参数传入到了另一个collect方法,这个collect方法才是Flow接口定义的方法。

    1.SafeFlow类

    根据上面的分析,Flow对象最后返回的是一个SafeFlow类型的对象。因此,这里调用的另一个collect方法,就是SafeFlow类中的collect方法,代码如下:

    private class SafeFlow<T>(private val block: suspend FlowCollector<T>.() -> Unit) : AbstractFlow<T>() {
        override suspend fun collectSafely(collector: FlowCollector<T>) {
            collector.block()
        }
    }
    
    SafeFlow类继承自AbstractFlow类,类中重写了collectSafely方法。因此调用的collect方法实际上是AbstractFlow类的方法。
    

    2.AbstractFlow类

    AbstractFlow类是一个抽象类,实现了Flow接口和CancellableFlow接口。实际上CancellableFlow接口继承自Flow接口,因此AbstractFlow类只重写了collect方法,代码如下:

    @FlowPreview
    public abstract class AbstractFlow<T> : Flow<T>, CancellableFlow<T> {
    
         // 核心方法 
        @InternalCoroutinesApi
        public final override suspend fun collect(collector: FlowCollector<T>) {
            // 创建SafeCollector对象,对collector进行包裹
            val safeCollector = SafeCollector(collector, coroutineContext)
            try {
                // 调用collectSafely方法
                collectSafely(safeCollector)
            } finally {
                // 释放拦截的续体
                safeCollector.releaseIntercepted()
            }
        }
    
        public abstract suspend fun collectSafely(collector: FlowCollector<T>)
    }
    

    collect方法内部调用了collectSafely方法,collectSafely方法在SafeFlow中被重写。collectSafely方法中会调用flow中的block,并提供一个SafeCollector类的环境。

    3. SafeCollector类

    当flow方法中的代码在执行时,会调用emit方法发射数据,这时由于block执行在SafeCollector类的环境中,因此调用的emit方法是SafeCollector类的方法。

    SafeCollector类实现了FlowCollector接口并且继承自ContinuationImpl类,代码如下:

    internal actual class SafeCollector<T> actual constructor(
        @JvmField internal actual val collector: FlowCollector<T>,
        @JvmField internal actual val collectContext: CoroutineContext
    ) : FlowCollector<T>, ContinuationImpl(NoOpContinuation, EmptyCoroutineContext), CoroutineStackFrame {
    
        ...
        // 保存上下文中元素数量,用于检查上下文是否变化
        @JvmField
        internal actual val collectContextSize = collectContext.fold(0) { count, _ -> count + 1 }
        // 保存上一次的上下文
        private var lastEmissionContext: CoroutineContext? = null
        // 执行结束后的续体
        private var completion: Continuation<Unit>? = null
    
        // 协程上下文
        override val context: CoroutineContext
            get() = completion?.context ?: EmptyCoroutineContext
    
        // 挂起的核心方法
        override fun invokeSuspend(result: Result<Any?>): Any? {
            result.onFailure { lastEmissionContext = DownstreamExceptionElement(it) }
            completion?.resumeWith(result as Result<Unit>)
            return COROUTINE_SUSPENDED
        }
    
        // 释放拦截的续体
        public actual override fun releaseIntercepted() {
            super.releaseIntercepted()
        }
    
        // 发射数据
        override suspend fun emit(value: T) {
            // 获取当前suspend方法续体
            return suspendCoroutineUninterceptedOrReturn sc@{ uCont ->
                try {
                    // 调用重载的方法
                    emit(uCont, value)
                } catch (e: Throwable) {
                     // 出现异常时,将异常封装成上下文,保存到lastEmissionContext
                    lastEmissionContext = DownstreamExceptionElement(e)
                    // 抛出异常
                    throw e
                }
            }
        }
    
        // 重载的emit方法
        private fun emit(uCont: Continuation<Unit>, value: T): Any? {
            // 从续体中获取上下文
            val currentContext = uCont.context
            // 保证当前协程的Job是active的
            currentContext.ensureActive()
            // 获取上次的上下文
            val previousContext = lastEmissionContext
            // 如果前后上下文发生变化
            if (previousContext !== currentContext) {
                // 检查上下文是否发生异常
                checkContext(currentContext, previousContext, value)
            }
            // 保存续体
            completion = uCont
            // 调用emitFun方法,传入collector,value,continuation
            return emitFun(collector as FlowCollector<Any?>, value, this as Continuation<Unit>)
        }
    
        // 检查上下文变化,防止并发
        private fun checkContext(
            currentContext: CoroutineContext,
            previousContext: CoroutineContext?,
            value: T
        ) {
            // 如果上次执行过程中发生了异常
            if (previousContext is DownstreamExceptionElement) {
                // 抛出异常
                exceptionTransparencyViolated(previousContext, value)
            }
            // 检查上下文是否发生变化,如果变化,则抛出异常
            checkContext(currentContext)
            lastEmissionContext = currentContext
        }
    
        // 用于抛出异常
        private fun exceptionTransparencyViolated(exception: DownstreamExceptionElement, value: Any?) {
            error("""
                Flow exception transparency is violated:
                    Previous 'emit' call has thrown exception ${exception.e}, but then emission attempt of value '$value' has been detected.
                    Emissions from 'catch' blocks are prohibited in order to avoid unspecified behaviour, 'Flow.catch' operator can be used instead.
                    For a more detailed explanation, please refer to Flow documentation.
                """.trimIndent())
        }
    }
    

    emit方法最终会调用emitFun方法方法,代码如下:

    private val emitFun =
        FlowCollector<Any?>::emit as Function3<FlowCollector<Any?>, Any?, Continuation<Unit>, Any?>
    

    emitFun是一个lambda表达式,它将只有一个参数的emit方法转换成三个参数的方法。emitFun方法在编译时会被编译器处理,反编译后的代码逻辑大致如下:

    @Nullable
    public final Object invoke(@NotNull FlowCollector p1, @Nullable Object p2, @NotNull Continuation continuation) {
       InlineMarker.mark(0);
       // 核心执行
       Object var10000 = p1.emit(p2, continuation);
       InlineMarker.mark(2);
       InlineMarker.mark(1);
       return var10000;
    }
    

    可以看到,emitFun方法内部会调用FlowCollector类对象的emit方法,同时传入value和continuation作为参数。

    而这个FlowCollector类对象就是一开始的collect方法封装的匿名类对象,代码如下:

    public suspend inline fun <T> Flow<T>.collect(crossinline action: suspend (value: T) -> Unit): Unit =
        collect(object : FlowCollector<T> {
            override suspend fun emit(value: T) = action(value)
        })
    

    调用它的emit方法,会直接调用action的invoke方法,并传入发射的数据,流在这里被最终消费。

    通过上面的分析,可以知道消费的过程是在emit方法中被调用的,如果在消费的过程,没有发生挂起,那么emit方法执行完毕后,会继续执行flow方法里剩下的代码,而如果在消费的过程中发生了挂起,情况会稍有不同。

    4.消费过程中的挂起

    如果消费过程中发生挂起,那么emit方法会返回一个COROUTINE_SUSPENDED对象,suspendCoroutineUninterceptedOrReturn方法在收到COROUTINE_SUSPENDED对象后,会挂起当前协程。代码如下:

    override suspend fun emit(value: T) {
        // 获取当前suspend方法续体
        return suspendCoroutineUninterceptedOrReturn sc@{ uCont ->
            try {
                // 调用重载的方法
                emit(uCont, value)
            } catch (e: Throwable) {
                // 出现异常时,将异常封装成上下文,保存到lastEmissionContext
                lastEmissionContext = DownstreamExceptionElement(e)
                // 抛出异常
                throw e
            }
        }
    }
    

    当消费过程执行完毕时,会通过传入的续体唤起外部协程恢复挂起状态。根据emitFun可以知道,这里传入的续体为this,也就是当前的SafeCollector类对象,代码如下:

    emitFun(collector as FlowCollector<Any?>, value, this as Continuation<Unit>)
    

    恢复挂起需要调用续体的resumeWith方法,上面提到SafeCollector类继承自ContinuationImpl类,SafeCollector类中没有重写resumeWith方法,而ContinuationImpl类中也没有重写resumeWith方法,因此实际调用的是ContinuationImpl类的父类BaseContinuationImpl类的resumeWith方法。如下图所示:

    Kotlin协程:创建、启动、挂起、恢复中提到过,调用BaseContinuationImpl类的resumeWith方法,内部会调用invokeSuspend方法,而SafeCollector类重写了invokeSuspend方法,代码如下:

    override fun invokeSuspend(result: Result<Any?>): Any? {
        // 尝试获取异常
        result.onFailure { lastEmissionContext = DownstreamExceptionElement(it) }
        // 如果没有异常,则恢复flow方法续体的执行
        completion?.resumeWith(result as Result<Unit>)
        // 返回挂起标识,这里挂起的是消费过程
        return COROUTINE_SUSPENDED
    }
    

    在invokeSuspend方法中,会调用resumeWith方法恢复生产过程——flow方法的执行,同时挂起消费过程的执行。全部过程如下图所示:

    作者:李萧蝶
    链接:https://juejin.cn/post/7137647612286468132

    相关文章

      网友评论

          本文标题:Kotlin协程:Flow基础原理

          本文链接:https://www.haomeiwen.com/subject/oisinrtx.html