美文网首页程序员
破解Kotlin协程创建调用的那些事

破解Kotlin协程创建调用的那些事

作者: Android阿南 | 来源:发表于2021-02-03 15:03 被阅读0次

    Kotlin协程从1.3正式版除出来也很久了,相比大家伙也比较熟悉了,从Android的AAC架构到后后端都可以见到它的身影,那么问题来了,用了那么久的协程体你知道它怎么创建的么。

    一天我问同事:你知不知道协程是怎么创建的?

    同事:我知道,是launchasync...

    我:?????

    我:那如果抛开官方给的框架,就用Kotlin语言提供的最基本的API创建呢?

    同事:????,那你创个毛。

    此时人送外号逼王的我,知道机会来了。于是乎就有了这个文章。

    预备知识

    首先,你需要知道协程是啥玩意,知道基本概念之后在来看(知道的话,当我没有说

    好戏开场了

    我们抛开官方的框架,只用语言最基本的API来洞悉Kotlin协程的内部设计。

    为了接下来方便带领大家分析,我把用到的概念和API给贴出来,方便分析。

    一般启动协程我们需要:

    1、一个suspend fun

    suspend fun foo() {}
    复制代码
    

    2、一个启动协程的API:startCoroutine,如果不想要立即执行,则可用:createCoroutine

    @SinceKotlin("1.3")
    @Suppress("UNCHECKED_CAST")
    public fun <T> (suspend () -> T).startCoroutine(
        completion: Continuation<T>
    ) {
        createCoroutineUnintercepted(completion).intercepted().resume(Unit)
    }
    复制代码
    
    @SinceKotlin("1.3")
    @Suppress("UNCHECKED_CAST")
    public fun <T> (suspend () -> T).createCoroutine(
        completion: Continuation<T>
    ): Continuation<Unit> =
        SafeContinuation(createCoroutineUnintercepted(completion).intercepted(), COROUTINE_SUSPENDED)
    
    复制代码
    

    上面创建协程和启动协程中我们看到有一个非常关键的方法:createCoroutineUnintercepted方法实现如下(具体功能稍后在说:

    @SinceKotlin("1.3")
    public actual fun <T> (suspend () -> T).createCoroutineUnintercepted(
        completion: Continuation<T>
    ): Continuation<Unit> {
        val probeCompletion = probeCoroutineCreated(completion)
        return if (this is BaseContinuationImpl)
            create(probeCompletion)
        else
            createCoroutineFromSuspendFunction(probeCompletion) {
                (this as Function1<Continuation<T>, Any?>).invoke(it)
            }
    }
    复制代码
    

    3、协程执行完后需要一个 completion 来回调,即: Continuation

    @SinceKotlin("1.3")
    public interface Continuation<in T> {
        /**
         * The context of the coroutine that corresponds to this continuation.
         */
        public val context: CoroutineContext
    
        /**
         * Resumes the execution of the corresponding coroutine passing a successful or failed [result] as the
         * return value of the last suspension point.
         */
        public fun resumeWith(result: Result<T>)
    }
    复制代码
    

    4、BaseContinuationImpl

    @SinceKotlin("1.3")
    internal abstract class BaseContinuationImpl(
        // This is `public val` so that it is private on JVM and cannot be modified by untrusted code, yet
        // it has a public getter (since even untrusted code is allowed to inspect its call stack).
        public val completion: Continuation<Any?>?
    ) : Continuation<Any?>, CoroutineStackFrame, Serializable {
        // This implementation is final. This fact is used to unroll resumeWith recursion.
        public final override fun resumeWith(result: Result<Any?>) {
            // This loop unrolls recursion in current.resumeWith(param) to make saner and shorter stack traces on resume
            var current = this
            var param = result
            while (true) {
                // Invoke "resume" debug probe on every resumed continuation, so that a debugging library infrastructure
                // can precisely track what part of suspended callstack was already resumed
                probeCoroutineResumed(current)
                with(current) {
                    val completion = completion!! // fail fast when trying to resume continuation without completion
                    val outcome: Result<Any?> =
                        try {
                            val outcome = invokeSuspend(param)
                            if (outcome === COROUTINE_SUSPENDED) return
                            Result.success(outcome)
                        } catch (exception: Throwable) {
                            Result.failure(exception)
                        }
                    releaseIntercepted() // this state machine instance is terminating
                    if (completion is BaseContinuationImpl) {
                        // unrolling recursion via loop
                        current = completion
                        param = outcome
                    } else {
                        // top-level completion reached -- invoke and return
                        completion.resumeWith(outcome)
                        return
                    }
                }
            }
        }
    }
    复制代码
    

    5、ContinuationImpl

    @SinceKotlin("1.3")
    // State machines for named suspend functions extend from this class
    internal abstract class ContinuationImpl(
        completion: Continuation<Any?>?,
        private val _context: CoroutineContext?
    ) : BaseContinuationImpl(completion) {
        constructor(completion: Continuation<Any?>?) : this(completion, completion?.context)
    
        public override val context: CoroutineContext
            get() = _context!!
    
        @Transient
        private var intercepted: Continuation<Any?>? = null
    
        public fun intercepted(): Continuation<Any?> =
            intercepted
                ?: (context[ContinuationInterceptor]?.interceptContinuation(this) ?: this)
                    .also { intercepted = it }
    
        protected override fun releaseIntercepted() {
            val intercepted = intercepted
            if (intercepted != null && intercepted !== this) {
                context[ContinuationInterceptor]!!.releaseInterceptedContinuation(intercepted)
            }
            this.intercepted = CompletedContinuation // just in case
        }
    }
    复制代码
    

    6、SuspendLambda

    @SinceKotlin("1.3")
    // Suspension lambdas inherit from this class
    internal abstract class SuspendLambda(
        public override val arity: Int,
        completion: Continuation<Any?>?
    ) : ContinuationImpl(completion), FunctionBase<Any?>, SuspendFunction {
        constructor(arity: Int) : this(arity, null)
    
        public override fun toString(): String =
            if (completion == null)
                Reflection.renderLambdaToString(this) // this is lambda
            else
                super.toString() // this is continuation
    }
    复制代码
    

    7、SuspendLambda的继承顺序为-> SuspendLambda:-> ContinuationImpl:-> BaseContinuationImpl

    8、suspend ()-> Unit 会被编译器编译成一个匿名类 extends SuspendLambda implements Function1<P, R> 并且实现 invokeSuspend 和 create 两个抽象方法。

    9、使用 suspend 修饰的函数编译器会自动添加一个Continuation。

    中场开始

    有了前面的理论,不上代码感觉总是怪怪的。

    首先我们定义一个launch用来启动我们的代码:

    代码有所改动,并不是前面所看到的startCoroutine里定义的那样,我做了个精简。 放心这样是完全可行的。

    fun <T> launch(block: suspend () -> T) {
        val coroutine = block.createCoroutineUnintercepted(object : Continuation<T> {
            override val context: CoroutineContext
                get() = EmptyCoroutineContext
    
            override fun resumeWith(result: Result<T>) {
                println("result=$result")
            }
        })
        coroutine.resume(Unit)
    }
    复制代码
    

    接着是我们的suspend函数(这里并没有真正的挂起,因为没有切线程:

    suspend fun loadImage() = suspendCoroutine<String> {
        it.resume("https://www.baidu.com/img/PCtm_d9c8750bed0b3c7d089fa7d55720d6cf.png")
    }
    复制代码
    

    看一下整体代码:

    import kotlin.coroutines.*
    import kotlin.coroutines.intrinsics.createCoroutineUnintercepted
    
    fun main() {
        launch {
            val image = loadImage()
            println("image=$image")
        }
    }
    
    fun <T> launch(block: suspend () -> T) {
        val coroutine = block.createCoroutineUnintercepted(object : Continuation<T> {
            override val context: CoroutineContext
                get() = EmptyCoroutineContext
    
            override fun resumeWith(result: Result<T>) {
                println("resumeWith=$result")
            }
        })
        coroutine.resume(Unit)
    }
    
    suspend fun loadImage() = suspendCoroutine<String> {
        it.resume("https://www.baidu.com/img/PCtm_d9c8750bed0b3c7d089fa7d55720d6cf.png")
    }
    复制代码
    

    OK,都准备完了,那就运行把。biu~ biu~ 看结果:

    [图片上传中...(image-1cca33-1612335597534-0)]

    结果很完美。不知道你们好不好奇协程怎么创建的,怎么调用到resumeWith的。有基础的同学可能知道编译器会给suspend fun 加一个Continuation,但具体是怎么调用这个 Continuation也不是很清楚。

    好吧,本着我不入地狱谁入地狱,有了前面的基础和问题,我们一步一步来,剖开协程内部创建调用流程的那些事。

    接下来我们把代码给反编译了,看看他真正庐山真面目:

    public final class KoroutineKt {
        public static final void main() {
            launch(new KoroutineKt$main$1(null));
        }
    
        static final class KoroutineKt$main$1 extends SuspendLambda implements Function1<Continuation<? super Unit>, Object> {
            int label;
    
            @Nullable
            public final Object invokeSuspend(@NotNull Object $result) {
                String image;
                Object object = IntrinsicsKt.getCOROUTINE_SUSPENDED();
                switch (this.label) {
                    case 0:
                        this.label = 1;
                        if (KoroutineKt.loadImage((Continuation<? super String>) this) == object)
                            return object;
                        image = (String) KoroutineKt.loadImage((Continuation<? super String>) this);
                        System.out.println(image);
                        return Unit.INSTANCE;
                    case 1:
                        System.out.println(image);
                        return Unit.INSTANCE;
                }
                throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
            }
    
            KoroutineKt$main$1(Continuation param1Continuation) {
                super(1, param1Continuation);
            }
    
            @NotNull
            public final Continuation<Unit> create(@NotNull Continuation completion) {
                Intrinsics.checkNotNullParameter(completion, "completion");
                return (Continuation<Unit>) new KoroutineKt$main$1(completion);
            }
    
            public final Object invoke(Object param1Object) {
                return ((KoroutineKt$main$1) create((Continuation) param1Object)).invokeSuspend(Unit.INSTANCE);
            }
        }
    
        public static final <T> void launch(@NotNull Function1 block) {
            Continuation coroutine = IntrinsicsKt.createCoroutineUnintercepted(block, new KoroutineKt$launch$coroutine$1());
            Continuation intercepted = IntrinsicsKt.intercepted(coroutine);
            Continuation continuation1 = intercepted;
            Unit unit = Unit.INSTANCE;
            Result.Companion companion = Result.Companion;
            continuation1.resumeWith(Result.constructor - impl(unit));
        }
    
        public static final class KoroutineKt$launch$coroutine$1 implements Continuation<T> {
            @NotNull
            public CoroutineContext getContext() {
                return (CoroutineContext) EmptyCoroutineContext.INSTANCE;
            }
    
            public void resumeWith(@NotNull Object result) {
                String str = "result=" + Result.toString - impl(result);
                System.out.println(str);
            }
        }
    
        @Nullable
        public static final Object loadImage(@NotNull Continuation completion) {
            SafeContinuation safeContinuation = new SafeContinuation(IntrinsicsKt.intercepted($completion));
            Continuation continuation = (Continuation) safeContinuation;
            String str = "https://www.baidu.com/img/PCtm_d9c8750bed0b3c7d089fa7d55720d6cf.png";
            Result.Companion companion = Result.Companion;
            continuation.resumeWith(Result.constructor - impl(str));
            if (safeContinuation.getOrThrow() == IntrinsicsKt.getCOROUTINE_SUSPENDED())
                DebugProbesKt.probeCoroutineSuspended(completion);
            return safeContinuation.getOrThrow();
        }
    }
    复制代码
    

    为了保证原汁原味,代码我只删除了部分调试代码。

    反编译后的代码,有两个匿名内部类。

    KoroutineKt$main$1 是我们的suspend () -> Unit,正如我前面所说的那样,他会被编译成一个匿名内部类 extends SuspendLambda implements Function1< P1,R>。

    KoroutineKt$launch$coroutine$1 这个是我们的 completion 就是那个object: Continuation的这个,不过我们不用太care它,知道他是我们suspend fun 最终会回调到这就行了。

    首先我们看launch()方法。

    public static final<T> void launch(@NotNull Function1 block){
        Continuation coroutine=IntrinsicsKt.createCoroutineUnintercepted(block,new KoroutineKt\$launch\$coroutine\$1());
        Continuation intercepted=IntrinsicsKt.intercepted(coroutine);
        Continuation continuation1=intercepted;
        Unit unit=Unit.INSTANCE;
        Result.Companion companion=Result.Companion;
        continuation1.resumeWith(Result.constructor-impl(unit));
    }
    复制代码
    

    launch()中最重要的一个方法就是createCoroutineUnintercepted(),他是我们创建协程最重要的一步。

    main()方法中,我们编译器自动给我们创建了一个 completion 为 null 的 KoroutineKt<math xmlns="http://www.w3.org/1998/Math/MathML"><semantics><mrow><mi>m</mi><mi>a</mi><mi>i</mi><mi>n</mi></mrow><annotation encoding="application/x-tex">main</annotation></semantics></math>main1, 进入launch()内部,首先执行createCoroutineUnintercepted,在这里创建我们的在completion,然后把我们的block和completion给传给createCoroutineUnintercepted方法, 前面我已经知道了 createCoroutineUnintercepted 会判断自己是不是 BaseContinuationImpl 的实现类,如果是,则调用create方法,否则这直接调用Function.invoke。

    是不是突然明白了什么? 我们看一下KoroutineKt<math xmlns="http://www.w3.org/1998/Math/MathML"><semantics><mrow><mi>m</mi><mi>a</mi><mi>i</mi><mi>n</mi></mrow><annotation encoding="application/x-tex">main</annotation></semantics></math>main1的create方法。

    @NotNull
    public final Continuation<Unit> create(@NotNull Continuation completion) {
        Intrinsics.checkNotNullParameter(completion, "completion");
        return (Continuation<Unit>) new KoroutineKt\$main\$1(completion);
    }
    复制代码
    

    create()方法把我们的completion给传入创建一个新的KoroutineKt<math xmlns="http://www.w3.org/1998/Math/MathML"><semantics><mrow><mi>m</mi><mi>a</mi><mi>i</mi><mi>n</mi></mrow><annotation encoding="application/x-tex">main</annotation></semantics></math>main1,并返回了一个Continuation<Unit>,至此我们的协程已经创建完成了。

    协程创建完了,是不是该启动了,于是continuation调用resumeWith(Unit)启动协程。

    这里的 continuation 可是继承自 BaseContinuationImpl 的哎,于是乎进入到了 BaseContinuationImpl 的 resumeWith。

    我们再看一下 BaseContinuationImpl 的 resumeWith 干了啥,上代码。

    public final override fun resumeWith(result: Result<Any?>) {
            // This loop unrolls recursion in current.resumeWith(param) to make saner and shorter stack traces on resume
            var current = this
            var param = result
            while (true) {
                // Invoke "resume" debug probe on every resumed continuation, so that a debugging library infrastructure
                // can precisely track what part of suspended callstack was already resumed
                probeCoroutineResumed(current)
                with(current) {
                    val completion = completion!! // fail fast when trying to resume continuation without completion
                    val outcome: Result<Any?> =
                        try {
                            val outcome = invokeSuspend(param)
                            if (outcome === COROUTINE_SUSPENDED) return
                            Result.success(outcome)
                        } catch (exception: Throwable) {
                            Result.failure(exception)
                        }
                    releaseIntercepted() // this state machine instance is terminating
                    if (completion is BaseContinuationImpl) {
                        // unrolling recursion via loop
                        current = completion
                        param = outcome
                    } else {
                        // top-level completion reached -- invoke and return
                        completion.resumeWith(outcome)
                        return
                    }
                }
            }
        }
    复制代码
    

    在 resumeWith 中,调用了invokeSuspend(),于是乎进入到我们写的block中。

    public final Object invokeSuspend(@NotNull Object \$result) {
                String image;
                Object object = IntrinsicsKt.getCOROUTINE_SUSPENDED();
                switch (this.label) {
                    case 0:
                        this.label = 1;
                        if (KoroutineKt.loadImage((Continuation<? super String>) this) == object)
                            return object;
                        image = (String) KoroutineKt.loadImage((Continuation<? super String>) this);
                        System.out.println(image);
                        return Unit.INSTANCE;
                    case 1:
                        System.out.println(image);
                        return Unit.INSTANCE;
                }
                throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
            }
    复制代码
    

    看到这,有没有觉得我们识破了庐山真面目了,为啥这样子说呢。

    我觉得已经很明显了啊~

    首先 我们知道 label 最开始为0 ,经过switch,进入了case 0,label=1,然后执行我们的loadImage,如果我们的loadImage是一个真正在网络请求图片方法(即:已经挂起,那么返回的一定是 COROUTINE_SUSPENDED。

    根据上面的代码,我们知道,如果返回了COROUTINE_SUSPENDED,直接return invokeSuspend,接着在 BaseContinuationImpl 的 resumeWith 中 判断invokeSuspend 返回是不是 COROUTINE_SUSPENDED, 如果是则return 退出 while 结束循环,如果不是则拿到数据,组成Result,调用我们的 completion 把结果返回,我们的协程也就结束了。

    到这,可能有人会问了,你这都return怎么做的挂起呢?

    还记得最开始我说的如果函数用了suspend修饰编译器自动会加一个 Continuation 么,我们的 loadImage 就是 suspend 修饰的编译器当然会给它生成一个 continuation 的参数啊,在switch case 0 中,我们已经把自己传给 loadImage 了啊,等到 loadImage 拿到数据后,调用我们的 continuation.resumeWith,别忘了我们的 continuation 可是 BaseContinuationImpl,调用resumeWith,之后肯定又会执行 invokeSuspend,进入 switch 分支,只不过,在执行loadImage之前,我们 label 已经为 1,switch 会进入 case 1 ,这个时候我们已经有image数据了,然后调用println输出数据。

    好了,到这里我们的协程的创建和调用就差不多就弄的清清楚楚了。

    完美落幕

    我们总结一下吧:

    协程的创建:

    1、首先new 一个 没有 completion 的 SuspendLambda,实现了 invokeSuspend 和 create两个抽象方法。

    2、在创建协程的时候,用我们的 completion 调用最开始创建的 SuspendLambda的create方法,new一个带有completion的SuspendLambda,返回一个Continuation<Unit>

    3、调用resumeWith(Unit)启动协程。

    协程的挂起与恢复:

    1、协程挂起标志 COROUTINE_SUSPENDED,退出 while 。

    2、结果返回后调用resumeWith继续协程。

    作者:泽被苍生
    链接:https://juejin.cn/post/6924902656339116045

    相关文章

      网友评论

        本文标题:破解Kotlin协程创建调用的那些事

        本文链接:https://www.haomeiwen.com/subject/kutmtltx.html