抽丝剥茧Kotlin - 协程

作者: 九心_ | 来源:发表于2020-08-18 12:08 被阅读0次

    前言

    文章接上篇,这一篇我们好好聊一聊协程的原理,通过上一篇的学习,相信大家对于如何使用协程已经非常熟悉了。

    故事还得从上次的协程分享开始,由于大家对协程的实践并不多,所以大家对下面的这段代码如何执行争论不休:

    GlobalScope.launch {
        val a = async {
            1+2
        }
    
        val b = async {
            1+3
        }
    
        val c = a + b
        Log.e(TAG,"result:$c")
    }
    

    有人说,a 和 b 会串行执行,有人说,a 和 b 会并行执行,那么执行的结果到底是什么样的?我们将在下面的文章给出。

    悲伤的故事

    本个系列文章分为三篇,本文是第二篇:

    《即学即用Kotlin - 协程》
    《抽丝剥茧Kotlin - 协程基础篇》
    《抽丝剥茧Kotlin - 协程Flow篇》

    一、结构简要介绍

    首先,我们得明确协程中有哪些东西,如果你会使用协程,那你肯定知道协程中有 CoroutineScopeCoroutineContextCoroutineDispatcher,这些都是使用过程中我们可以接触到的 API。

    我简单的整理了协程中主要的基础类:

    协程的类图

    协程的类结构可分为三部分:CoroutineScopeCoroutineContextContinuation

    1. Continuation

    如果你会使用协程,那你肯定知道,协程遇到耗时 suspend 操作可以挂起,等到任务结束的时候,协程会自动切回来。

    它的奥秘就是 ContinuationContinuation 可以理解程续体,你可以理解其每次在协程挂起点将剩余的代码包括起来,等到结束以后执行剩余的内容。一个协程的代码块可能会被切割成若干个 Continuation,在每个需要挂起的地方都会分配一个 Continuation

    先抛出一些结论,协程在做耗时操作的时候,如果执行了耗时 suspend 操作,会自动挂起,但是这个耗时操作终究是要做的,只不过切换到其他线程去做了,做完以后协程就需要切回来,但是切到哪儿呢?这便是 Continuation 需要解决的问题。

    Continuation 的流程是这样的:

    Continuation流程

    无论是使用 launch 还是 async 启动的协程,都会有一个结束的时候用来回调的 continuation

    2. CoroutineScope

    关于 CoroutineScope 没有特别多要说的,它持有了 CoroutineContext,主要对协程的生命周期进行管理。

    3. CoroutineContext

    一开始看 CoroutineContext 觉得特别晕,不明白为啥要这么设计,看了 Bennyhuo 大佬的文章以后才稍微好转。

    从上面协程的类的机构中可以看出,光看这个 CoroutineContext 这个接口(源码内容我们下面讲),会发现它有点像 List 集合,而继承自 CoroutineContext 接口的 Element 接口则定义了其中的元素。

    随后,这个 Element 接口被划分成了两种类,JobContinuationInterceptor

    • Job:从字面上来讲,它代表一个任务,Thread 也是执行任务,所以我们可以理解它定义了协程的一些东西,比如协程的状态,协程和子协程的管理方式等等。
    • ContinuationInterceptor:也从字面上来看,它是 Continuation 的拦截器,通过拦截 Continuation,完成我们想要完成的工作,比如说线程的切换。

    二、结构源码分析

    上面我们从概念上介绍了协程的三大件,在这部分,我们从源码分析。

    1. Continuation

    suspend 修饰的方法会在在编译期间被编译器做特殊处理,这种处理被成为CPS(续体转换风格) 转化suspend 方法会被包裹成 Continuation

    说了这么久的 Continuation,我们还没有见过接口代码,由于接口内容不多,我就把所有的内容贴出来了:

    /**
     * Interface representing a continuation after a suspension point that returns a value of type `T`.
     */
    @SinceKotlin("1.3")
    public interface Continuation<in T> {
        /**
         * The context of the coroutine that corresponds to this continuation.
         */
        public val context: CoroutineContext
    
        /**
         * Resumes the execution of the corresponding coroutine passing a successful or failed [result] as the
         * return value of the last suspension point.
         */
        public fun resumeWith(result: Result<T>)
    }
    

    我们重点关注Continuation#resumeWith()方法,从注释来看,通过返回 suspend 挂起点的值来恢复协程的执行,协程可以从参数 Result<T>) 获取成功的值或者失败的结果,如果没有结果,那么 Result<T> 的泛型是 UnitResulut 这个类也特别简单,感兴趣的同学可以查看源码。

    BaseContinuationImpl 实现了 Continuation 接口,我们看一下 Continuation#resumeWith 方法的实现:

    internal abstract class BaseContinuationImpl(
        // 完成后调用的 Continuation
        public val completion: Continuation<Any?>?
    ) : Continuation<Any?>, CoroutineStackFrame, Serializable {
        
        public final override fun resumeWith(result: Result<Any?>) {
            // This loop unrolls recursion in current.resumeWith(param) to make saner and shorter stack traces on resume
            var current = this
            var param = result
            while (true) {
                probeCoroutineResumed(current)
                with(current) {
                    val completion = completion!! // fail fast when trying to resume continuation without completion
                    val outcome: Result<Any?> =
                        try {
                            // 1. 执行 suspend 中的代码块
                            val outcome = invokeSuspend(param)
                            // 2. 如果代码挂起就提前返回
                            if (outcome === COROUTINE_SUSPENDED) return
                            // 3. 返回结果
                            Result.success(outcome)
                        } catch (exception: Throwable) {
                            // 3. 返回失败结果
                            Result.failure(exception)
                        }
                    releaseIntercepted() // this state machine instance is terminating
                    if (completion is BaseContinuationImpl) {
                        // 4. 如果 completion 中还有子 completion,递归
                        current = completion
                        param = outcome
                    } else {
                        // 5. 结果通知
                        completion.resumeWith(outcome)
                        return
                    }
                }
            }
        }
    }
    

    主要的过程我在注释中已经标注出来了,我来解释一下 Continuation 的机制。

    每个 suspend 方法生成的 BaseContinuationImpl,其构造方法有一个参数叫 completion,它也是一个 Continuation,它的调用时机是在 suspen 方法执行完毕的时候。我们后面称

    Continuation流程

    这个流程展示给我们的内容很直观了,简单起见,我们直接看3、4和5这一个 launch 启动流程就好,通常一个 launch 生成一个外层 Continuation一个相应的结果 Continuation,我们后面称结果 continuationcompleteContinuation 调用顺序是:

    1. 调用外层 Continuation 中的 Continuation#resumeWith() 方法。
    2. 该方法会去执行 launch 包裹的代码块,并返回一个结果。
    3. 将上述代码块执行的结果交给 completion,由它完成协程结束的通知。

    上述的过程只存在于一个 launch 并且里面没有执行其他耗时的挂起操作,对于这些情况,我们将会在下面的文章讨论。

    抛出问题一: 可以看到,在注释2,遇到耗时的 suspend,返回的结果是一个 COROUTINE_SUSPENDED,后面会直接返回,耗时操作结束的时候,我们的 completion 怎么恢复呢?

    2. CoroutineContext 和 Element

    在概要分析的时候,我们说 CoroutineContext 的结构像一个集合,是从它的接口得出结论的:

    public interface CoroutineContext {
        // get 方法,通过 key 获取
        public operator fun <E : Element> get(key: Key<E>): E?
        // 累加操作
        public fun <R> fold(initial: R, operation: (R, Element) -> R): R
        // 操作符 + , 实际的实现调用了 fold 方法
        public operator fun plus(context: CoroutineContext): CoroutineContext
        // 移除操作
        public fun minusKey(key: Key<*>): CoroutineContext
        
        // CoroutineContext 定义的 Key
        public interface Key<E : Element>
    
        // CoroutineContext 中元素的定义
        public interface Element : CoroutineContext {
            // key
            public val key: Key<*>
            //...
        }
    }
    

    从中我们可以大致看出,CoroutineContext 中可以通过 Key 来获取元素 Element,并且 Element 接口也是继承自 CoroutineContext 接口。

    除此以外,CoroutineContext 支持增加和移除操作,并且支持 + 操作符来完成增加。+ 操作符即 plus 方法是有具体实现的,感兴趣的可以自己看一下,主要涉及到了拦截器 ContinuationInterceptor 的添加。

    1.1 Job

    Job 的注释中阐述定义是这样的:

    A background job. Conceptually, a job is a cancellable thing with a life-cycle that culminates in its completion.

    从中我们可以得出:

    1. 后台任务
    2. 可取消
    3. 生命周期在完成它的时候结束

    从后台任务的角度来看,Job 听着有点像 Thread,和 Thread 一样,Job 也有各种状态,文档中对 Job 各种状态的注释(感觉大佬们的注释写的真棒~):

    Job流程

    Job 另一个值得关注的点是对子 Job 的管理,主要的规则如下:

    1. Job都会结束的时候,父 Job 才会结束
    2. Job 取消的时候,子 Job 也会取消

    上述的一些内容都可以从 Job 的接口文档中得出。那么,Job哪里来的?如果你看一下CoroutineScope#launch方法,你就会得出结论,该方法的返回类型就是 Job,我们每次调用该方法,都会创建一个 Job

    1.2 ContinuationInterceptor

    顾名思义,Continuation 拦截器,先看接口:

    interface ContinuationInterceptor : CoroutineContext.Element {
        // ContinuationInterceptor 在 CoroutineContext 中的 Key
        companion object Key : CoroutineContext.Key<ContinuationInterceptor>
        /**
         * 拦截 continuation
         */
        fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T>
    
        //...
    }
    

    这个接口可以提炼的就这两个信息:

    1. 拦截器的 Key,也就是说,无论你后面一个 CoroutineContext 放了多少个拦截器,KeyContinuationInterceptor 的拦截器只能有一个。
    2. 我们都知道,Continuation 在调用其 Continuation#resumeWith() 方法,会执行其 suspend 修饰的函数的代码块,如果我们提前拦截到,是不是可以做点其他事情,比如说切换线程,这也是 ContinuationInterceptor 的作用之一。

    需要说明一下,我们通过 Dispatchers 来指定协程发生的线程,Dispatchers 实现了 ContinuationInterceptor接口。

    3. CoroutineScope

    CoroutineScope 的接口很简单:

    public interface CoroutineScope {
        public val coroutineContext: CoroutineContext
    }
    

    它要求后续的实现都要提供 CoroutineContext,不过我们都知道,CoroutineContext 是协程中很重要的东西,既包括 Job,也包括调度器。

    在上面的代码中,我多次使用了 Android Jetpack 中的 Lifecycle 中协程的扩展库,好处我们获取 CoroutineScope 更加简单,无需在组件 onDestroy 的时候手动 cancel,并且它的源码超级简单,前提是你会使用 Lifecycle

    internal class LifecycleCoroutineScopeImpl(
        override val lifecycle: Lifecycle,
        override val coroutineContext: CoroutineContext
    ) : LifecycleCoroutineScope(), LifecycleEventObserver {
        // ...
    
        override fun onStateChanged(source: LifecycleOwner, event: Lifecycle.Event) {
            if (lifecycle.currentState <= Lifecycle.State.DESTROYED) {
                lifecycle.removeObserver(this)
                coroutineContext.cancel()
            }
        }
    }
    

    并且它也支持你在指定的生命周期调用协程,大家看一下接口就明白了。

    三、过程源码分析

    先上一段使用代码:

    lifecycleScope.launch(Dispatchers.Main) {
        val a = async { getResult(1, 2) }
        val b = async { getResult(3, 5) }
    
        val c = a.await() + b.await()
        Log.e(TAG, "result:$c")
    } 
    
    suspend fun getResult(a: Int, b: Int): Int {
        return withContext(Dispatchers.IO) {
            delay(1000)
            return@withContext a + b
        }
    }
    

    虽然代码很简单,但是源码还是比较复杂的,我们分步讲。

    第一步 获取 CoroutineScope

    我已经在上面说明了,我们使用的 Lifecycle 的协程拓展库,如果我们不使用拓展库,就得使用 MainScope,它们的 CoroutineContext 都是一样的:

    public fun MainScope(): CoroutineScope = ContextScope(SupervisorJob() + Dispatchers.Main)
    
    // LifecycleCoroutineScope
    val Lifecycle.coroutineScope: LifecycleCoroutineScope
        get() {
            while (true) {
                // ...
                val newScope = LifecycleCoroutineScopeImpl(
                    this,
                    SupervisorJob() + Dispatchers.Main.immediate
                )
                // ...
                return newScope
            }
        }
    

    显而易见,MainScopeLifecycleCoroutineScope 都使用了 SupervisorJob() + Dispatchers.Main, 作为它们的 CoroutineContext

    说明一下,SupervisorJobDispatchers.Main 很重要,它们分别代表了CoroutineContext 之前提及的 JobContinuationInterceptor,后面用到的时候再分析。

    第二步 启动协程

    直接进入 CoroutineScope#launch() 方法:

    public fun CoroutineScope.launch(
        context: CoroutineContext = EmptyCoroutineContext,
        start: CoroutineStart = CoroutineStart.DEFAULT,
        block: suspend CoroutineScope.() -> Unit
    ): Job {
        val newContext = newCoroutineContext(context)
        val coroutine = if (start.isLazy)
            LazyStandaloneCoroutine(newContext, block) else
            StandaloneCoroutine(newContext, active = true)
        coroutine.start(start, coroutine, block)
        return coroutine
    }
    

    上面的方法一共有三个参数,前两个不作过多介绍,第三个参数:

    block: suspend CoroutineScope.() -> Unit)
    

    这是一个方法,是一个 lambda 参数,同时也表明了它需要被 suspend 修饰。 继续看 launch 方法,发现它主要做了两件事:

    1. 组合新的 CoroutineContext
    2. 再创建一个 Continuation

    组合新的CoroutineContext

    在第一行代码 val newContext = newCoroutineContext(context) 做了第一件事,这里的 newCoroutineContext(context) 是一个扩展方法:

    public actual fun CoroutineScope.newCoroutineContext(context: CoroutineContext): CoroutineContext {
        val combined = coroutineContext + context
        val debug = if (DEBUG) combined + CoroutineId(COROUTINE_ID.incrementAndGet()) else combined
        return if (combined !== Dispatchers.Default && combined[ContinuationInterceptor] == null)
            debug + Dispatchers.Default else debug
    }
    

    CoroutineScope 使用本身的 coroutineContext 集合,利用 + 操作符将我们在 launch 方法中提供的 coroutineContext 添加进来。

    再创建一个Continuation

    回到上一段代码,通常我们不会指定 start 参数,所以它会使用默认的 CoroutineStart.DEFAULT,最终 coroutine 会得到一个 StandaloneCoroutine

    StandaloneCoroutine 实现自 AbstractCoroutine,翻开上面的类图,你会发现,它实现了 ContinuationJobCoroutineScope 等一堆接口。需要说明一下,这个 StandaloneCoroutine 其实是我们当前 Suspend Continationcomplete

    接着会调用

    coroutine.start(start, coroutine, block)
    

    这就表明协程开始启动了。

    第三步 start

    进入到 AbstractCoroutine#start 方法:

    public fun <R> start(start: CoroutineStart, receiver: R, block: suspend R.() -> T) {
        initParentJob()
        start(block, receiver, this)
    }
    

    跳过层层嵌套,最后到达了:

    internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable(receiver: R, completion: Continuation<T>) =
        runSafely(completion) {
            // 外面再包一层 Coroutine
            createCoroutineUnintercepted(receiver, completion)
                // 如果需要,做拦截处理
                .intercepted()
                // 调用 resumeWith 方法      
                .resumeCancellableWith(Result.success(Unit))
        }
    

    虽然这仅仅是一个函数,但是后面主要的逻辑都揭露了:

    1. 创建一个没有拦截过的 Continuation
    2. 拦截 Continuation
    3. 执行 Continuation#resumeWith 方法。

    第四步 又创建 Continuation

    我这里用了 ,因为我们在 launch 中已经创建了一个 AbstractContinuaion,不过它是一个 complete,从各个函数的行参就可以看出来。

    不过我们 suspend 修饰的外层 Continuation 还没有创建,它来了,是 SuspendLambda,它继承自 ContinuationImpl,如果你问我为什么源码中没找到具体实现,我觉得可能跟 suspend 修饰符有关,由编译器处理,但是调用栈确实是这样的:

    调用栈

    看一下 SuspendLambda 类的实现:

    internal abstract class SuspendLambda(
        public override val arity: Int,
        completion: Continuation<Any?>?
    ) : ContinuationImpl(completion), FunctionBase<Any?>, SuspendFunction {
        constructor(arity: Int) : this(arity, null)
        //...
    }
    

    可以看到,它的构造方法的形参就包括一个 complete

    第五步 拦截处理

    回到:

    internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable(receiver: R, completion: Continuation<T>) =
        runSafely(completion) {
            // 外面再包一层 Coroutine
            createCoroutineUnintercepted(receiver, completion)
                // 如果需要,做拦截处理
                .intercepted()
                // 调用 resumeWith 方法      
                .resumeCancellableWith(Result.success(Unit))
        }
    

    里面的拦截方法 Continuation#intercepted() 方法是一个扩展方法:

    @SinceKotlin("1.3")
    public actual fun <T> Continuation<T>.intercepted(): Continuation<T> =
        (this as? ContinuationImpl)?.intercepted() ?: this
    

    createCoroutineUnintercepted(receiver, completion) 返回的是一个 SuspendLambda,所以它肯定是一个 ContinuationImpl,看一下它的拦截方法的实现:

    internal abstract class ContinuationImpl(
        completion: Continuation<Any?>?,
        private val _context: CoroutineContext?
    ) : BaseContinuationImpl(completion) {
        constructor(completion: Continuation<Any?>?) : this(completion, completion?.context)
    
        public override val context: CoroutineContext
            get() = _context!!
    
        public fun intercepted(): Continuation<Any?> =
            intercepted
                ?: (context[ContinuationInterceptor]?.interceptContinuation(this) ?: this)
                    .also { intercepted = it }
        // ...
    }
    

    ContinuationImpl#intercepted()方法中,直接利用 context 这个数据结构通过 context[ContinuationInterceptor] 获取拦截器。

    CoroutineDispatcher拦截实现

    我们都知道 ContinuationInterceptor 具有拦截作用,它的直接实现是 CoroutineDispatcher 这个抽象类,所有其他调度器都直接或者间接继承这个类,我们关注一下它的拦截方法:

    public abstract class CoroutineDispatcher :
        AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {
        //...
        public abstract fun dispatch(context: CoroutineContext, block: Runnable)
        // 1.拦截的 Continuation 被包了一层 DispatchedContinuation
        public final override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
            DispatchedContinuation(this, continuation)
        //...
    }
    
    internal class DispatchedContinuation<in T>(
        @JvmField val dispatcher: CoroutineDispatcher,
        @JvmField val continuation: Continuation<T>
    ) : DispatchedTask<T>(MODE_ATOMIC_DEFAULT), CoroutineStackFrame, Continuation<T> by continuation {
        // ...
        override fun resumeWith(result: Result<T>) {
            // ...
            if (dispatcher.isDispatchNeeded(context)) {
                // 2. 后面一个参数需要提供 Runnable,父类已经实现
                dispatcher.dispatch(context, this)
            } 
            //...
        }
        // ...
    }
    
    // SchedulerTask 是一个 Runnable
    internal abstract class DispatchedTask<in T>(
        @JvmField public var resumeMode: Int
    ) : SchedulerTask() {
        // ...
        public final override fun run() {
            // ...
            try {
                //...
                withCoroutineContext(context, delegate.countOrElement) {
                    // 3. continuation 是 DispatchedContinuation 包裹的 continuation
                    continuation.resume(...)
                }
            }
            //...
        }
    }
    

    简单来说,就是对原有的 ContinuationresumeWith 操作加了一层拦截,就像这样:

    拦截流程

    加入 CoroutineDispatcher 以后,执行真正的 Continue#resumeWith() 之前,会执行 CoroutineDispatcher#dispatch() 方法,所以我们现在关注 CoroutineDispatcher#dispatch 具体实现即可。

    讲一个CoroutineDispatcher具体实现

    首先我们得明确这个 CoroutineDispatcher 来自哪里?它从 context 获取,context来自哪里?

    注意 SuspendLambdaContinuationImpl 的构造方法,SuspendLambda 中的参数没有 CoroutineContext,所以只能来自 completion 中的 CoroutineContext,而completionCoroutineContext 来自 launch 方法中来自 CoroutineScope,默认是 SupervisorJob() + Dispatchers.Main,不过只有 Dispatchers.Main 继承了 CoroutineDispatcher

    Dispatchers.Main 是一个 MainCoroutineDispatcher,Android 中对应的 MainCoroutineDispatcherHandlerContext

    internal class HandlerContext private constructor(
        private val handler: Handler,
        private val name: String?,
        private val invokeImmediately: Boolean
    ) : HandlerDispatcher(), Delay {
        public constructor(
            handler: Handler,
            name: String? = null
        ) : this(handler, name, false)
    
        //...
    
        override fun dispatch(context: CoroutineContext, block: Runnable) {
            // 利用主线程的 Handler 执行任务
            handler.post(block)
        }
    
        override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {
            // 利用主线程的 Handler 延迟执行任务,将完成的 continuation 放在任务中执行
            val block = Runnable {
                with(continuation) { resumeUndispatched(Unit) }
            }
            handler.postDelayed(block, timeMillis.coerceAtMost(MAX_DELAY))
            continuation.invokeOnCancellation { handler.removeCallbacks(block) }
        }
    
        //..
    }
    

    重点来了,调度任务最后竟然交给了主线程的 Handler,其实想想也对,主线程的任务最后一般都会交给主线程的 Handler

    好奇的同学可能问了,如果不是主线程呢?不是主线程就利用的线程池:

    public open class ExperimentalCoroutineDispatcher(
        private val corePoolSize: Int,
        private val maxPoolSize: Int,
        private val idleWorkerKeepAliveNs: Long,
        private val schedulerName: String = "CoroutineScheduler"
    ) : ExecutorCoroutineDispatcher() {
        // 执行期
        override val executor: Executor
            get() = coroutineScheduler
        private var coroutineScheduler = createScheduler()
    
        override fun dispatch(context: CoroutineContext, block: Runnable): Unit =
            try {
                coroutineScheduler.dispatch(block)
            } catch (e: RejectedExecutionException) {
                DefaultExecutor.dispatch(context, block)
            }
    }
    

    结果可以说是很清晰了,coroutineScheduler 是一个线程池,如果像了解具体的过程,同学们可以自行查看代码。

    读到这里,你可能有一点明白 CoroutineContext 为什么要设计成一种数据结构:

    1. coroutineContext[ContinuationInterceptor] 就可以直接取到当前协程的拦截器,并且一个协程只能对应一个调度器。
    2. 调度器都放在其他 coroutineContext 的前面,所以在执行协程的时候,可以做拦截处理。

    同理,我们也可以使用 coroutineContext[Job] 获取当前协程。

    第六步 resumeWith

    再次回到:

    internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable(receiver: R, completion: Continuation<T>) =
        runSafely(completion) {
            // 外面再包一层 Coroutine
            createCoroutineUnintercepted(receiver, completion)
                // 如果需要,做拦截处理
                .intercepted()
                // 调用 resumeWith 方法      
                .resumeCancellableWith(Result.success(Unit))
        }
    

    现在我们看 Continue#resumeCancellableWith() 方法,它是一个扩展方法,里面的调度逻辑是:

    1. DispatchContinuation#resumeCancellableWith
    2. CoroutineDispatcher#dispatch
    3. Continuation#resumeWith

    这里的 Continuation 就是 SuspendLambda,它继承了 BaseContinuationImpl,我们看一下它的实现方法:

    internal abstract class BaseContinuationImpl(
        public val completion: Continuation<Any?>?
    ) : Continuation<Any?>, CoroutineStackFrame, Serializable {
        // This implementation is final. This fact is used to unroll resumeWith recursion.
        public final override fun resumeWith(result: Result<Any?>) {
            // This loop unrolls recursion in current.resumeWith(param) to make saner and shorter stack traces on resume
            var current = this
            var param = result
            while (true) {
                // Invoke "resume" debug probe on every resumed continuation, so that a debugging library infrastructure
                // can precisely track what part of suspended callstack was already resumed
                probeCoroutineResumed(current)
                with(current) {
                    val completion = completion!! // fail fast when trying to resume continuation without completion
                    val outcome: Result<Any?> =
                        try {
                            // 1. 执行 suspend 里面的代码块
                            val outcome = invokeSuspend(param)
                            // 2. 如果代码块里面执行了挂起方法,会提前返回
                            if (outcome === COROUTINE_SUSPENDED) return
                            Result.success(outcome)
                        } catch (exception: Throwable) {
                            Result.failure(exception)
                        }
                    releaseIntercepted() // this state machine instance is terminating
                    if (completion is BaseContinuationImpl) {
                        // 3. 如果完成的completion也是BaseContinuationImpl,就会进入循环
                        current = completion
                        param = outcome
                    } else {
                        // 4. 执行 completion resumeWith 方法 
                        completion.resumeWith(outcome)
                        return
                    }
                }
            }
        }
    }
    

    这边被我分为2个部分:

    • 执行 suspend 方法,并获取结果
    • 调用 complete(放在下一步讲)

    执行suspend方法

    在第一处会先执行 suspend 修饰的方法内容,在方法里面可能又会调度 suspend 方法,比如说我们的实例方法:

    lifecycleScope.launch(Dispatchers.Main) {
        val a = async { getResult(1, 2) }
        val b = async { getResult(3, 5) }
    
        val c = a.await() + b.await()
        Log.e(TAG, "result:$c")
    } 
    
    suspend fun getResult(a: Int, b: Int): Int {
        return withContext(Dispatchers.IO) {
            delay(1000)
            return@withContext a + b
        }
    }
    

    因为我们在 getResult 执行了延时操作,所以我们 launch 方法肯定执行了耗时挂起方法,所以 BaseContinuationImpl#invokeSuspend 方法会返回一个 COROUTINE_SUSPENDED,结果你也看到了,该方法会提前结束。(说明一下,我没有找到BaseContinuationImpl#invokeSuspend 方法的具体实现,我猜可能跟编译器有关)

    我猜你肯定跟我一样好奇,遇到耗时挂起会提前返回,那么耗时挂起如何对 complete 进行恢复的?

    我们看一下 delay(1000) 这个延时操作在主线程是如何处理的:

    public suspend fun delay(timeMillis: Long) {
        if (timeMillis <= 0) return // don't delay
        return suspendCancellableCoroutine sc@ { cont: CancellableContinuation<Unit> ->
            cont.context.delay.scheduleResumeAfterDelay(timeMillis, cont)
        }
    }
    
    
    internal class HandlerContext private constructor(
        private val handler: Handler,
        private val name: String?,
        private val invokeImmediately: Boolean
    ) : HandlerDispatcher(), Delay {
        //...
    
        override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {
            val block = Runnable {
                with(continuation) { resumeUndispatched(Unit) }
            }
            handler.postDelayed(block, timeMillis.coerceAtMost(MAX_DELAY))
            continuation.invokeOnCancellation { handler.removeCallbacks(block) }
        }
    
        //...
    }
    

    可以看到,将恢复任务包了一个 Runnable,交给 HandlerHandler#postDelayed() 方法了。

    第七步 complete resumeWith

    对于 complete 的处理一般会有两种。

    complete是BaseContinuationImpl

    第一种情况是我们称之为套娃,完成回调的 Continuation 它本身也有自己的完成回调 Continuation,接下来循环就对了。

    调用complete的resumeWith

    第二种情况,就是通过 complete 去完成回调,由于 completeAbstractContinuation,我们看一下它的 resumeWith

    public abstract class AbstractCoroutine<in T>(
        /**
         * The context of the parent coroutine.
         */
        @JvmField
        protected val parentContext: CoroutineContext,
        active: Boolean = true
    ) : JobSupport(active), Job, Continuation<T>, CoroutineScope {
        // ...
        public final override fun resumeWith(result: Result<T>) {
            // 1. 获取当前协程的技术状态
            val state = makeCompletingOnce(result.toState())
            // 2. 如果当前还在等待完成,说明还有子协程没有结束
            if (state === COMPLETING_WAITING_CHILDREN) return
            // 3. 执行结束恢复的方法,默认为空
            afterResume(state)
        }
    
        // 这是父类 JobSupport 中的 makeCompletingOnce 方法
        // 为了方便查看,我复制过来
        internal fun makeCompletingOnce(proposedUpdate: Any?): Any? {
            loopOnState { state ->
                // tryMakeCompleting 的内容主要根据是否有子Job做不同处理
                val finalState = tryMakeCompleting(state, proposedUpdate)
                when {
                    finalState === COMPLETING_ALREADY ->
                        throw IllegalStateException(
                            "Job $this is already complete or completing, " +
                                    "but is being completed with $proposedUpdate", proposedUpdate.exceptionOrNull
                        )
                    finalState === COMPLETING_RETRY -> return@loopOnState
                    else -> return finalState // COMPLETING_WAITING_CHILDREN or final state
                }
            }
        }
    }
    

    这段代码的意思其实也很简单,就是协程即将完成,得先评估一下协程的技术状态,别协程还有东西在运行,就给结束了。对于一些有子协程的一些协程,会等待子协程结束的时候,才会结束当前协程。

    一个 launch 的过程大概就是这样了。大致的流程图是这样的:

    launch流程

    下面我们再谈谈 async

    四、关于async

    asynclaunch 的代码相似度很高:

    public fun <T> CoroutineScope.async(
        context: CoroutineContext = EmptyCoroutineContext,
        start: CoroutineStart = CoroutineStart.DEFAULT,
        block: suspend CoroutineScope.() -> T
    ): Deferred<T> {
        val newContext = newCoroutineContext(context)
        val coroutine = if (start.isLazy)
            LazyDeferredCoroutine(newContext, block) else
            DeferredCoroutine<T>(newContext, active = true)
        coroutine.start(start, coroutine, block)
        return coroutine
    }
    

    最终也会进行三步走:

    internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable(receiver: R, completion: Continuation<T>) =
        runSafely(completion) {
            // 外面再包一层 Coroutine
            createCoroutineUnintercepted(receiver, completion)
                // 如果需要,做拦截处理
                .intercepted()
                // 调用 resumeWith 方法      
                .resumeCancellableWith(Result.success(Unit))
        }
    

    不同的是,async 返回的是一个 Deferred<T>,我们需要调用 Deferred#await() 去获取返回结果,它的实现在 JobSupport

    private open class DeferredCoroutine<T>(
        parentContext: CoroutineContext,
        active: Boolean
    ) : AbstractCoroutine<T>(parentContext, active), Deferred<T>, SelectClause1<T> {
        // ... awaitInternal方法来自父类 JobSupport
        override suspend fun await(): T = awaitInternal() as T
        // ...
    
        // 这是 JobSupport 中的实现
        internal suspend fun awaitInternal(): Any? {
            // 循环获取结果
            while (true) { // lock-free loop on state
                val state = this.state
                // 1. 如果处于完成状态
                if (state !is Incomplete) {
                    if (state is CompletedExceptionally) { // Slow path to recover stacktrace
                        recoverAndThrow(state.cause)
                    }
                    return state.unboxState()
                }
                // 2. 除非需要重试,不然就 break
                if (startInternal(state) >= 0) break 
            }
            // 等待挂起的方法
            return awaitSuspend() // slow-path
        }
    }
    

    它的具体过程可以从我的注释看出,就不一一介绍了,感兴趣的同学可以查看源码。

    1. 本文一开始的讨论

    本文一开始的代码是错的,连编译器都过不了,尴尬~

    正确的代码应该是:

    GlobalScope.launch {
        val a = async {
            1+2
        }
    
        val b = async {
            1+3
        }
    
        val c = a.await() + bawait()
        Log.e(TAG,"result:$c")
    }
    

    如果是正确的代码,这里可能分两种情况:

    如果你放在UI线程,那肯定是串行的,这时候有人说,我在 a 里使用 delay(1000),在 b 里使用 delay(2000),得到 c 的时候就花了 2000 毫秒啊,这不是并行吗?事情并不是这样的,delay 操作使用了 Handler#postDelay 方法,一个延迟了 1000 毫秒执行,一个延迟了 2000 毫秒执行,但是主线程只有一个,所以只能是串行。

    如果是子线程,通常都是并行的,因为我们使用了线程池啊~

    总结

    写这边源码分析的时候,一些细节总是找不到,比如说 suspendLambda 的子类找不到,自己对 Kotlin 的学习有待深入。

    所以本文有些地方还值得商榷,如果你有更好的理解,欢迎下方交流。

    前段时间自己生了一点小病,每周都往医院跑,所以导致整个人都很丧,写博客的效率也很低,这篇文章写了快一个月,哎,太难了~,不过,这周开始就恢复正常。

    GO

    关于我

    我是九心,新晋互联网码农,如果想要进阶和了解更多的干货,欢迎关注我的公众号接收到的我的最新文章。

    微信二维码

    相关文章

      网友评论

        本文标题:抽丝剥茧Kotlin - 协程

        本文链接:https://www.haomeiwen.com/subject/jdwhjktx.html