美文网首页
【Coroutine 源码】Coroutine 状态机实现源码分

【Coroutine 源码】Coroutine 状态机实现源码分

作者: allen218 | 来源:发表于2022-06-16 11:15 被阅读0次
1655393622741.jpg

1. 挂起函数

挂起函数是 Coroutine 实现的核心,当我们在调用挂起函数的时候,需要保证调用点符合下面两种情况:

  1. 另一个挂起函数中调用
  2. 协程体 block 中调用

如下所示:

// 代码段 1
fun main() {
    val coroutineScope = CoroutineScope(EmptyCoroutineContext)
    coroutineScope.launch {
        // 协程体 block 调用
        testSuspend1()
    }
}

suspend fun testSuspend1(): String {
    // 另一个挂起函数调用
    testSuspend2()
    return ""
}

suspend fun testSuspend2() {
    
}

fun normalMethod() {
    // 报错
    testSuspend2()
}

我们再来看看协程体 block 调用的情况:

public fun CoroutineScope.launch(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> Unit
): Job {
    ......
}

从源码中可以看出,其实协程体 block 也是一个挂起函数环境。总的来说:挂起函数只能在另一个挂起函数中被调用

1.1 为什么普通函数无法调用挂起函数?

要搞明白为什么普通函数无法调用挂起函数,我们需要通过挂起函数反编译的代码来看一下挂起函数具体是实现。

@Nullable
public static final Object testSuspend1(@NotNull Continuation<? super Unit> $completion) {
    Object object = SuspendDemo1Kt.testSuspend2($completion);
    if (object == IntrinsicsKt.getCOROUTINE_SUSPENDED()) {
        return object;
    }
    return Unit.INSTANCE;
}

从上面反编译的源码可以看出,挂起函数被编译成了一个带 Continuation 参数的静态方法。所以我们在普通方法中无法调用挂起函数的原因是:挂起函数会被编译成一个带 Continuation 参数的方法,普通函数在调用的时候并没有传入这个参数,无法调用

1.2 为什么挂起函数可以调用挂起函数?

从 1.1 我们看到普通方法无法调用挂起函数的原因是没有传入 Continuation 参数。而我们在挂起函数中同样也没有传入对应的参数,为什么不会报错呢?我们还是进入源码中看看挂起函数在调用挂起函数的时候,到底做了什么操作。

    public static final Object testSuspend1(@NotNull Continuation<? super String> var0) {
        ......
        switch ($continuation.label) {
            case 0: {
                ......
                v0 = SuspendDemo1Kt.testSuspend2($continuation);
                ......
            }
        }
        ......
    }

原来在调用另一个挂起函数的时候,编译器在编译的时候将当前挂起函数中的 continuation 参数直接传递给了其调用的挂起函数,所以不会报错。

testSuspend1 中的 continuation 又是谁传的呢?

CoroutineScope coroutineScope = CoroutineScopeKt.CoroutineScope(EmptyCoroutineContext.INSTANCE);
       ......
            @Nullable
            public final Object invokeSuspend(@NotNull Object object) {
            ......
                switch (this.label) {
                    case 0: {
                        ......
                        Object object3 = SuspendDemo1Kt.testSuspend1(this);
                        ......
                        return object2;
                    }
                    ......
                }
            }
            ......
    }

在代码段 1 我们是在协程 block 中调用的 testSuspend1,通过编译器处理后,将 block 本身传递给了 testSuspend1 方法。那这个 block 到底是什么呢?其实这里的 block 也是一个 Continuation 实现。在这里先卖个关子,后面内容会讲到。

总的来说:由于协程 block 也是一个实现了 Continuation 接口的 lambda 对象,所以,可以直接调用挂起函数

1.3 Continuation 接口

public interface Continuation<in T> {

    public val context: CoroutineContext

    public fun resumeWith(result: Result<T>)
}

Continuation 接口本质上就是一个带有泛型参数的 CallBack,泛型的类型为挂起函数返回值的类型。这里的 resumeWith 方法是协程恢复后,回调结果时调用的方法。

1.4 CPS 转换

挂起函数在编译过程中进行的代码转换被称为 CPS(Continuation-Passing-Style Transformation)。其主要操作是将挂起函数的同步操作转换成 Callback 的异步回调

在代码段 1 中,testSuspend1 挂起函数反编译前后的函数签名的对比来看,是将 suspend () -> String 通过 CPS 转换成了 (Continuation) -> Any?。

反编译前的返回值类型 String 到哪里去了?

前面我们看到 Continuation 接口需要指定泛型类型用于回调挂起函数的执行结果,这里的 String 就是这个具体泛型类型。

