美文网首页Android补给站禅与计算机程序设计艺术
Kotlin协程实现原理:ContinuationInterce

Kotlin协程实现原理:ContinuationInterce

作者: 午后一小憩 | 来源:发表于2020-11-19 20:34 被阅读0次

    今天我们来聊聊Kotlin的协程Coroutine

    如果你还没有接触过协程,推荐你先阅读这篇入门级文章What? 你还不知道Kotlin Coroutine?

    如果你已经接触过协程,但对协程的原理存在疑惑,那么在阅读本篇文章之前推荐你先阅读下面的文章,这样能让你更全面更顺畅的理解这篇文章。

    Kotlin协程实现原理:Suspend&CoroutineContext

    Kotlin协程实现原理:CoroutineScope&Job

    如果你已经接触过协程,相信你都有过以下几个疑问:

    1. 协程到底是个什么东西?
    2. 协程的suspend有什么作用,工作原理是怎样的?
    3. 协程中的一些关键名称(例如:JobCoroutineDispatcherCoroutineContextCoroutineScope)它们之间到底是怎么样的关系?
    4. 协程的所谓非阻塞式挂起与恢复又是什么?
    5. 协程的内部实现原理是怎么样的?
    6. ...

    接下来的一些文章试着来分析一下这些疑问,也欢迎大家一起加入来讨论。

    ContinuationInterceptor

    看到Interceptor相信第一印象应该就是拦截器,例如在Okhttp中被广泛应用。自然在协程中ContinuationInterceptor的作用也是用来做拦截协程的。

    下面来看下它的实现。

    public interface ContinuationInterceptor : CoroutineContext.Element {
        /**
         * The key that defines *the* context interceptor.
         */
        companion object Key : CoroutineContext.Key<ContinuationInterceptor>
    
        /**
         * Returns continuation that wraps the original [continuation], thus intercepting all resumptions.
         * This function is invoked by coroutines framework when needed and the resulting continuations are
         * cached internally per each instance of the original [continuation].
         *
         * This function may simply return original [continuation] if it does not want to intercept this particular continuation.
         *
         * When the original [continuation] completes, coroutine framework invokes [releaseInterceptedContinuation]
         * with the resulting continuation if it was intercepted, that is if `interceptContinuation` had previously
         * returned a different continuation instance.
         */
        public fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T>
    
        ...
    }
    

    只给出了关键部分,ContinuationInterceptor继承于CoroutineContext.Element,所以它也是CoroutineContext,同时提供了interceptContinuation方法,先记住这个方法后续会用到。

    大家是否还记得在Kotlin协程实现原理系列的第一篇文章中,我们分析了CoroutineContext的内部结构,当时提到了它的plus方法,就是下面这段代码

    public operator fun plus(context: CoroutineContext): CoroutineContext =
        if (context === EmptyCoroutineContext) this else // fast path -- avoid lambda creation
            context.fold(this) { acc, element ->
                val removed = acc.minusKey(element.key)
                if (removed === EmptyCoroutineContext) element else {
                    // make sure interceptor is always last in the context (and thus is fast to get when present)
                    val interceptor = removed[ContinuationInterceptor]
                    if (interceptor == null) CombinedContext(removed, element) else {
                        val left = removed.minusKey(ContinuationInterceptor)
                        if (left === EmptyCoroutineContext) CombinedContext(element, interceptor) else
                            CombinedContext(CombinedContext(left, element), interceptor)
                    }
                }
            }
    

    在这里第一次看到了ContinuationInterceptor的身影,当时核心是为了分析CoroutineContext,所以只是提了plus方法每次都会将ContinuationInterceptor添加到拼接链的尾部。

    不知道有没有老铁想过这个问题,为什么要每次新加入一个CoroutineContext都要调整ContinuationInterceptor的位置,并将它添加到尾部?

    这里其实涉及到两点。

    其中一点是由于CombinedContext的结构决定的。它有两个元素分别是leftelement。而left类似于前驱节点,它是一个前驱集合,而element只是一个纯碎的CoroutineContext,而它的get方法每次都是从element开始进行查找对应KeyCoroutineContext对象;没有匹配到才会去left集合中进行递归查找。

    所以为了加快查找ContinuationInterceptor类型的实例,才将它加入到拼接链的尾部,对应的就是element

    另一个原因是ContinuationInterceptor使用的很频繁,因为每次创建协程都会去尝试查找当前协程的CoroutineContext中是否存在ContinuationInterceptor。例如我们通过launch来看协程的启动。

    public fun CoroutineScope.launch(
        context: CoroutineContext = EmptyCoroutineContext,
        start: CoroutineStart = CoroutineStart.DEFAULT,
        block: suspend CoroutineScope.() -> Unit
    ): Job {
        val newContext = newCoroutineContext(context)
        val coroutine = if (start.isLazy)
            LazyStandaloneCoroutine(newContext, block) else
            StandaloneCoroutine(newContext, active = true)
        coroutine.start(start, coroutine, block)
        return coroutine
    }
    

    如果你使用launch的默认参数,那么此时的Coroutine就是StandaloneCoroutine,然后调用start方法启动协程。

    public fun <R> start(start: CoroutineStart, receiver: R, block: suspend R.() -> T) {
        initParentJob()
        start(block, receiver, this)
    }
    

    start中进入了CoroutineStart,对应的就是下面这段代码

    public operator fun <R, T> invoke(block: suspend R.() -> T, receiver: R, completion: Continuation<T>) =
        when (this) {
            CoroutineStart.DEFAULT -> block.startCoroutineCancellable(receiver, completion)
            CoroutineStart.ATOMIC -> block.startCoroutine(receiver, completion)
            CoroutineStart.UNDISPATCHED -> block.startCoroutineUndispatched(receiver, completion)
            CoroutineStart.LAZY -> Unit // will start lazily
        }
    

    因为我们使用的是默认参数,所以这里对应的就是CoroutineStart.DEFAULT,最终来到block.startCoroutineCancellable

    internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable(receiver: R, completion: Continuation<T>) =
        runSafely(completion) {
            createCoroutineUnintercepted(receiver, completion).intercepted().resumeCancellable(Unit)
        }
    

    在这里我们终于看到了intercepted

    首先通过createCoroutineUnintercepted来创建一个协程(内部具体如何创建的这篇文章先不说,后续文章会单独分析),然后再调用了intercepted方法进行拦截操作,最后再resumeCancellable,这个方法最终调用的就是ContinuationresumeWith方法,即启动协程。

    所以每次启动协程都会自动回调一次resumeWith方法。

    今天的主题是ContinuationInterceptor所以我们直接看intercepted

    public expect fun <T> Continuation<T>.intercepted(): Continuation<T>
    

    发现它是一个expect方法,它会根据不同平台实现不同的逻辑。因为我们是Android所以直接看Android上的actual的实现

    public actual fun <T> Continuation<T>.intercepted(): Continuation<T> =
        (this as? ContinuationImpl)?.intercepted() ?: this
    

    最终来到ContinuationImplintercepted方法

    public fun intercepted(): Continuation<Any?> =
        intercepted
            ?: (context[ContinuationInterceptor]?.interceptContinuation(this) ?: this)
                .also { intercepted = it }
    

    在这里看到了熟悉的context,获取到ContinuationInterceptor实例,并且调用它的interceptContinuation方法返回一个处理过的Continuation

    多次调用intercepted,对应的interceptContinuation只会调用一次。

    所以ContinuationInterceptor的拦截是通过interceptContinuation方法进行的。既然已经明白了它的拦截方式,我们自己来手动写一个拦截器来验证一下。

    val interceptor = object : ContinuationInterceptor {
     
        override val key: CoroutineContext.Key<*> = ContinuationInterceptor
     
        override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> {
            println("intercept todo something. change run to thread")
            return object : Continuation<T> by continuation {
                override fun resumeWith(result: Result<T>) {
                    println("create new thread")
                    thread {
                        continuation.resumeWith(result)
                    }
                }
            }
        }
    
    }
     
    println(Thread.currentThread().name)
     
    lifecycleScope.launch(interceptor) {
        println("launch start. current thread: ${Thread.currentThread().name}")
        
        withContext(Dispatchers.Main) {
            println("new continuation todo something in the main thread. current thread: ${Thread.currentThread().name}")
        }
        
        launch {
            println("new continuation todo something. current thread: ${Thread.currentThread().name}")
        }
        
        println("launch end. current thread: ${Thread.currentThread().name}")
    }
    

    这里简单实现了一个ContinuationInterceptor,如果拦截成功就会输出interceptContinuation中对应的语句。下面是程序运行后的输出日志。

    main
    // 第一次launch
    intercept todo something. change run to thread
    create new thread
    launch start. current thread: Thread-2
    new continuation todo something in the main thread. current thread: main
    create new thread
    // 第二次launch
    intercept todo something. change run to thread
    create new thread
    launch end. current thread: Thread-7
    new continuation todo something. current thread: Thread-8
    

    分析一下上面的日志,首先程序运行在main线程,通过lifecycleScope.launch启动协程并将我们自定义的intercetpor加入到CoroutineContext中;然后在启动的过程中发现我们自定义的interceptor拦截成功了,同时将原本在main线程运行的程序切换到了新的thread线程。同时第二次launch的时候也拦截成功。

    到这里就已经可以证明我们上面对ContinuationInterceptor理解是正确的,它可以在协程启动的时候进行拦截操作。

    下面我们继续看日志,发现withContext并没有拦截成功,这是为什么呢?注意看Dispatchers.Main。这也是接下来需要分析的内容。

    另外还有一点,如果细心的老铁就会发现,launch startlaunch end所处的线程不一样,这是因为在withContext结束之后,它内部还会进行一次线程恢复,将自身所处的main线程切换到之前的线程,但为什么又与之前launch start的线程不同呢?

    大家不要忘了,协程每一个挂起后的恢复都是通过回调resumeWith进行的,然而外部launch协程我们进行了拦截,在它返回的ContinuationresumeWith回调中总是会创建新的thread。所以发生这种情况也就不奇怪了,这是我们拦截的效果。

    整体再来看这个例子,它是不是像一个简易版的协程的线程切换呢?

    CoroutineDispatcher

    现在我们来看Dispatchers.Main,为什么它会导致我们拦截失败呢?要探究原因没有直接看源码更加直接有效的。

    public actual val Main: MainCoroutineDispatcher get() = MainDispatcherLoader.dispatcher
    

    主要看它的类型,它返回的是MainCoroutineDispatcher,然后再看它是什么

    public abstract class MainCoroutineDispatcher : CoroutineDispatcher() {}
    

    发现MainCoroutineDispatcher继承于CoroutineDispatcher,主角登场了,但还不够我们继续看CoroutineDispatcher是什么

    public abstract class CoroutineDispatcher :
        AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {
        
        public open fun isDispatchNeeded(context: CoroutineContext): Boolean = true
        
        public abstract fun dispatch(context: CoroutineContext, block: Runnable)
        
        public open fun dispatchYield(context: CoroutineContext, block: Runnable) = dispatch(context, block)
        
        public final override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
            DispatchedContinuation(this, continuation)
    }
    

    真想已经浮出水面了,原来CoroutineDispatcher实现了ContinuationInterceptor,说明CoroutineDispatcher也具有拦截器的功能。然后再结合CoroutineContext的性质,就很好解释为什么我们自定义的拦截器没有生效。

    原因就是它与我们自定义的拦截器一样都实现了ContinuationInterceptor接口,一旦使用Dispatchers.Main就会替换掉我们自定义的拦截器。

    因果关系弄明白了现在就好办了。我们已经知道它具有拦截功能,再来看CoroutineDispatcher提供的另外几个方法isDispatchNeededdispatch

    我们可以大胆猜测,isDispatchNeeded就是判断是否需要分发,然后dispatch就是如何进行分发,接下来我们来验证一下。

    ContinuationInterceptor重要的方法就是interceptContinuation,在CoroutineDispatcher中直接返回了DispatchedContinuation对象,它是一个Continuation类型。那么自然重点就是它的resumeWith方法。

    override fun resumeWith(result: Result<T>) {
        val context = continuation.context
        val state = result.toState()
        if (dispatcher.isDispatchNeeded(context)) {
            _state = state
            resumeMode = MODE_ATOMIC_DEFAULT
            dispatcher.dispatch(context, this)
        } else {
            executeUnconfined(state, MODE_ATOMIC_DEFAULT) {
                withCoroutineContext(this.context, countOrElement) {
                    continuation.resumeWith(result)
                }
            }
        }
    }
    

    这里我们看到了isDispatchNeededdispatch方法,如果不需要分发自然是直接调用原始的continuation对象的resumeWith方法,也就没有什么类似于线程的切换。

    那什么时候isDispatcheNeededtrue呢?这就要看它的dispatcer是什么。

    由于现在我们是拿Dispatchers.Main作分析。所以这里我直接告诉你们它的dispatcherHandlerContext

    override fun createDispatcher(allFactories: List<MainDispatcherFactory>) =
        HandlerContext(Looper.getMainLooper().asHandler(async = true), "Main")
    
    internal class HandlerContext private constructor(
        private val handler: Handler,
        private val name: String?,
        private val invokeImmediately: Boolean
    ) : HandlerDispatcher(), Delay {
        /**
         * Creates [CoroutineDispatcher] for the given Android [handler].
         *
         * @param handler a handler.
         * @param name an optional name for debugging.
         */
        public constructor(
            handler: Handler,
            name: String? = null
        ) : this(handler, name, false)
    
        @Volatile
        private var _immediate: HandlerContext? = if (invokeImmediately) this else null
    
        override val immediate: HandlerContext = _immediate ?:
            HandlerContext(handler, name, true).also { _immediate = it }
    
        override fun isDispatchNeeded(context: CoroutineContext): Boolean {
            return !invokeImmediately || Looper.myLooper() != handler.looper
        }
    
        override fun dispatch(context: CoroutineContext, block: Runnable) {
            handler.post(block)
        }
        ...
    }
    

    它继承于HandlerDispatcher,而HandlerDispatcher继承于MainCoroutineDispatcher

    条件都符合,我们直接看isDispatchNeeded方法返回true的逻辑。

    首先通过invokeImmediately判断,它代表当前线程是否与自身的线程相同,如何你外部使用者能够保证这一点,就可以直接使用Dispatcher.Main.immediate来避免进行线程的切换逻辑。当然为了保证外部的判断失败,最后也会通过Looper.myLooper() != handler.looper来进行校正。对于Dispatchers.Main这个的handle.looper自然是主线程的looper

    如果不能保证则invokeImmediatelyfalse,直接进行线程切换。然后进入dispatch方法,下面是Dispatchers.Maindispatch的处理逻辑。

    override fun dispatch(context: CoroutineContext, block: Runnable) {
        handler.post(block)
    }
    

    这个再熟悉不过了,因为这个时候的handler.post就是代表向主线程推送消息,此时的block将会在主线程进行调用。

    这样线程的切换就完成。

    所以综上来看,CoroutineDispatcher为协程提供了一个线程切换的统一判断与执行标准。

    首先在协程进行启动的时候通过拦截器的方式进行拦截,对应的方法是interceptContinuation,然后返回一个具有切换线程功能的Continuation,在每次进行resumeWith的时候,内部再通过isDispatchNeeded进行判断当前协程的运行是否需要切换线程。如果需要则调用dispatch进行线程的切换,保证协程的正确运行。

    如果我要自定义协程线程的切换逻辑,就可以通过继承于CoroutineDispatcher来实现,将它的核心方法进行自定义即可。

    当然,如果你是在Android中使用协程,那基本上是不需要自定义线程的切换逻辑。因为kotlin已经为我们提供了日常所需的Dispatchers。主要有四种分别为:

    1. Dispatchers.Default: 适合在主线程之外执行占用大量CPU资源的工作
    2. Dispatchers.Main: Android主线程
    3. Dispatchers.Unconfined: 它不会切换线程,只是启动一个协程进行挂起,至于恢复之后所在的线程完全由调用它恢复的协程控制。
    4. Dispatchers.IO: 适合在主线程之外执行磁盘或网络I/O

    最后我们再来简单提一下withContext

    withContext

    CoroutineDispatcher虽然能够提供线程的切换,但这只是单方向的,因为它没有提供线程的恢复。

    试想一下,我们有个网络请求,我们通过CoroutineDispatcher将线程切换到Dispatchers.IO,当拿到请求成功的数据之后,所在的线程还是IO线程,这样并不能有利于我们UI操作。所以为了解决这个问题kotlin提供了withContext,它不仅能够接受CoroutineDispatcher来帮助我们切换线程,同时在执行完毕之后还会帮助我们将之前切换掉的线程进恢复,保证协程运行的连贯性。这也是为什么官方推荐使用withContext进行协程线程的切换的原因。

    withContext的线程恢复原理是它内部生成了一个DispatchedCoroutine,保存切换线程时的CoroutineContext与切换之前的Continuation,最后在onCompletionInternal进行恢复。

    internal override fun onCompletionInternal(state: Any?, mode: Int, suppressed: Boolean) {
        if (state is CompletedExceptionally) {
            val exception = if (mode == MODE_IGNORE) state.cause else recoverStackTrace(state.cause, uCont)
            uCont.resumeUninterceptedWithExceptionMode(exception, mode)
        } else {
            uCont.resumeUninterceptedMode(state as T, mode)
        }
    }
    

    这个uCont就是切换线程之前的Continuation。具体实现就不在这分析了,感兴趣的老铁可以自己翻一翻源码。

    本篇文章主要介绍了ContinuationInterceptor作用与如何拦截协程的,同时也分析了CoroutineDispatcher内部结构,进一步剖析了协程线程切换的原理。希望对学习协程的伙伴们能够有所帮助,敬请期待后续的协程分析。

    项目

    android_startup: 提供一种在应用启动时能够更加简单、高效的方式来初始化组件,优化启动速度。不仅支持Jetpack App Startup的全部功能,还提供额外的同步与异步等待、线程控制与多进程支持等功能。

    AwesomeGithub: 基于Github客户端,纯练习项目,支持组件化开发,支持账户密码与认证登陆。使用Kotlin语言进行开发,项目架构是基于Jetpack&DataBindingMVVM;项目中使用了ArouterRetrofitCoroutineGlideDaggerHilt等流行开源技术。

    flutter_github: 基于Flutter的跨平台版本Github客户端,与AwesomeGithub相对应。

    android-api-analysis: 结合详细的Demo来全面解析Android相关的知识点, 帮助读者能够更快的掌握与理解所阐述的要点。

    daily_algorithm: 每日一算法,由浅入深,欢迎加入一起共勉。

    相关文章

      网友评论

        本文标题:Kotlin协程实现原理:ContinuationInterce

        本文链接:https://www.haomeiwen.com/subject/xznliktx.html