美文网首页kotlin
「Kotlin篇」原来,协程是这么挂起的!

「Kotlin篇」原来,协程是这么挂起的!

作者: 付十一v | 来源:发表于2021-12-13 11:28 被阅读0次

    协程这个概念已经出来很长时间了,网上对它的定义是非阻塞式的线程框架,讨论最多的也是协程的挂起、恢复以及线程切换,那到底挂起是个什么样的概念,怎么就挂起了,怎么就又恢复了?

    带着这些问题,我走上了不归路......

    在开始探索协程挂起、恢复之前,需要先了解一下几个重要的名词和概念。

    1. Continuation

    Continuation在协程中其实只是一个接口,其作用有点类似RxJava中Observer,当请求成功时,触发onNext继续更新UI或者下一步的操作。只不过在协程中,Continuation包装了协程在挂起之后应该继续执行的代码,在编译的过程中,一个完整的协程被分割切块成一个又一个续体。在 await 函数的挂起结束以后,它会调用 continuation 参数的 resumeWith 函数,来恢复执行 await 函数后面的代码。

    2. invokeSuspend

    invokeSuspend中包含的便是我们协程体中的代码,内部管理了一个状态值,循环触发invokeSuspend的调用,而后根据不同的状态,做协程的挂起和恢复操作。

    这两个名词虽然比较抽象,但是对于后面的分析还是比较重要的,还是先以一个简单的例子开始,慢慢理解。

    使用launch开启一个协程,在协程体中,加入两个挂起函数,loadDataAloadDataB,并且在函数前后,log打印出函数的运行轨迹。如下:

    Log.d(TAG, "onCreate: start")
    lifecycleScope.launch {
        val num = loadDataA()
        loadDataB(num)
    }
    Log.d(TAG, "onCreate: end")
    
    private suspend fun loadDataA():Int {
        delay(3000)
        Log.d(TAG, "loadDataA: ")
        return 1
    }
    
    private suspend fun loadDataB(num:Int) {
        delay(1000)
        Log.d(TAG, "loadDataB: $num")
    }
    

    那launch开启协程,内部做了些什么?又是如何处理loadDataA和loadDataB这些挂起函数的?

    不妨先在launch处打上断点,Debug看看它的执行路径。

    image.png

    如上图所示,从onCreate开始,一个lifecycleScope.launch 的执行顺序是

    launch->start->invoke->startCoroutineCancellable->resumeWith再到最后invokeSuspend方法,

    至于invokeSuspend的作用是什么?这个后面会详细说明。

    既然给了大致的执行方向,我们只需要一步一步的跟进,查看内部详细的代码处理。

    # Builders.common.kt
        
    public fun CoroutineScope.launch(
        context: CoroutineContext = EmptyCoroutineContext,
        start: CoroutineStart = CoroutineStart.DEFAULT,
        block: suspend CoroutineScope.() -> Unit
    ): Job {
        val newContext = newCoroutineContext(context)
        val coroutine = if (start.isLazy)
            LazyStandaloneCoroutine(newContext, block) else
            StandaloneCoroutine(newContext, active = true)
        coroutine.start(start, coroutine, block)
        return coroutine
    }
    

    launch开始,在不阻塞当前线程的情况下启动一个新的协程,并将对协程的引用作为Job 返回,CoroutineScope.launch中传入了三个参数,第一个CoroutineContext为协程的上下文,第二个CoroutineStart,协程启动选项。 默认值为CoroutineStart.DEFAULT ,第三个block即协程体中的代码,也就是上面例子中的loadDataAloadDataB

    接着coroutine.start(start, coroutine, block)使用给定的代码块和启动策略启动此协程。

    # AbstractCoroutine.kt
        
    public fun <R> start(start: CoroutineStart, receiver: R, block: suspend R.() -> T) {
        initParentJob()
        start(block, receiver, this)
    }
    

    initParentJob()方法之后,调用了start,这个时候你会发现,已经无法再继续跟进去了。

    但是在文章一开始Debug的路径显示,在start后,执行到的是CoroutineStart#invoke()

    # CoroutineStart.kt
        
    @InternalCoroutinesApi
    public operator fun <T> invoke(block: suspend () -> T, completion: Continuation<T>): Unit =
        when (this) {
            DEFAULT -> block.startCoroutineCancellable(completion)
            ATOMIC -> block.startCoroutine(completion)
            UNDISPATCHED -> block.startCoroutineUndispatched(completion)
            LAZY -> Unit // will start lazily
        }
    

    在invoke中使用此协程的启动策略将相应的块作为协程启动,这里以DEFAULT为例,startCoroutineCancellable(),而在startCoroutineCancellable中,创建了Continuation,且调用resumeWith来传递请求结果。

    # DispatchedContinuation.kt
    
    public fun <T> Continuation<T>.resumeCancellableWith(
        result: Result<T>,
        onCancellation: ((cause: Throwable) -> Unit)? = null
    ): Unit = when (this) {
        is DispatchedContinuation -> resumeCancellableWith(result, onCancellation)
        else -> resumeWith(result)
    }
    

    到这里,你可能还是云里雾里,跟进这么多方法,到底有什么用途?不急

    我们接着往下看核心部分Continuation

    public interface Continuation<in T> {
        public val context: CoroutineContext
        
        public fun resumeWith(result: Result<T>)
    }
    

    Continuation是一个接口,接口内有一个resumeWith方法,在上面launch启动协程中startCoroutineCancellable调用了resumeWith。既然是接口且被调用,那必然是有地方实现了该接口,并且在resumeWith中做了一些事情。

    那就来看看实现Continuation接口的地方做了些啥?

    Continuation的具体实现是在ContinuationImpl类中,

    internal abstract class ContinuationImpl(
        completion: Continuation<Any?>?,
        private val _context: CoroutineContext?
    ) : BaseContinuationImpl(completion) 
    

    ContinuationImpl继承自BaseContinuationImpl,在BaseContinuationImpl中就可以看到resumeWith的具体实现。

    internal abstract class BaseContinuationImpl(
        public val completion: Continuation<Any?>?
    ) : Continuation<Any?>, CoroutineStackFrame, Serializable {
        
        public final override fun resumeWith(result: Result<Any?>) {
            var current = this
            var param = result
            while (true) {
     
                probeCoroutineResumed(current)
                with(current) {
                    val completion = completion!! 
                    val outcome: Result<Any?> =
                        try {
                            val outcome = invokeSuspend(param)
                            if (outcome === COROUTINE_SUSPENDED) return
                            Result.success(outcome)
                        } catch (exception: Throwable) {
                            Result.failure(exception)
                        }
                    releaseIntercepted() // this state machine instance is terminating
                    if (completion is BaseContinuationImpl) {
                        // unrolling recursion via loop
                        current = completion
                        param = outcome
                    } else {
                      
                        completion.resumeWith(outcome)
                        return
                    }
                }
            }
        }
    

    在resumeWith实现中,最核心的部分是在while循环中,调用invokeSuspend并且对返回的标志判断。

    val outcome = invokeSuspend(param)
    if (outcome === COROUTINE_SUSPENDED) return
    Result.success(outcome)
    

    invokeSuspend这个方法是不是有点熟悉?

    其实在文章开头Debug launch时,invokeSuspendlifecycleScope.launch开启协程内部执行的最后一个方法,invokeSuspend之后即开始执行协程体中的内容。

    既然我们已经从上面的分析了解到invokeSuspend是由ContinuationresumeWith而触发,那接下就看看
    协程体中的内容是如何执行的?即协程是如何挂起和恢复的?

    我们直接将文章开头的案例反编译。

    #启动协程

    image.png

    #挂起函数loadDataA

    image.png

    #挂起函数loadDataB

    image.png

    从启动协程launch开始,到执行resumeWith,触发invokeSuspend方法,可以看到在invokeSuspend中有存储了一个变量label,初始值为0。

    1. label=0,则进入case 0分支,

    case 0:
       ResultKt.throwOnFailure($result);
       var4 = MainActivity.this;
       this.label = 1;
       var10000 = var4.loadDataA(this);
       if (var10000 == var3) {
          return var3;
       }
       break;
    

    执行var4.loadDataA,在loadDataA中同样存在一个label,初始值同样为0,进入loadDataA中的case 0分支,执行delay()方法,同时将label置为1,并返回COROUTINE_SUSPENDED标志。

    # loadDataA
    case 0:
       ResultKt.throwOnFailure($result);
       ((<undefinedtype>)$continuation).label = 1;
       if (DelayKt.delay(3000L, (Continuation)$continuation) == var4) {
          return var4;
       }
       break;
    

    标识返回成功后,即var10000=COROUTINE_SUSPENDED,同时var3=COROUTINE_SUSPENDEDif (var10000 == var3)则直接return,跳出了launch操作,执行协程外的Log.d(TAG, "onCreate: end"),同时也将label置为1,这就是挂起的概念

    2. 挂起后,loadDataA中 Delay完3s将开始恢复协程,触发了loadDataA中的invokeSuspend方法。

    $continuation = new ContinuationImpl(var1) {
       // $FF: synthetic field
       Object result;
       int label;
    
       @Nullable
       public final Object invokeSuspend(@NotNull Object $result) {
          this.result = $result;
          this.label |= Integer.MIN_VALUE;
          return MainActivity.this.loadDataA(this);
       }
    };
    

    invokeSuspend中依然是执行MainActivity.this.loadDataA,只不过此时loadDataA中所保存的label的值=1,进入case 1分支,

    case 1:
       ResultKt.throwOnFailure($result);
       break;
    default:
       throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
    }
    
    Log.d("MainActivity", "loadDataA: ");
    return Boxing.boxInt(1);
    

    case1分支中只做了结果失败时异常的抛出,随后便执行了Log.d("MainActivity", "loadDataA: ");并返回值Int值。

    3. 在触发loadDataA中的invokeSuspend时,也触发了launch协程中的invokeSuspend,此时label=1,进入case1分支,

    case 1:
       ResultKt.throwOnFailure($result);
       var10000 = $result;
       break;
       ...
    default:
       throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
    }
    
    int num = ((Number)var10000).intValue();
    var4 = MainActivity.this;
    this.label = 2;
    if (var4.loadDataB(num, this) == var3) {
       return var3;
    } else {
       return Unit.INSTANCE;
    }
    

    同样,对结果做了失败的判断,同时将loadDataA所返回的Int值,赋值给了变量num,开始loadDataB的执行。

    4. loadDataB中的处理模式与loadDataA中一致,因为loadDataB之后无挂起函数,则在触发invokeSuspend时,返回的是Unit.INSTANCE,结束协程运行。

    总结一下,

    什么是挂起?

    简单来说就是判断标识为COROUTINE_SUSPENDED时,使用Continuation暂存当前协程的状态,而后直接return出当前协程体。

    什么是非阻塞?

    挂起是在接口的实现invokeSuspend方法中return出去的,而invokeSuspend之外的函数当然还是会继续执行呀。比如说在一个activity的onCreate的方法中,设置一个Button的onClickListener事件,紧跟其后初始化了一个viewModel,这时在onClick里return,那初始化viewModel难道因为return而不执行吗?当然没有影响。这就是挂起为什么是非阻塞式的。

    那恢复又是什么?

    协程挂起时,使用了Continuation暂存当前协程的状态,而挂起函数恢复时,会调用Continuation的resumeWith方法,继而触发invokeSuspend,根据Continuation所保存的label值,进入不同的分支,恢复之前挂起协程的状态,并且执行下一个状态。

    推荐阅读

    「Kotlin篇」多方位处理协程的异常

    「Kotlin篇」差异化分析,let,run,with,apply及also

    相关文章

      网友评论

        本文标题:「Kotlin篇」原来,协程是这么挂起的!

        本文链接:https://www.haomeiwen.com/subject/zwjjfrtx.html