挂起函数的本质:通过 CPS 转换将同步调用的代码转换成 Callback 异步回调

2. 简单协程的创建

// 代码段 2
fun main() {
    block.startCoroutine(myContinuation)
}

val block: suspend () -> String = {
    println("block run")
    "block return"
}

val myContinuation = object : Continuation<String> {
    override val context: CoroutineContext
        get() = EmptyCoroutineContext

    override fun resumeWith(result: Result<String>) {
        println("myContinuation run, result: ${result.getOrNull()}")
    }
}

================
block run
myContinuation run, result: block return

代码段 2 中做了以下 3 件事情:

  1. 创建自定义的 Continuation 对象。这里需要实现 context 属性和 resumeWith 方法。其中 context 属性实现中,传入了一个 CoroutineContext 的空实现 EmptyCoroutineContext;resumeWith 方法会在 block 执行完成之后被调用。
  2. 创建一个 suspend () -> String 挂起函数 lambda 对象。这个方法中主要用来执行需要在协程体中执行的代码,执行完成后返回到 myContinuation 的 resumeWith 继续执行。
  3. 调用 kotlin 标准库中协程目录下的 startCoroutine 方法启动协程。

3. 简单协程的启动流程

这一部分主要是把第 2 章中的简单协程的创建过程从源码的角度把整体流程梳理清楚。主要回答以下两个问题:

  1. suspend () -> String 的这个 block 到底是什么
  2. startCoroutine 的启动流程

3.1 suspend () -> String 是什么

// 代码段 3
static final class SimpleCoroutineKt$block$1 extends SuspendLambda implements Function1<Continuation<? super String>, Object> {
    ......
  }
  
internal abstract class SuspendLambda(
    public override val arity: Int,
    completion: Continuation<Any?>?
) : ContinuationImpl(completion), FunctionBase<Any?>, SuspendFunction {
    ......
}

internal abstract class ContinuationImpl(
    completion: Continuation<Any?>?,
    private val _context: CoroutineContext?
) : BaseContinuationImpl(completion) {
    ......
}

internal abstract class BaseContinuationImpl(
    public val completion: Continuation<Any?>?
) : Continuation<Any?>, CoroutineStackFrame, Serializable {
    ......
}

代码段 2 中的 block 被编译后继承至 SuspendLambda,最终继承实现了 Continuation 接口,也就是说 block 本身是一个实现了 Continuation 接口的类。

[图片上传失败...(image-69485-1655348985303)]

3.2 startCoroutine 的启动流程

// 代码段 4
@SinceKotlin("1.3")
@Suppress("UNCHECKED_CAST")
public fun <T> (suspend () -> T).startCoroutine(
    completion: Continuation<T>
) {
     创建 Coroutine 对象                        拦截器     执行 Coroutine
            ↓                                     ↓            ↓
    createCoroutineUnintercepted(completion).intercepted().resume(Unit)
}

当 suspend () -> String 对象调用 startCoroutine 方法后,做了以下几个操作:

  1. 调用 createCoroutineUnintercepted 函数创建 Coroutine 对象;
  2. 对 1. 中创建的 Coroutine 对象进行拦截操作,也就是对 Coroutine 对象进行包装,在执行 Coroutine 前做一些如切换线程的处理。默认不传任何拦截器的情况下,不会对 Coroutine 进行拦截;
  3. 调用 resume 函数执行 Coroutine。

3.2.1 createCoroutineUnintercepted(completion)

// 代码段 5
@SinceKotlin("1.3")
public expect fun <T> (suspend () -> T).createCoroutineUnintercepted(
    completion: Continuation<T>
): Continuation<Unit>

createCoroutineUnintercepted 被 expect 修辞了,说明当前函数是一个跨平台的函数,需要在对应的平台实现中去找具体的实现,这里是 JVM 平台,文件名称为:IntrinsicsJvm.kt。

// 代码段 6
@SinceKotlin("1.3")
public actual fun <T> (suspend () -> T).createCoroutineUnintercepted(
    completion: Continuation<T>
): Continuation<Unit> {
    
    val probeCompletion = probeCoroutineCreated(completion)
    return if (this is BaseContinuationImpl)
        // 走这里的逻辑
        create(probeCompletion)
    else
        createCoroutineFromSuspendFunction(probeCompletion) {
            (this as Function1<Continuation<T>, Any?>).invoke(it)
        }
}

在代码段 3 中可以看到这个 (suspend () -> T) 对象继承了 BaseContinuationImpl 类,所以这里会走 create(probeCompletion) 方法。这个 create 方法对应的是 (suspend () -> T) 对象的方法,需要到反编译的代码里去找该方法。

