美文网首页Kotlin-CoroutinesAndroid架构
抽丝剥茧Kotlin - 协程中绕不过的Flow

抽丝剥茧Kotlin - 协程中绕不过的Flow

作者: 九心_ | 来源:发表于2021-01-07 08:02 被阅读0次

    前言

    距离上一篇协程的文章发布已经四个月了,不出意外收到了读者大大们的催更:”都四个月了哈,你的协程三部曲咋还没有更完?“

    让我看看

    我一开始不打算更新的,但是谷歌爸爸一直在推 Kotlin,Android Jetpack 中也一直使用 Kotlin,最近的Paging 3 中也加入了协程,并使用了Flow,大有势不可挡之势,看来协程中的 Flow 是绕不过去啦~

    在进行 Flow 分析之前,你应该对协程的原理有一个初步的了解,不然,你可能不知道我们讨论的是什么。

    如果你还不了解协程的原理,又或者是还没使用过协程,可以看一下我之前的协程两部曲:

    《即学即用Kotlin - 协程》
    《抽丝剥茧Kotlin - 协程基础篇》
    《抽丝剥茧Kotlin - 协程中绕不过的Flow》

    老规矩,带着问题是去学习源码的最好方式,我想了解的是:

    • 问题一:Flow为什么是一个冷流?
    • 问题二:Flow流程是什么样的?
    • 问题三:Flow如何切线程?

    目录

    目录

    一、Flow流程是什么样的?

    在介绍协程的那篇文章的时候,我们了解到,Flow 的作用也就是数据发送,上游发送,下游消费。那它跟普通的数据发送有什么区别呢?

    在我看来,跟 RxJava 一样,一是切线程,而是数据转化。

    最爽的当然是切线程,因为 Flow 必须发生在协程中,所以协程可以帮我们指定 Flow 消费的线程,那数据生产的线程呢?别急,我们可以通过 Flow 的扩展方法 flowOn 实现。

    了解了这些,我们抛出一段简单的代码,使用场景是在 Fragment 中:

    Flow启动

    我们先不着急看整个流程,分别点击 flow<Int>().colletct() 两个方法中看看:

    // flow{} 方法
    public fun <T> flow(@BuilderInference block: suspend FlowCollector<T>.() -> Unit): Flow<T> = SafeFlow(block)
    
    // colletct 方法
    public suspend inline fun <T> Flow<T>.collect(crossinline action: suspend (value: T) -> Unit): Unit =
        collect(object : FlowCollector<T> {
            override suspend fun emit(value: T) = action(value)
        })
    

    可以看到,上面的两个方法的参数都是一个 suspend 的闭包,也没有加入新的 CoroutineContext,那我们是不是就可以理解为上述代码仅仅在协程原有的基础上做了一些事呢?

    CoroutineContext 在分析协程原理的那篇文章中就重复分析过了,本篇就不重复分析了。

    1. flow{}中发生了什么

    flow {} 方法中发生了什么,这个我们要从 flow()方法说起。

    1.1 创建SafeFlow

    点进上述的方法:

    public fun <T> flow(@BuilderInference block: suspend FlowCollector<T>.() -> Unit): Flow<T> = SafeFlow(block)
    
    // Named anonymous object
    private class SafeFlow<T>(private val block: suspend FlowCollector<T>.() -> Unit) : AbstractFlow<T>() {
        override suspend fun collectSafely(collector: FlowCollector<T>) {
            collector.block()
        }
    }
    

    可以看到,SafeFlow 继承自 AbstractFlow,并实现了其 AbstractFlow#collectSafely 方法,从名字上看,Flow 应该做了一层安全上的校验。

    1.2 AbstractFlow做了什么

    在谈 AbstractFlow 是什么之前,我们似乎要先介绍一下 Flow,它才是我们的主角。

    Flow 接口足够简单,它只定义了一个功能,就是找到数据的接收方:

    public interface Flow<out T> {
        @InternalCoroutinesApi
        public suspend fun collect(collector: FlowCollector<T>)
    }
    
    public interface FlowCollector<in T> {
        /**
         * Collects the value emitted by the upstream.
         * This method is not thread-safe and should not be invoked concurrently.
         */
        public suspend fun emit(value: T)
    }
    

    数据的接收方就是FlowCollector,它的接口定义也同样比较简单,只负责接受数据的FlowCollector#emit,它也是数据发射的入口。

    了解了 Flow,现在看看 AbstractFlow 到底实现了什么?

    public abstract class AbstractFlow<T> : Flow<T> {
    
        @InternalCoroutinesApi
        public final override suspend fun collect(collector: FlowCollector<T>) {
            // 1. collector 做一层包装
            val safeCollector = SafeCollector(collector, coroutineContext)
            try {
                // 2. 处理数据接收者
                collectSafely(safeCollector)
            } finally {
                // 3. 释放协程相关的参数
                safeCollector.releaseIntercepted()
            }
        }
    
        // collectSafely 方法应当遵循以下的约束
        // 1. 不应当在collectSafely方法里面切换线程,比如 withContext(Dispatchers.IO)
        // 2. collectSafely 默认不是线程安全的
        public abstract suspend fun collectSafely(collector: FlowCollector<T>)
    }
    
    private class SafeFlow<T>(private val block: suspend FlowCollector<T>.() -> Unit) : AbstractFlow<T>() {
        override suspend fun collectSafely(collector: FlowCollector<T>) {
            collector.block()
        }
    }
    

    发现其主要做了三件事:

    1. 对数据接收方 FlowCollector 做了一层包装,也就是这个 SafeCollector,我们现在不用管它。
    2. 调用它里面的抽象方法 AbstractFlow#collectSafely 方法。
    3. 释放协程的一些信息。

    结合以下之前看的 SafeFlow,它实现了 AbstractFlow#collectSafely 方法,调用了 flow{} 的方法块,也就是闭包参数。现在有一点就很清晰了,为什么 Flow 是冷流,因为它会在每一次 collect 的时候才会去触发发送数据的动作。

    1.3 SafeCollector做了哪些包装?

    SafeCollector 从它的名字可以看出,它是一个安全的数据接收者,它不仅实现了 FlowCollector 接口,还继承了 ContinuationImpl 这个续体抽象类,你可能会有这样的一个疑问:

    实现 FlowCollector 接口可以理解,因为要处理接收数据,但是为啥要实现 ContinuationImpl?从官方的解释来看:

    /*
     * Implementor of ContinuationImpl (that will be preserved as ABI nearly forever)
     * in order to properly control 'intercepted()' lifecycle.
     */
    internal actual class SafeCollector<T> actual constructor(
        @JvmField internal actual val collector: FlowCollector<T>,
        @JvmField internal actual val collectContext: CoroutineContext
    ) : FlowCollector<T>, ContinuationImpl(NoOpContinuation, EmptyCoroutineContext) {
        // ... 省略
    }
    
    private object NoOpContinuation : Continuation<Any?> {
        override val context: CoroutineContext = EmptyCoroutineContext
    
        override fun resumeWith(result: Result<Any?>) {
            // Nothing
        }
    }
    

    实现 ContinuationImpl 是为了控制 intercepted() 生命周期,从代码来看,SafeCollector 继承 ContinuationImpl 时,仅仅放入两个空的参数,一个是 NoOpContinuation,另一个是 EmptyCoroutineContext,那我们可以不用特别注意 ContinuationImpl 这个类。

    1.4 数据接收者的处理

    通常我们会在 flow{} 方法里面发射数据,也就是调用 FollowCollector#emit 方法,具体实现也是在 SafeCollector 中:

    internal actual class SafeCollector<T> actual constructor(
        @JvmField internal actual val collector: FlowCollector<T>,
        @JvmField internal actual val collectContext: CoroutineContext
    ) : FlowCollector<T>, ContinuationImpl(NoOpContinuation, EmptyCoroutineContext) {
    
        @JvmField // Note, it is non-capturing lambda, so no extra allocation during init of SafeCollector
        internal actual val collectContextSize = collectContext.fold(0) { count, _ -> count + 1 }
        private var lastEmissionContext: CoroutineContext? = null
        private var completion: Continuation<Unit>? = null
    
        // ContinuationImpl
        override val context: CoroutineContext
            get() = completion?.context ?: EmptyCoroutineContext
    
        
        override fun invokeSuspend(result: Result<Any?>): Any? {
            //  当result失败的时候,lastEmissionContext等于错误处理的类
            result.onFailure { lastEmissionContext = DownstreamExceptionElement(it) }
            completion?.resumeWith(result as Result<Unit>)
            return COROUTINE_SUSPENDED
        }
    
        override suspend fun emit(value: T) {
            // suspendCoroutineUninterceptedOrReturn 保证只会被调用一次
            return suspendCoroutineUninterceptedOrReturn sc@{ uCont ->
                try {
                    emit(uCont, value)
                } catch (e: Throwable) {
                    // Save the fact that exception from emit (or even check context) has been thrown
                    lastEmissionContext = DownstreamExceptionElement(e)
                    throw e
                }
            }
        }
    
        private fun emit(uCont: Continuation<Unit>, value: T): Any? {
            val currentContext = uCont.context
            // 1. 保证当前currentContext有效
            currentContext.ensureActive()
            // 2. 检查 currentContext
            val previousContext = lastEmissionContext
            if (previousContext !== currentContext) {
                checkContext(currentContext, previousContext, value)
            }
            completion = uCont
            return emitFun(collector as FlowCollector<Any?>, value, this as Continuation<Unit>)
        }
    
        private fun checkContext(
            currentContext: CoroutineContext,
            previousContext: CoroutineContext?,
            value: T
        ) {
            if (previousContext is DownstreamExceptionElement) {
                // 错误处理
                exceptionTransparencyViolated(previousContext, value)
            }
            checkContext(currentContext)
            lastEmissionContext = currentContext
        }
    
        // ... 省略
    }
    
    internal class DownstreamExceptionElement(@JvmField val e: Throwable) : CoroutineContext.Element {
        companion object Key : CoroutineContext.Key<DownstreamExceptionElement>
    
        override val key: CoroutineContext.Key<*> = Key
    }
    

    这里的代码有点长,不过仔细分析后就只有一点点东西:

    1. invokeSuspend 这个方法只做了失败时候的监听,将上次的 lastEmissionContext 替换为 DownstreamExceptionElement,仅此而已。
    2. 第一个 emit(value: T) 方法代码比较少,它就使用了 suspendCoroutineUninterceptedOrReturn 包裹了我们的另外一个 emit 方法(下面介绍),这个 suspendCoroutineUninterceptedOrReturn 的作用要么就是延迟下面方法的执行,要么就是直接返回结果,然后在里面抓取异常。
    3. 下面的一个 emit(uCont: Continuation<Unit>, value: T),里面检查了续体的 CoroutineContext 的状态。
    4. 接着进入checkContext(currentContext),这是一个比较重要的方法,主要做了两件事,第一件事是判断上一个 CoroutineContext是否是 DownstreamExceptionElement,如果是的话会报出异常。第二件事是判断当前的拦截器是否发生了切换,在 Flow 内部,它是不允许你再去做切线程的操作的,限于篇幅,这部分的源码就不贴出来了,感兴趣的同学可以看一下。
    5. 在该方法的最后就直接调用 FlowCollector<Any?>#emit 方法。

    这里我们可以得出结论,SafeCollector 就是对我们的数据接收者 FlowCollector 做一层安全校验,最后还是会调用 FlowCollector#emit 方法。

    总结一下 SafeFlow 为什么安全:

    1. Flow 方法的内部代码,禁止线程切换,比如说使用不同的 Dispatchers 创建一个子协程。
    2. 异常处理机制,遇到异常会抛出来,谁来处理呢?答案就是使用 Flow 的拓展方法 Flow#catch,我们可以自己决定是否使用该方法。
    3. AbstractFlow#collect 方法中,最终会调用 safeCollector.releaseIntercepted(),如果遇到异常,也能释放那些还在运行中的子任务。

    2. collect{}如何接收

    弄清这个问题真的很简单,看这个源码即可:

    public suspend inline fun <T> Flow<T>.collect(crossinline action: suspend (value: T) -> Unit): Unit =
        collect(object : FlowCollector<T> {
            override suspend fun emit(value: T) = action(value)
        })
    

    在里面实现了一个 FlowCollector,然后调用了我们在 collect 方法中声明的闭包。

    如果我们不用 flowOn 去切线程,那么我们的过程就十分清晰了:

    Flow流程

    3. flowOn()如何做到切换协程

    如果不涉及到切线程,Flow 看着挺简单的。下面是 Flow 切线程的代码:

    Flow切线程的代码

    我们先想一下,Flow 是发生在协程中的,flowOn 只是增加了一个 Dispatchers.IO,从之前的协程分析的文章中我们知道,它是一个 ContinuationInterceptor,可以帮助我们来切线程,这里的作用应同理。

    3.1 flowOn()

    点进 flowOn 方法:

    public fun <T> Flow<T>.flowOn(context: CoroutineContext): Flow<T> {
        checkFlowContext(context)
        return when {
            context == EmptyCoroutineContext -> this
            this is FusibleFlow -> fuse(context = context)
            else -> ChannelFlowOperatorImpl(this, context = context)
        }
    }
    

    可以看出,在 flowOn 主要做了两步。

    第一步检查了参数中的的 CoroutineContext,代码我就不放了,意思就是当前的CoroutineContext 不能包括 JobJob 使用 launch 返回的对象,因为 flowOn 里面正常存放调度器 Dipatchers,不排除有些人不会使用。

    第二步根据当前的情况返回不同的 Flow,这里有三种情况:

    1. 参数中的 CoroutineContextEmptyCoroutineContext,我们都知道,CoroutineContext 像一个集合,EmptyCoroutineContext 就是空的集合。这种情况就返回自身。
    2. 第二个就是连续使用多个 flowOn 的情况,比如 flow{}.flowOn().flowOn().collect{},第二个 flowOn 就是 FusibleFlow
    3. 如果当前是第一个 flowOn,返回一个 ChannelFlowOperatorImpl

    我们先从第三种情况开始分析。

    3.2 ChannelFlowOperatorImpl

    点进 ChannelFlowOperatorImpl 源码:

    internal class ChannelFlowOperatorImpl<T>(
        flow: Flow<T>,
        context: CoroutineContext = EmptyCoroutineContext,
        capacity: Int = Channel.OPTIONAL_CHANNEL
    ) : ChannelFlowOperator<T, T>(flow, context, capacity) {
        override fun create(context: CoroutineContext, capacity: Int): ChannelFlow<T> =
            ChannelFlowOperatorImpl(flow, context, capacity)
    
        override suspend fun flowCollect(collector: FlowCollector<T>) =
            flow.collect(collector)
    }
    

    这个里面没有暴露出重要的信息,我们只需要看 Flow#collect 方法即可,点击它的父类:

    internal abstract class ChannelFlowOperator<S, T>(
        @JvmField val flow: Flow<S>,
        context: CoroutineContext,
        capacity: Int
    ) : ChannelFlow<T>(context, capacity) {
        protected abstract suspend fun flowCollect(collector: FlowCollector<T>)
    
        // Changes collecting context upstream to the specified newContext, while collecting in the original context
        private suspend fun collectWithContextUndispatched(collector: FlowCollector<T>, newContext: CoroutineContext) {
            val originalContextCollector = collector.withUndispatchedContextCollector(coroutineContext)
            // invoke flowCollect(originalContextCollector) in the newContext
            return withContextUndispatched(newContext, block = { flowCollect(it) }, value = originalContextCollector)
        }
    
        // Slow path when output channel is required
        protected override suspend fun collectTo(scope: ProducerScope<T>) =
            flowCollect(SendingCollector(scope))
    
        // Optimizations for fast-path when channel creation is optional
        override suspend fun collect(collector: FlowCollector<T>) {
            // Fast-path: When channel creation is optional (flowOn/flowWith operators without buffer)
            if (capacity == Channel.OPTIONAL_CHANNEL) {
                val collectContext = coroutineContext
                val newContext = collectContext + context // compute resulting collect context
                // #1: If the resulting context happens to be the same as it was -- fallback to plain collect
                if (newContext == collectContext)
                    return flowCollect(collector)
                // #2: If we don't need to change the dispatcher we can go without channels
                if (newContext[ContinuationInterceptor] == collectContext[ContinuationInterceptor])
                    return collectWithContextUndispatched(collector, newContext)
            }
            // Slow-path: create the actual channel
            super.collect(collector)
        }
    }
    

    Flow#collect 方法出来了,在这个方法里,先判断一下 capacity 是否等于 Channel.OPTIONAL_CHANNEL,默认是这个值。接着 newContext = collectContext + context, 这会将 newContext[ContinuationInterceptor] 替换成我们新的拦截器,这是用来切换线程的。然后分为三种情况:

    1. newContext == collectContext:直接调用 flowCollect 方法,这个方法在 ChannelFlowOperatorImpl 已经实现,直接使用了 flow{} 中返回的 Flow 对象去调用 flow.collect(collector),相当于没有切线程。
    2. newContext[ContinuationInterceptor] == collectContext[ContinuationInterceptor] 即拦截器一样,这个时候最终都会调用一个叫 withContextUndispatched 方法,从名字我们就可以看出,它不会走拦截器,同样页不会切线程。
    3. 第三个走父类的 collect 逻辑,我们还需看一下父类的实现。

    3.3 ChannelFlow

    ChannelFlowOperator 的父类是 ChannelFlow,它是我们的重点:

    public abstract class ChannelFlow<T>(
        // upstream context
        @JvmField public val context: CoroutineContext,
        // buffer capacity between upstream and downstream context
        @JvmField public val capacity: Int
    ) : FusibleFlow<T> {
       // ...
        public override fun fuse(context: CoroutineContext, capacity: Int): FusibleFlow<T> {
            // ... 省略
        }
    
        public open fun produceImpl(scope: CoroutineScope): ReceiveChannel<T> =
            scope.produce(context, produceCapacity, start = CoroutineStart.ATOMIC, block = collectToFun)
    
        override suspend fun collect(collector: FlowCollector<T>): Unit =
            coroutineScope {
                collector.emitAll(produceImpl(this))
            }
    
        // ...
    }
    

    ChannelFlow 实现了 FusibleFlow 接口,它跟连续多个 flowOn 的处理有关,后面再分析,还是回到 Flow#collect 方法,首先,coroutineScope{} 没做什么事,可以忽略,然后重点看 collector.emitAll(produceImpl(this))

    点进 emitAll 方法:

    public suspend fun <T> FlowCollector<T>.emitAll(channel: ReceiveChannel<T>): Unit =
        emitAllImpl(channel, consume = true)
    
    private suspend fun <T> FlowCollector<T>.emitAllImpl(channel: ReceiveChannel<T>, consume: Boolean) {
        try {
            while (true) {
                val result = run { channel.receiveOrClosed() }
                if (result.isClosed) {
                    result.closeCause?.let { throw it }
                    break // returns normally when result.closeCause == null
                }
                emit(result.value)
            }
        } catch (e: Throwable) {
            cause = e
            throw e
        } finally {
            if (consume) channel.cancelConsumed(cause)
        }
    }
    

    代码逻辑很清晰,先开一个无限循环,然后使用 Flow 中的 Channel 去接收数据,只有在接收到关闭的命令才会退出。结合协程的知识,我们知道 Channel 可以用来两个协程之间传送数据,这里是不是用来这么做的呢?

    接收数据的部分看完了,现在看一下生产数据的部分,produceImpl 方法在 ChannelFlow 已经给出了,它里面调用了 CoroutineScope 一个扩展方法:

    public fun <E> CoroutineScope.produce(
        context: CoroutineContext = EmptyCoroutineContext,
        capacity: Int = 0,
        start: CoroutineStart = CoroutineStart.DEFAULT,
        onCompletion: CompletionHandler? = null,
        @BuilderInference block: suspend ProducerScope<E>.() -> Unit
    ): ReceiveChannel<E> {
        val channel = Channel<E>(capacity)
        val newContext = newCoroutineContext(context)
        val coroutine = ProducerCoroutine(newContext, channel)
        if (onCompletion != null) coroutine.invokeOnCompletion(handler = onCompletion)
        coroutine.start(start, coroutine, block)
        return coroutine
    }
    

    coroutine.start(start, coroutine, block) 是不是看着有点熟悉,没错,我们在使用 launch 启动协程的时候也会使用该方法,这里就是启动了一个子协程。原理可以看上篇文章。

    下面找到 Channel 发射数据的地方就行了。

    这个部分仍然有点长:

    public abstract class ChannelFlow<T>(
        // upstream context
        @JvmField public val context: CoroutineContext,
        // buffer capacity between upstream and downstream context
        @JvmField public val capacity: Int
    ) : FusibleFlow<T> {
    
        // shared code to create a suspend lambda from collectTo function in one place
        internal val collectToFun: suspend (ProducerScope<T>) -> Unit
            get() = { collectTo(it) }
    
        // ...
    
        protected abstract suspend fun collectTo(scope: ProducerScope<T>)
    
        public open fun produceImpl(scope: CoroutineScope): ReceiveChannel<T> =
            scope.produce(context, produceCapacity, start = CoroutineStart.ATOMIC, block = collectToFun)
    
        override suspend fun collect(collector: FlowCollector<T>): Unit =
            coroutineScope {
                collector.emitAll(produceImpl(this))
            }
    }
    
    internal abstract class ChannelFlowOperator<S, T>(
        @JvmField val flow: Flow<S>,
        context: CoroutineContext,
        capacity: Int
    ) : ChannelFlow<T>(context, capacity) {
        protected abstract suspend fun flowCollect(collector: FlowCollector<T>)
    
        // Slow path when output channel is required
        protected override suspend fun collectTo(scope: ProducerScope<T>) =
            flowCollect(SendingCollector(scope))
    
        // ...
    }
    
    internal class ChannelFlowOperatorImpl<T>(
        flow: Flow<T>,
        context: CoroutineContext = EmptyCoroutineContext,
        capacity: Int = Channel.OPTIONAL_CHANNEL
    ) : ChannelFlowOperator<T, T>(flow, context, capacity) {
        //...
        override suspend fun flowCollect(collector: FlowCollector<T>) =
            flow.collect(collector)
    }
    
    public class SendingCollector<T>(
        private val channel: SendChannel<T>
    ) : FlowCollector<T> {
        override suspend fun emit(value: T): Unit = channel.send(value)
    }
    

    这个过程有点绕,先看一下关系,ChannelFlowOperator 继承了 ChannelFlowChannelFlowOperatorImpl 继承了 ChannelFlowOperator,有两点需要说明一下:

    1. ChannelFlowcollect 的方法调用顺序是这样的:ChannelFlow#collect > ChannelFlow#produceImpl > ChannelFlow#collectTo抽象 > ChannelFlowOperator#collectTo > ChannelFlowOperator#flowCollect抽象 > ChannelFlowOperatorImpl#flowCollect,最后一个方法中的内容为 flow.collect(collector),这个大家应该很熟悉了。
    2. ChannelFlowOperator 中,我们使用了 SendingCollector 进行了一层包装,充当我们的数据的接收者,这个 SendingCollector 实现了 FlowCollector#emit 方法,方法内容就是我们想要的 channel.send(value),接收到数据以后就使用 Channel 发射数据。

    理解了 flowOn,我们更新一下流程图:

    Flow流程图

    3.4 多个flowOn的复用

    再来一个栗子:

    栗子

    就像上面注释的一样,代码块会在 Dispatchers.IO 的调度器中执行,原理也很简单:

    public fun <T> Flow<T>.flowOn(context: CoroutineContext): Flow<T> {
        checkFlowContext(context)
        return when {
            context == EmptyCoroutineContext -> this
            this is FusibleFlow -> fuse(context = context)
            else -> ChannelFlowOperatorImpl(this, context = context)
        }
    }
    
    public abstract class ChannelFlow<T>(
        // upstream context
        @JvmField public val context: CoroutineContext,
        // buffer capacity between upstream and downstream context
        @JvmField public val capacity: Int
    ) : FusibleFlow<T> {
        //...
        public override fun fuse(context: CoroutineContext, capacity: Int): FusibleFlow<T> {
            // note: previous upstream context (specified before) takes precedence
            val newContext = context + this.context
            val newCapacity = when {
                this.capacity == Channel.OPTIONAL_CHANNEL -> capacity
                capacity == Channel.OPTIONAL_CHANNEL -> this.capacity
                this.capacity == Channel.BUFFERED -> capacity
                capacity == Channel.BUFFERED -> this.capacity
                this.capacity == Channel.CONFLATED -> Channel.CONFLATED
                capacity == Channel.CONFLATED -> Channel.CONFLATED
                else -> {
                    // sanity checks
                    assert { this.capacity >= 0 }
                    assert { capacity >= 0 }
                    // combine capacities clamping to UNLIMITED on overflow
                    val sum = this.capacity + capacity
                    if (sum >= 0) sum else Channel.UNLIMITED // unlimited on int overflow
                }
            }
            if (newContext == this.context && newCapacity == this.capacity) return this
            return create(newContext, newCapacity)
        }
    
        //...
    }
    

    通过上面 flowOn 的分析,我们得知,第一个 flowOn 返回一个 ChannelFlowOperatorImpl,又因为它父类的父类是 ChannelFlow,它实现了 FusibleFlow 接口,所以在执行第二个 flowOn 方法的时候,this is FusibleFlow 的条件满足,接下拉就会执行自身的 FusibleFlow#fuse 方法。

    在这个方法里,我们需要注意的是第一行代码:

    val newContext = context + this.context
    

    如果你不了解 CoroutineContext 的结构根部不能看出问题,context 对应着 Dispatchers.Defaultthis.context 对应着 Dispatchers.IO,它们两个本质上都是拦截器,所以即使它们两个加起来,context[ContinuationInterceptor] 取拦截器的时候只能取一个,后面的会把前面的覆盖掉,巧就巧在它把旧的 context 放在后面了,所以这个 newContext == this.context 条件就会成立。

    这个条件成立的结果就是该方法的倒数第二行,直接将自身返回回去。所以第二个 flowOn 的作用就被忽略了~

    总结

    学习 Flow 源码的时候你就可以发现,Flow 的原理是跟协程挂钩的,因为我学习 Flow 原理的时候,又把协程的原理翻了一遍。

    苦涩

    这应该是网上第一篇分析 Flow 原理的,不过除了 Android Jetpack 以外,使用 Flow 的地方确实很少。在后面的文章中,我可能会分析 Paging 3 如何结合 Flow 使用的,这也是我想做的。

    本片文章到此就结束啦,如果你有更好的想法,欢迎评论区交流,如果觉得本文不错,点赞是对博主最大的鼓励。

    转眼一年又开始了,感觉去年很多事都没做好,新的一年加油啦~

    我是九心,欢迎关注我的公众号 九心说 接收我的最新文章,或者添加我的微信 JiuXinDev,拉你进群,与我在 Android 之路上一同进阶。

    相关文章

      网友评论

        本文标题:抽丝剥茧Kotlin - 协程中绕不过的Flow

        本文链接:https://www.haomeiwen.com/subject/lwuunktx.html