美文网首页
Kotlin 协程启动篇:静态代理分层

Kotlin 协程启动篇:静态代理分层

作者: jumplover | 来源:发表于2019-11-04 11:48 被阅读0次

    前段时间在项目中引入了 Kotlin Coroutine,那么也来谈谈对它的理解。所谓窥一斑而知全豹,首先尝试透过一个🌰来窥探协程的启动流程,直接通过 GlobalScope.launch 启动了一个顶级协程并指定在主线程中执行:

    fun test() {
        GlobalScope.launch(Dispatchers.Main) {
            Log.d("GlobalScopeTest", "Log in GlobalScope")
        }
        Log.d("GlobalScopeTest", "Log after GlobalScope")
    }
    

    上面的代码输出结果为:

    Log after GlobalScope
    Log in GlobalScope
    

    为什么会出现这样的执行顺序?协程体内的代码逻辑是透过什么机制分发到主线程执行?要解答这些问题,还得从 GlobalScope.launch 寻找突破口。

    1、GlobalScope.launch:协程入口

    launch 是个扩展函数、启动协程并且返回一个 job 对象。这里先大概了解返回的 job 可以方便地对协程进行追踪,取消等操作。

    launch 源码:

    public fun CoroutineScope.launch(
       context: CoroutineContext = EmptyCoroutineContext,
       start: CoroutineStart = CoroutineStart.DEFAULT,
       block: suspend CoroutineScope.() -> Unit
    ): Job {
       val newContext = newCoroutineContext(context)
       // 构建 AbstractCoroutine 对象,并由此开启协程任务
       val coroutine = if (start.isLazy)
           LazyStandaloneCoroutine(newContext, block) else
           StandaloneCoroutine(newContext, active = true)
       coroutine.start(start, coroutine, block)
       return coroutine
    }
    

    看了 launch 方法,会有一个疑惑:经过编译后,协程把我们的代码逻辑封装成了一个 SuspendLambda 类,那么必然需要该类的相关信息才可能进行回调执行逻辑,但是在入口处并没有看到相关参数,难道还有什么黑科技能够获取到类相关信息?

    事实上反编译 class 文件,发现 CoroutineScope.launch 方法签名和源码实现略有不同,其中多了一个参数:Function2<? super CoroutineScope, ? super Continuation<? super T>, ? extend Object>。 而生成的 SuspendLambda 类就是 Function2 的实现类。

    没错,在调用 CoroutineScope.launch 的时候,传入的 Function2 的参数值就是生成的 SuspendLambda 对象!所以在这里并没有什么黑科技,只是由于编译器的参与,隐藏了其中一些细节。

    反编译得到的 launch 源码:

    @NotNull
    public static final Job launch(@NotNull CoroutineScope $this$launch, @NotNull CoroutineContext context, @NotNull CoroutineStart start, @NotNull Function2<? super CoroutineScope, ? super Continuation<? super Unit>, ? extends Object> block) {
       StandaloneCoroutine coroutine;
       Intrinsics.checkParameterIsNotNull($this$launch, "$this$launch");
       Intrinsics.checkParameterIsNotNull(context, "context");
       Intrinsics.checkParameterIsNotNull(start, "start");
       Intrinsics.checkParameterIsNotNull(block, "block");
       CoroutineContext newContext = CoroutineContextKt.newCoroutineContext($this$launch, context);
       if (start.isLazy()) {
            coroutine = new LazyStandaloneCoroutine(newContext, block);
       } else {
           coroutine = new StandaloneCoroutine(newContext, true);
       }
       coroutine.start(start, coroutine, block);
       return coroutine;
    }
    
    

    回到 launch ,方法最后调用 start 启动了协程。层层追踪最后调用了 startCoroutineCancellable:

    internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable(receiver: R, completion: Continuation<T>) =
        runSafely(completion) {
            createCoroutineUnintercepted(receiver, completion).intercepted().resumeCancellable(Unit)
        }
    
    

    观察方法名大概能得出一个判断:

    1. createCoroutineUnintercepted:首先创建了一个非拦截的协程 Continuation。
    2. intercepted:添加拦截器 Continuation。
    3. resume:正式启动协程。

    看来要知道更多细节,还要从这几个方法入手。

    2、createCoroutineUnintercepted:构建任务层对象

    createCoroutineUnintercepted 在 jvm 的实现在 IntrinsicsJvm 类中。在这里,launch 处传入的 Function2 参数起了作用,通过这个对象调用 create() 来构建 编译器生成的 SuspendLambda 对象

    @SinceKotlin("1.3")
    public fun <R, T> (suspend R.() -> T).createCoroutineUnintercepted(
        receiver: R,
        completion: Continuation<T>
    ): Continuation<Unit> {
        val probeCompletion = probeCoroutineCreated(completion)
        return if (this is BaseContinuationImpl)
             // 创建 SuspendLambda 对象
            create(receiver, probeCompletion)
        else {
            createCoroutineFromSuspendFunction(probeCompletion) {
                (this as Function2<R, Continuation<T>, Any?>).invoke(receiver, it)
            }
        }
    }
    
    

    反编译得到的 create 方法 :

    @NotNull
    public final Continuation<Unit> create(@Nullable Object value, @NotNull Continuation<?> completion) {
        Intrinsics.checkParameterIsNotNull(completion, "completion");
        
        // 将Continuation传入作为completion参数值(静态代理)
        Continuation coroutineTest$test$1 = new CoroutineTest$test$1(this.$context, completion);
        CoroutineScope coroutineScope = (CoroutineScope) value;
        coroutineTest$test$1.p$ = (CoroutineScope) value;
        return coroutineTest$test$1;
    }
    
    

    所以 launch 传入的 Function2 参数作用是构建出编译器为我们生成的 SuspendLambda 对象,后续才能回调挂起的逻辑。那么 SuspendLambda.intercepted() 又做了什么?

    3、Continuation.intercepted:构建任务调度层对象

    追踪 intercepted 调用链,最终调用了 ContinuationImpl.intercepted :

    public fun intercepted(): Continuation<Any?> =
        intercepted ?: (context[ContinuationInterceptor]?.interceptContinuation(this) ?: this)
                    .also { intercepted = it }
    
    

    上面代码关键点就在于 (context[ContinuationInterceptor]?.interceptContinuation(this),context[ContinuationInterceptor] 得到的是 HandlerContext 对象(why?)而 handlerContext 是 CoroutineDispatcher 的子类(CoroutineDispatcher 可以理解为线程级别的任务调度类,决定了任务在哪个线程中执行),CoroutineDispatcher.interceptContinuation() 返回了 DispatchedContinuation 对象。

    why HandlerContext handlerContext = context[ContinuationInterceptor] ?

    栗子中我们调用 launch 传入了 Dispatchers.Main 作为 context,CoroutineContext 可以理解为一个数据结构,其中键为 ContinuationInterceptor,值为 Dispatchers.Main。所以context[ContinuationInterceptor] 得到的是 Dispatchers.Main,即为 HandlerContext。

    intercepted 就是通过静态代理得到了 DispatchedContinuation 对象。接下来只需要通过 resumeCancellable 开启协程。

    4、DispatchedContinuation.resumeCancellable:开启协程任务

    DispatchedContinuation 为任务调度类,将具体任务分配给 Dispatcher(也就是上面通过 context[ContinuationInterceptor] 得到的对象) 执行。resumeCancellable 内部调用 dispatcher.dispatch 将任务交给了 Dispatcher,方法第二个参数 this 指代 runnable 对象。

    resumeCancellable 的实现:

    @Suppress("NOTHING_TO_INLINE") // we need it inline to save us an entry on the stack
    inline fun resumeCancellable(value: T) {
       // 指定需要线程调度,默认为true
       if (dispatcher.isDispatchNeeded(context)) {
           _state = value
           resumeMode = MODE_CANCELLABLE
           dispatcher.dispatch(context, this)
       } else {
       // 不需要线程调度
           executeUnconfined(value, MODE_CANCELLABLE) {
               if (!resumeCancelled()) {
                    resumeUndispatched(value)
               }
           }
       }
    }
    
    

    HandlerContext 的 dispatch 方法实现如下,通过 handler 把任务抛出去:

    override fun dispatch(context: CoroutineContext, block: Runnable) {
        handler.post(block)
    }
    
    

    DispatchedContinuation 内 runnable.run 方法实现如下:

    public final override fun run() {
       val taskContext = this.taskContext
       var exception: Throwable? = null
       try {
           val delegate = delegate as DispatchedContinuation<T>
           val continuation = delegate.continuation
           val context = continuation.context
           val job = if (resumeMode.isCancellableMode) context[Job] else null
           val state = takeState() // NOTE: Must take state in any case, even if cancelled
           withCoroutineContext(context, delegate.countOrElement) {
               if (job != null && !job.isActive) {
                   val cause = job.getCancellationException()
                   cancelResult(state, cause)
                   continuation.resumeWithStackTrace(cause)
               } else {
                   val exception = getExceptionalResult(state)
                   if (exception != null)
                       continuation.resumeWithStackTrace(exception)
                   else
                     // 回调suspendLambda.resumeWith()
                       continuation.resume(getSuccessfulResult(state))
               }
           }
       } catch (e: Throwable) {
            // This instead of runCatching to have nicer stacktrace and debug experience
          exception = e
       } finally {
          val result = runCatching { taskContext.afterTask() }
          handleFatalException(exception, result.exceptionOrNull())
       }
    }
    

    run 方法内部通过调用 continuation.resume 回调整条链路的任务逻辑。

    continuation.resume 的主要下游调用链路:continuation.resume() -> continuation.resumeWith() -> suspendLambda.invokeuspend() -> AbstractContinue.resumeWith()。

    其中 suspendLambda.invokeuspend 内部封装的就是我们栗子中的逻辑: Log.d("GlobalScopeTest", "Log in GlobalScope")

    从 launch 开启协程到如何回调到我们的逻辑大致流程已经理清。由于这个栗子指定了主线程作为运行环境。还有更多关于协程如何进行线程调度、多线程下函数挂起、恢复等流程在这里未涉及到。不过我们大概可以推测对于指定IO线程执行的协程体,协程必然维护了自己的线程池作为协程的运行环境。

    小结

    1. launch 方法还有一个隐式参数:Function2,也就是编译器生成 SuspendLambda 的子类对象。为后续构建真实可用的 SuspendLambda 对象做铺垫。
    2. 通过静态代理构建了三层 Continuation 分别负责不同的职责:AbstractCoroutine(协程状态处理层) -> SuspendLambda(逻辑执行层) -> CoroutineDispatcher(任务调度层)
    3. 任务最终交由 CoroutineDispatcher 执行。最终脱离不开线程级别的任务调度。

    相关文章

      网友评论

          本文标题:Kotlin 协程启动篇:静态代理分层

          本文链接:https://www.haomeiwen.com/subject/dohtuctx.html