// 代码段 7
static final class SimpleCoroutineKt$block$1 extends SuspendLambda implements Function1<Continuation<? super String>, Object> {
    int label;
    
    SimpleCoroutineKt$block$1(Continuation $completion) {
      super(1, $completion);
    }
    
    @Nullable
    public final Object invokeSuspend(@NotNull Object $result) {
      IntrinsicsKt.getCOROUTINE_SUSPENDED();
      switch (this.label) {
        case 0:
          ResultKt.throwOnFailure(SYNTHETIC_LOCAL_VARIABLE_1);
          System.out.println("block run");
          return "block return";
      } 
      throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
    }
    
    @NotNull
    public final Continuation<Unit> create(@NotNull Continuation<? super SimpleCoroutineKt$block$1> $completion) {
            // 创建 SuspendLambda 对象,并接收 myContinuation
      return (Continuation<Unit>)new SimpleCoroutineKt$block$1(c);
    }
    
    @Nullable
    public final Object invoke(@Nullable Continuation<?> p1) {
      return ((SimpleCoroutineKt$block$1)create(p1)).invokeSuspend(Unit.INSTANCE);
    }
  }

从上面反编译的代码来看,这里通过调用该 create 方法来创建了当前这个 Continuation 实例并返回给了 createCoroutineUnintercepted 方法。也就是 createCoroutineUnintercepted 方法最终会创建一个 SuspendLambda 类型的 Continuation 实例,这个 Continuation 实例中持有 myContinuation(该类型也是一个 Continuation 实例)。

3.2.2 intercepted()

// 代码段 8
@SinceKotlin("1.3")
public expect fun <T> Continuation<T>.intercepted(): Continuation<T>

这个 intercepted 同样也是一个跨平台的方法,同样的,我们也需要到 JVM 平台中找对应的实现。

// 代码段 9
@SinceKotlin("1.3")
public actual fun <T> Continuation<T>.intercepted(): Continuation<T> =
    // this 继承 ContinuationImpl
    (this as? ContinuationImpl)?.intercepted() ?: this

由代码段 3 可知,这里的 this 继承至 ContinuationImpl,所以会调用 ContinuationImpl 的 intercepted 方法。

// 代码段 10
@SinceKotlin("1.3")
// State machines for named suspend functions extend from this class
internal abstract class ContinuationImpl(
    completion: Continuation<Any?>?,
    private val _context: CoroutineContext?
) : BaseContinuationImpl(completion) {
    constructor(completion: Continuation<Any?>?) : this(completion, completion?.context)
    ......
    private var intercepted: Continuation<Any?>? = null

    public fun intercepted(): Continuation<Any?> =
        // 由于代码段 2 中的例子没有传拦截器,所以,这里返回 this
        intercepted
            ?: (context[ContinuationInterceptor]?.interceptContinuation(this) ?: this)
                .also { intercepted = it }

    protected override fun releaseIntercepted() {
        ......
    }
}

初次调用 intercepted 方法时,intercepted 属性值为空,走 (context[ContinuationInterceptor]?.interceptContinuation(this) 这段代码,而在代码段 3 中我们并没有传递任何拦截器对象,所以,这里直接将 createCoroutineUnintercepted 函数创建的 Continuation 对象赋值给了 intercepted 属性。后面再次 intercepted 方法时,由于已经被赋值了,会直接返回对应的值。

3.2.3 resume(Unit)

// 代码段 11
@SinceKotlin("1.3")
@InlineOnly
public inline fun <T> Continuation<T>.resume(value: T): Unit =
    resumeWith(Result.success(value))

这个 resume 方法会调用到 Continuation 的 resumeWith 方法,而这个 Continuation 是一个 SuspendLambda 类型的 Continuation,根据 3.1 中的继承关系,最终调用到了 BaseContinuationImpl 的 resumeWith 方法,该方法是协程状态机的核心,整个状态机的调度都是在该方法中完成的,我们会在下面 5. 协程状态机的运行机制中讲到。

到这里简单协程通过状态机机制就运行起来了,整个启动的时序图如下所示:

4. 几种常见的 Continuation

4.1 completion Continuation

3.x 中简单协程中调用 startCoroutine 方法启动协程时传入的 Continuation 值就是一个 completion Continuation,这个类型的 Continuation 主要用于当 block 中的代码执行完成返回后,调用其 resumeWith 恢复到协程体继续执行的 Continuation。

// 代码段 12
static final class SampleCoroutineKt$block$1 extends SuspendLambda implements Function1<Continuation<? super String>, Object> {
    int label;
    
    SampleCoroutineKt$block$1(Continuation $completion) {
      super(1, $completion);
    }
......

internal abstract class BaseContinuationImpl(
    public val completion: Continuation<Any?>?
) : Continuation<Any?>, CoroutineStackFrame, Serializable {
    public final override fun resumeWith(result: Result<Any?>) {
                while(true) {
                ......
                // 这里的 completion 直接实现的 Continuation 接口,未继承 BaseContinuationImpl
                if (completion is BaseContinuationImpl) {
                    // unrolling recursion via loop
                    current = completion
                    param = outcome
                } else {
                    // 执行该代码
                    completion.resumeWith(outcome)
                    return
                }
            }
        }
    }

我们在创建 block 所对应的 SuspendLambda 类型的 Continuation 对象时,传入了 completion Continuation,该 completion Continuation 被传给了父类,最终 BaseContinuationImpl 接收到了该 completion Continuation,上面说过 BaseContinuationImpl 就是协程状态机的核心流转类,而 completion Continuation 直接实现的 Continuation,未继承至 BaseContinuationImpl,所以,上面的状态机流转中,执行了 completion.resumeWith(outcome) 代码并直接返回来结束了状态机中的 while(true) 循环,也就是当 completion Continuation 被回调后,状态机的流转也就结束了

4.2 suspendLambda Continuation

suspend () -> String 类型的 block 就是一个 suspendLambda Continuation。该类型的 Continuation 主要用于从其它直接被 block 所调用的挂起函数恢复后回调当前 suspendLambda Continuation 中的 resumeWith 方法返回到 block 代码块中继续执行对应挂起函数下面的代码。

[图片上传失败...(image-2020c8-1655348985304)]

这里的 suspendMethod1 或者 suspendMethod2 挂起函数恢复后都会回调当前 suspendLambda Continuation 中的 resumeWith 方法,回调到 block 中继续执行 block 中的代码。

4.3 SafeContinuation

internal expect class SafeContinuation<in T> : Continuation<T> {
    internal constructor(delegate: Continuation<T>, initialResult: Any?)

    @PublishedApi
    internal constructor(delegate: Continuation<T>)

    @PublishedApi
    internal fun getOrThrow(): Any?

    override val context: CoroutineContext
    override fun resumeWith(result: Result<T>): Unit
}

默认使用 suspendCoroutineUninterceptedOrReturn 时需要开发者自己来实现挂起和恢复的操作,对开发人员的要求较高,一不小心就容易出现无法正常挂起,或者重复调用 resumeWith 进行恢复的情况,而导致执行流程与预期不符。

SafeContinuation 主要是 suspendCoroutine 在对 suspendCoroutineUninterceptedOrReturn 进行封装在内部实现安全挂起时所使用的 Continuation。在上面的代码可以看出,其主要是使用代理模式,在这个代理 Confituation 的 resumeWith 方法中实现安全的挂起。这个类的具体实现会在以后的文章中再介绍,这里先提一下。

suspendCoroutine 中 Continuation 的层级结构为:

[图片上传失败...(image-423efe-1655348985304)]

4.4 DispatchedContinuation

internal class DispatchedContinuation<in T>(
    @JvmField val dispatcher: CoroutineDispatcher,
    @JvmField val continuation: Continuation<T>
) : DispatchedTask<T>(MODE_UNINITIALIZED), CoroutineStackFrame, Continuation<T> by continuation {
    ......
        override fun resumeWith(result: Result<T>) {
        ......
        if (dispatcher.isDispatchNeeded(context)) {
            ......
            // 执行调度逻辑
            dispatcher.dispatch(context, this)
        } else {
            executeUnconfined(state, MODE_ATOMIC) {
                withCoroutineContext(this.context, countOrElement) {
                    // 直接在当前线程执行
                    continuation.resumeWith(result)
                }
            }
        }
    }
}

DispatchedContinuation 是调度器中实现线程调度非常重要的 Continuation 代理类,通过持有 continuation 类并实现 resumeWith 方法来根据当前协程中是否指定了调度器来进行相应的逻辑操作。

一个指定了调度器的 Continuation 的层级关系如图所示:

[图片上传失败...(image-ab9f55-1655348985304)]

5. 协程状态机的运行机制

// 代码段 13
fun main() {
    block.startCoroutine(myContinuation)

    Thread.sleep(5000)
}

val block: suspend () -> String = {
    println("${Thread.currentThread().name} - block run")
    val test1 = suspendMethod1()
    val test2 = suspendMethod2()

    println("${Thread.currentThread().name} - test1 + test2: ${test1 + test2}")
    "block return"
}

val myContinuation = object : Continuation<String> {
    override val context: CoroutineContext
        get() = EmptyCoroutineContext

    override fun resumeWith(result: Result<String>) {
        println("${Thread.currentThread().name} - myContinuation run, result: ${result.getOrNull()}")
    }
}

suspend fun suspendMethod1(): String {
    return "suspendMethod1"
}

suspend fun suspendMethod2(): String {
    delay(1000)
    return "suspendMethod2"
}

main - block run
kotlinx.coroutines.DefaultExecutor - test1 + test2: suspendMethod1suspendMethod2
kotlinx.coroutines.DefaultExecutor - myContinuation run, result: block return

上面代码是在代码段 2 中添加了调用挂起函数的部分代码,这一节我们主要的任务就是从源码的角度来分析上面代码的执行过程,也就是在协程中调用了两个挂起函数的状态机是如何实现的。

5.1 挂起函数与状态机的关系

代码段 13 中一共有 3 个挂起函数,其中包括两个 suspend 修辞的普通函数和 1 个 suspend 修辞的高阶函数。这三个挂起函数从源码中可以看出,对应着三个状态机,也就是每一个挂起函数都对应一个自己的状态机,用于流转当前挂起函数的挂起和恢复逻辑。

public static final Object suspendMethod1(@NotNull Continuation<? super String> var0) {
        ......
        // suspendMethod1 状态机的逻辑
        switch ($continuation.label) {
            case 0: {
                ResultKt.throwOnFailure((Object)$result);
                $continuation.label = 1;
                v0 = DelayKt.delay((long)1000L, (Continuation)$continuation);
                v1 = v0;
                if (v0 != var3_3) return "suspendMethod1";
                return var3_3;
            }
            case 1: {
                ResultKt.throwOnFailure((Object)$result);
                v1 = $result;
                return "suspendMethod1";
            }
        }
        throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
    }

public static final Object suspendMethod2(@NotNull Continuation<? super String> var0) {
        ......
        // suspendMethod2 状态机逻辑
        switch ($continuation.label) {
            case 0: {
                ResultKt.throwOnFailure((Object)$result);
                $continuation.label = 1;
                v0 = DelayKt.delay((long)1000L, (Continuation)$continuation);
                v1 = v0;
                if (v0 != var3_3) return "suspendMethod2";
                return var3_3;
            }
            case 1: {
                ResultKt.throwOnFailure((Object)$result);
                v1 = $result;
                return "suspendMethod2";
            }
        }
        throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
    }
    
private static final Function1<Continuation<? super String>, Object> block = (Function1)new Function1<Continuation<? super String>, Object>(null){
        Object L$0;
        int label;
        
        @Nullable
        public final Object invokeSuspend(@NotNull Object var1_1) {
            var4_2 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
            // suspend lambda 状态机的逻辑
            switch (this.label) {
                case 0: {
                    ResultKt.throwOnFailure((Object)var1_1);
                    System.out.println((Object)(Thread.currentThread().getName() + " - block run"));
                    this.label = 1;
                    v0 = SimpleCoroutineKt.suspendMethod1((Continuation<? super String>)((Continuation)this));
                    if (v0 == var4_2) {
                        return var4_2;
                    }
                    ** GOTO lbl14
                }
                case 1: {
                    ResultKt.throwOnFailure((Object)$result);
                    v0 = $result;
lbl14:
                    // 2 sources

                    test1 = (String)v0;
                    this.L$0 = test1;
                    this.label = 2;
                    v1 = SimpleCoroutineKt.suspendMethod2((Continuation<? super String>)((Continuation)this));
                    if (v1 == var4_2) {
                        return var4_2;
                    }
                    ** GOTO lbl25
                }
                case 2: {
                    test1 = (String)this.L$0;
                    ResultKt.throwOnFailure((Object)$result);
                    v1 = $result;
lbl25:
                    // 2 sources

                    test2 = (String)v1;
                    System.out.println((Object)(Thread.currentThread().getName() + " - test1 + test2: " + test1 + test2));
                    return "block return";
                }
            }
            throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
        }
        ......
    };

此外,suspend lambda 中使用的状态机嵌套了 suspendMethod1 和 suspendMethod2 两个状态机的逻辑,整体流程由 suspend lambda 中的状态机来控制其流转。

当使用 suspend 修辞的函数中,没有调用任何其它的挂起函数时,称为伪挂起函数,通过反编译的代码可以看到,其内部直接返回了,并没有实现状态机相关的逻辑(老版本的 kotlin 编译器存在状态机相关代码)。

suspend fun suspendMethod1(): String {
    return "suspendMethod1"
}

// 反编译后代码
public static final Object suspendMethod1(@NotNull Continuation<? super String> $completion) {
    return "suspendMethod1";
}

5.2 状态机的流转

3.2 中分析了 startCoroutine 的启动过程,这里接着上面的分析,从 resume(Unit) 开始,通过跟踪源码的方式,说清楚整个状态机是如何流转的。

public fun <T> (suspend () -> T).startCoroutine(
    completion: Continuation<T>
) {
    createCoroutineUnintercepted(completion).intercepted().resume(Unit)
}

先回顾一下 3.2 中的分析结果:

  1. createCoroutineUnintercepted(completion) 创建了一个 suspendLambda 类型的 Continuation,并持有了 myContinuation;
  2. intercepted() 由于代码段 2 中没有配置拦截器,直接返回了前面创建的 suspendLambda 的 Continuation;
  3. resume(Unit) 启动协程并进行状态机的流转。
public inline fun <T> Continuation<T>.resume(value: T): Unit =
    resumeWith(Result.success(value))

resume(Unit) 调用了 Continuation 的 resumeWith 方法,由于当前的 Continuation 类型是一个 SuspendLambda 的 Continuation,会调用到 BaseContinuationImpl 类中的 resumeWith 方法。

// 代码段 14
internal abstract class BaseContinuationImpl(
    // myContinuation 对象
    public val completion: Continuation<Any?>?
) : Continuation<Any?>, CoroutineStackFrame, Serializable {
    public final override fun resumeWith(result: Result<Any?>) {
        var current = this
        var param = result
        while (true) {
            probeCoroutineResumed(current)
            with(current) {
                val completion = completion!! // fail fast when trying to resume continuation without completion
                val outcome: Result<Any?> =
                    try {
                        // 调用点 1(状态机开始运转的地方)
                        val outcome = invokeSuspend(param)
                        // 调用点 2
                        if (outcome === COROUTINE_SUSPENDED) return
                        Result.success(outcome)
                    } catch (exception: Throwable) {
                        Result.failure(exception)
                    }
                releaseIntercepted() // this state machine instance is terminating
                // 调用点 3
                if (completion is BaseContinuationImpl) {
                    // unrolling recursion via loop
                    current = completion
                    param = outcome
                } else {
                    // 调用点 4
                    completion.resumeWith(outcome)
                    return
                }
            }
        }
    }

    protected abstract fun invokeSuspend(result: Result<Any?>): Any?

    ......
}

代码执行到调用点 1 的 invokeSuspend(param) 方法后,会调用到 suspendLambda Continuation 中的对应实现。

// 代码段 15 suspendLambda Continuation 反编译的 invoke 方法
public final Object invokeSuspend(@NotNull Object var1_1) {
            var4_2 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
            switch (this.label) {
                case 0: {
                    // 检查是否有异常,如果有异常的话,直接抛出异常并结束当前协程状态机的运行
                    ResultKt.throwOnFailure((Object)var1_1);
                    System.out.println((Object)(Thread.currentThread().getName() + " - block run"));
                    this.label = 1;
                    调用点 1
                    v0 = SimpleCoroutineKt.suspendMethod1((Continuation<? super String>)((Continuation)this));
                    // v0 == COROUTINE_SUSPENDED 
                    if (v0 == var4_2) {
                        // 执行此逻辑
                        return var4_2;
                    }
                    ** GOTO lbl14
                }
                case 1: {
                    ResultKt.throwOnFailure((Object)$result);
                    v0 = $result;
lbl14:
                    // 2 sources

                    test1 = (String)v0;
                    this.L$0 = test1;
                    this.label = 2;
                    v1 = SimpleCoroutineKt.suspendMethod2((Continuation<? super String>)((Continuation)this));
                    if (v1 == var4_2) {
                        return var4_2;
                    }
                    ** GOTO lbl25
                }
                case 2: {
                    test1 = (String)this.L$0;
                    ResultKt.throwOnFailure((Object)$result);
                    v1 = $result;
lbl25:
                    // 2 sources

                    test2 = (String)v1;
                    System.out.println((Object)(Thread.currentThread().getName() + " - test1 + test2: " + test1 + test2));
                    return "block return";
                }
            }
            throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
        }

刚执行 switch (this.label) 判断,label == 0,会执行 SimpleCoroutineKt.suspendMethod1((Continuation<? super String>)((Continuation)this)) 方法,并将 suspendlambda Continuation 传递到了 suspendMethod1 方法。

// 代码段 16
@Nullable
public static final Object suspendMethod1(@NotNull Continuation<? super String> var0) {
    if (!(var0 instanceof suspendMethod1.1)) ** GOTO lbl-1000
    var2_1 = var0;
    ......

    {
        // 创建 ContinuationImpl 对象
        $continuation = new ContinuationImpl(var0){
            /* synthetic */ Object result;
            int label;

            @Nullable
            public final Object invokeSuspend(@NotNull Object $result) {
                this.result = $result;
                this.label |= Integer.MIN_VALUE;
                return SimpleCoroutineKt.suspendMethod1((Continuation<? super String>)((Continuation)this));
            }
        };
    }
    // 调用点 1
    $result = $continuation.result;
    var3_3 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
    switch ($continuation.label) {
        case 0: {
            // 检查是否发现异常,如发现异常则抛出异常
            ResultKt.throwOnFailure((Object)$result);
            $continuation.label = 1;
            // 执行 delay 逻辑并传入当前状态机中创建的 continuation 对象,
            // 用于 delay 恢复的时候回调到当前状态机继续执行
            v0 = DelayKt.delay((long)1000L, (Continuation)$continuation);
            v1 = v0;
            // v0 = COROUTINE_SUSPENDED 与 var3_3 相等
            if (v0 != var3_3) return "suspendMethod1";
            // 逻辑走这里
            return var3_3;
        }
        case 1: {
            ResultKt.throwOnFailure((Object)$result);
            v1 = $result;
            return "suspendMethod1";
        }
    }
    throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
}

suspendMethod1 中状态机的流转逻辑如下:

  1. 创建 ContinuationImpl 实例,用于 delay(1000) 函数恢复后调用其 invokeSuspend 函数回调到当前状态机继续执行下面的流程。
  2. 由于刚开始进入 label == 0,会执行 case 0 的逻辑,case 0 的主要逻辑如下:
    2.1 检查是否产生了异常,如果发现有异常则直接抛出当前异常,结束协程状态机的执行;
    2.2 调用 delay(1000) 代码,并将当前 suspendMethod1 中创建的 Continuation 对象传递过去,用于 delay(1000) 挂起函数恢复后的回调。
    2.3 由于 delay(1000) 在调用时会先返回 COROUTINE_SUSPENDED 挂起标识,不会走 if (v0 != var3_3) 的逻辑,这里直接走 return COROUTINE_SUSPENDED 的逻辑。
  3. case 0 中的返回的 COROUTINE_SUSPENDED 值会返回到代码段 15 中的调用点 1 的位置,由于 if (v0 == var4_2) 为 true,继续返回 COROUTINE_SUSPENDED 到代码段 14 中的调用点 1 的位置。
  4. 代码段 14 中 if (outcome === COROUTINE_SUSPENDED) 为 true,直接返回跳出 while 循环,完成当前协程的挂起操作。
  5. delay(1000) 延迟时间到达后,调用 suspendMethod1 中 ContinuationImpl 实例的 resumeWith() 恢复挂起状态并调用 suspendMethod1 内部创建的 invokeSuspend, 恢复到 suspendMethod1 中继续执行 suspendMethod1 中下面的逻辑。
public final Object invokeSuspend(@NotNull Object $result) {
    this.result = $result;
    this.label |= Integer.MIN_VALUE;
    // 恢复到 suspendMethod1 中继续执行下面的代码
    return SimpleCoroutineKt.suspendMethod1((Continuation<? super String>)((Continuation)this));
}
  1. 由于从 delay(1000) 恢复的时候,SimpleCoroutineKt.suspendMethod1((Continuation<? super String>)((Continuation)this)) 这里传递的 Continuation 是 suspendMethod1 之前创建的 ContinuationImpl 实例,代码段 16 调用点 1 中的值为 delay(1000) 执行完成后返回的 Unit 值。
  2. 此时 label == 1,走 case 1 中的代码,直接将 “suspendMethod1” 字符串值返回到了 suspendLambda Continuation 的 invokeSuspend 方法中继续执行相应的代码(这里可能是代码反编译代码的问题,真正的流程应该是 suspendMethod 挂起恢复后,执行完方法中的逻辑后,调用之前传入的 suspendLambda Continuation 对象的 resumeWith 方法中的 invokeSuspend 方法,返回到 suspendLambda Continuation 中继续执行 suspendMethod2 的代码)。
  3. suspend1Method 执行完成后返回到 suspendLambda 实例的 invokeSuspend 方法继续执行 case 1 中的逻辑,这里主要处理 suspendMethod2 的调用,与 suspendMethod1 完全一样,这里不再赘述。这里主要从 suspendMethod2 挂起恢复后的代码说起。
  4. suspendMethod2 执行完成后通过调用 suspendLabmda Continuation 中的 resumeWith 方法中的 invokeSuspend 方法,回到了 suspendLambda Continuation 中继续执行 case 2 的逻辑并返回 “block return” 到代码段 14 中调用点 1 继续执行下面的代码。
  5. 由于 outcome == “block return” 非 COROUTINE_SUSPENDED,所以会执行下面的判断:
if (completion is BaseContinuationImpl) {
    current = completion
    param = outcome
} else {
    // 走这里的逻辑
    completion.resumeWith(outcome)
    return
}

这里的 completion 为 myContinuation,其直接实现了 Continuation 接口,非 BaseContinuationImpl 类型,这里会调用 myContinuation 中的 resumeWith 函数并打印 “myContinuation run, result: block return”,打印后通过返回结束状态机的执行。

到这里整个协程状态机的执行流程就走完了,下面我通过图解的方式再走一遍整个状态机的流转过程,加深理解。

代码调用图解:

BaseContinuationImpl 流转.png
suspendLambda Continuation 流转.png
suspendMethod1 流转.png
suspendMethod2 流转.png

代码执行时序图:

Coroutine State Machine(1).png

6. 总结

  1. 挂起函数是 Coroutine 实现的核心,挂起函数只能在挂起函数或者协程体内部被调用,而协程体内部也是一个 suspend 修辞的 block lambda,所以,挂起函数只能被挂起函数调用
  2. 普通函数无法直接调用挂起函数是因为挂起函数在经过 kotlin 编译器编译后,会接收一个实现了 Continuation 接口的实例对象,在普通函数中并没有传递这个参数,所以,默认无法被调用。但在 JAVA 代码中可以通过实现 Continuation 接口传入这个对象。suspend 函数可以调用 suspend 函数的原因是 Kotlin 编译器会将当前 suspend 接收到的 continuation 实例对象传递给其调用的其它 suspend 函数。
  3. Kotlin 编译器在编译 suspend 函数所进行的代码转换过程被称为 CPS(Continuation-Passing-Style Transformation),其主要操作是将挂起函数同步操作的代码转换成 Callback 异步调用,也就是挂起函数的本质是通过 CPS 转换将同步调用的代码转换成 Callback 异步回调
  4. suspend () -> String 会被 Kotlin 编译器编译成继承自 SuspendLambda 抽象类的 Continuation 类,其最终继续自 BaseContinuationImpl 抽象类。
  5. BaseContinuationImpl 是整个状态机流转的核心,所有挂起函数内部的挂起和恢复都是通过该类中的 resumeWith() 函数完成的。
  6. 协程中常见的几种 Continuation 实例包括 completion Continuation、suspendLambda Continuation、SafeContinuation 和 DispatchContinuation(或者自定义的拦截器 Continuation)。其中 completion Continuation 主要负责当协程体 block 中的代码执行完成返回后,调用其 resumeWith 继续执行协程级别的 Continuation;suspendLambda Continuation 主要负责从其它直接被 block 所调用的挂起函数恢复后回调当前 suspendLambda Continuation 中的 resumeWith 方法返回到 block 代码块中继续执行对应挂起函数下面的代码;SafeContinuation 主要是 suspendCoroutine 在对 suspendCoroutineUninterceptedOrReturn 进行封装在内部实现安全挂起时所使用的 Continuation;DispatchContinuation(或者自定义的拦截器 Continuation)是调度器中实现线程调度非常重要的 Continuation 代理。
  7. startCoroutine 启动一个协程的主要流程为:创建一个接收 Continuation 实例的 SuspendLambda 类型的 Continuation;然后调用 “intercepted()” 函数,如果协程中指定了拦截器,会再次创建一个接收 SuspendLambda Continuation 的 DispatchedContinuation 的 Continuation 实例,如果没有指定,则返回 前一步创建的 SuspendLambda Continuation 实例;最后调用 resume(Unit) 方法启动协程状态机的流转。
  8. 挂起函数与状态机的关系是每一个挂起函数(非伪挂起函数)内部都有一个状态机,挂起函数的状态机流转由 SuspendLambda Continuation 中的状态机控制,当 SuspendLambda Continuation 内部状态机执行完成会返回到协程级别的状态机(myContinuation)去执行相应的逻辑,并最终完成当前协程的执行。

相关文章

网友评论

      本文标题:【Coroutine 源码】Coroutine 状态机实现源码分

      本文链接:https://www.haomeiwen.com/subject/ucxqvrtx.html