美文网首页Kotlin编程
Kotlin Coroutine suspend 原理解析

Kotlin Coroutine suspend 原理解析

作者: wo883721 | 来源:发表于2021-07-21 13:30 被阅读0次

    一. 回调地狱

    1.1 同步操作

    假如我们有这样一个需求:

    fun childFun1(): Int {
        return 10
    }
    
    fun childFun2(): Int {
        return 20
    }
    
    fun childFun3(num1: Int, num2: Int): Int {
        return num1 + num2
    }
    
    fun parentFun() : Int {
        val num1 = childFun1()
        val num2 = childFun2()
        val sum = childFun3(num1, num2)
        return sum
    }
    

    即: 从多个操作中(childFun1, childFun2) 获取值,然后再对这些值进行处理(childFun3),程序逻辑非常直观易懂。

    例如,先上传用户选择的图片到服务端储存,获取服务端返回的图片对应的地址,然后这些地址设置到对应位置。

    1.2 异步操作

    如果这些操作都是耗时操作,为了不阻塞线程,需要将这些耗时操作放到其他线程中,即

    val executor: ExecutorService = Executors.newFixedThreadPool(2)
    
    fun childFun1(callback: (Int) -> Unit): Unit {
        executor.execute {
            Thread.sleep(1000)
            callback(10)
        }
    }
    
    fun childFun2(callback: (Int) -> Unit): Unit {
        executor.execute {
            Thread.sleep(2000)
            callback(20)
        }
    }
    
    fun childFun3(num1: Int, num2: Int, callback: (Int) -> Unit): Unit {
        executor.execute {
            Thread.sleep(500)
            callback(num1 + num2)
        }
    }
    
    fun parentFun(callback: (Int) -> Unit) : Unit {
        childFun1(fun(num1) {
            childFun2(fun(num2) {
                childFun3(num1, num2, callback)
            })
        })
    }
    

    因为是异步操作,结果值不能直接返回,只能通过 callback 方式异步回传,所以当异步操作很多的时候,整个回调链就很长了,让代码逻辑显得不清晰。

    1.3 协程

    suspend fun childFun1(): Int {
        delay(1000)
        return 10
    }
    
    suspend fun childFun2(): Int {
        delay(2000)
        return 20
    }
    
    suspend fun childFun3(num1: Int, num2: Int): Int {
        delay(500)
        return num1 + num2
    }
    
    suspend fun parentFun() : Int {
        val num1 = childFun1()
        val num2 = childFun2()
        val sum = childFun3(num1, num2)
        return sum
    }
    

    我们可以看到和同步方式操作一模一样,只不过方法上多了 suspend 关键字而已。

    注: 这里的 delay 方法,不会像 Thread.sleep 阻塞当前线程。

    二. suspend 原理

    上面说过,suspend 不会阻塞当前线程,那么它怎么将异步操作的数据,同步传递回来呢?答案其实也是回调,只不过隐藏的很深,我们慢慢分析。

    suspend fun childFun1(): Int {
        Thread.sleep(1000)
        return 10
    }
    
    suspend fun childFun2(): Int {
        Thread.sleep(2000)
        return 20
    }
    
    suspend fun childFun3(num1: Int, num2: Int): Int {
        Thread.sleep(500)
        return num1 + num2
    }
    
    suspend fun parentFun() : Int {
        val num1 = childFun1()
        val num2 = childFun2()
        val sum = childFun3(num1, num2)
        return sum
    }
    

    这里将 delay 换成 Thread.sleep ,先看 suspend 反编译的 java 代码

    public final class CoroutineKt {
       @Nullable
       public static final Object childFun1(@NotNull Continuation $completion) {
          Thread.sleep(1000L);
          return Boxing.boxInt(10);
       }
    
       @Nullable
       public static final Object childFun2(@NotNull Continuation $completion) {
          Thread.sleep(2000L);
          return Boxing.boxInt(20);
       }
    
       @Nullable
       public static final Object childFun3(int num1, int num2, @NotNull Continuation $completion) {
          Thread.sleep(500L);
          return Boxing.boxInt(num1 + num2);
       }
    
       @Nullable
       public static final Object parentFun(@NotNull Continuation $completion) {
          Object $continuation;
          label37: {
             if ($completion instanceof <undefinedtype>) {
                $continuation = (<undefinedtype>)$completion;
                if ((((<undefinedtype>)$continuation).label & Integer.MIN_VALUE) != 0) {
                   ((<undefinedtype>)$continuation).label -= Integer.MIN_VALUE;
                   break label37;
                }
             }
    
             $continuation = new ContinuationImpl($completion) {
                // $FF: synthetic field
                Object result;
                int label;
                int I$0;
                int I$1;
    
                @Nullable
                public final Object invokeSuspend(@NotNull Object $result) {
                   this.result = $result;
                   this.label |= Integer.MIN_VALUE;
                   return CoroutineKt.parentFun(this);
                }
             };
          }
    
          Object var10000;
          label31: {
             int num1;
             int num2;
             Object var6;
             label30: {
                Object $result = ((<undefinedtype>)$continuation).result;
                var6 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
                switch(((<undefinedtype>)$continuation).label) {
                case 0:
                   ResultKt.throwOnFailure($result);
                   ((<undefinedtype>)$continuation).label = 1;
                   var10000 = childFun1((Continuation)$continuation);
                   if (var10000 == var6) {
                      return var6;
                   }
                   break;
                case 1:
                   ResultKt.throwOnFailure($result);
                   var10000 = $result;
                   break;
                case 2:
                   num1 = ((<undefinedtype>)$continuation).I$0;
                   ResultKt.throwOnFailure($result);
                   var10000 = $result;
                   break label30;
                case 3:
                   num2 = ((<undefinedtype>)$continuation).I$1;
                   num1 = ((<undefinedtype>)$continuation).I$0;
                   ResultKt.throwOnFailure($result);
                   var10000 = $result;
                   break label31;
                default:
                   throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
                }
    
                num1 = ((Number)var10000).intValue();
                ((<undefinedtype>)$continuation).I$0 = num1;
                ((<undefinedtype>)$continuation).label = 2;
                var10000 = childFun2((Continuation)$continuation);
                if (var10000 == var6) {
                   return var6;
                }
             }
    
             num2 = ((Number)var10000).intValue();
             ((<undefinedtype>)$continuation).I$0 = num1;
             ((<undefinedtype>)$continuation).I$1 = num2;
             ((<undefinedtype>)$continuation).label = 3;
             var10000 = childFun3(num1, num2, (Continuation)$continuation);
             if (var10000 == var6) {
                return var6;
             }
          }
    
          int sum = ((Number)var10000).intValue();
          return Boxing.boxInt(sum);
       }
    }
    

    2.1 Continuation

    我们注意到 suspend 修饰的方法,转成 java 方法时,会在方法最后面添加上 Continuation 类型的参数:

    suspend fun childFun3(num1: Int, num2: Int): Int {
        Thread.sleep(500)
        return num1 + num2
    }
    //  变成了
    @Nullable
    public static final Object childFun3(int num1, int num2, @NotNull Continuation $completion) {
          Thread.sleep(500L);
          return Boxing.boxInt(num1 + num2);
    }
    

    这个 Continuation 实例非常重要,它是协程能够实现异步回调的关键对象。

    /**
     * Interface representing a continuation after a suspension point that returns a value of type `T`.
     */
    @SinceKotlin("1.3")
    public interface Continuation<in T> {
        /**
         * The context of the coroutine that corresponds to this continuation.
         */
        public val context: CoroutineContext
    
        /**
         * Resumes the execution of the corresponding coroutine passing a successful or failed [result] as the
         * return value of the last suspension point.
         */
        public fun resumeWith(result: Result<T>)
    }
    
    1. context : 协程的上下文对象,里面储存着协程上下文环境信息,比如父协程,线程协调器等
    2. resumeWith 方法: 唤醒协程继续执行的方法

    注: 这里的唤醒,并不是说协程被线程阻塞了

    2.2 协程体

    观察 childFun1childFun2childFun3 方法,除了参数上多了一个 $completion 参数,并没有其他变化,那是因为这三个方法中,没有调用其他 suspend 方法,所以和普通函数没有多大区别。

    suspend fun parentFun() : Int {
        val num1 = childFun1()
        val num2 = childFun2()
        val sum = childFun3(num1, num2)
        return sum
    }
    
    // 转变成了
    
     @Nullable
       public static final Object parentFun(@NotNull Continuation $completion) {
          Object $continuation;
          label37: {
             if ($completion instanceof <undefinedtype>) {
                $continuation = (<undefinedtype>)$completion;
                if ((((<undefinedtype>)$continuation).label & Integer.MIN_VALUE) != 0) {
                   ((<undefinedtype>)$continuation).label -= Integer.MIN_VALUE;
                   break label37;
                }
             }
    
             $continuation = new ContinuationImpl($completion) {
                // $FF: synthetic field
                Object result;
                int label;
                int I$0;
                int I$1;
    
                @Nullable
                public final Object invokeSuspend(@NotNull Object $result) {
                   this.result = $result;
                   this.label |= Integer.MIN_VALUE;
                   return CoroutineKt.parentFun(this);
                }
             };
          }
    
          Object var10000;
          label31: {
             int num1;
             int num2;
             Object var6;
             label30: {
                Object $result = ((<undefinedtype>)$continuation).result;
                var6 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
                switch(((<undefinedtype>)$continuation).label) {
                case 0:
                   ResultKt.throwOnFailure($result);
                   ((<undefinedtype>)$continuation).label = 1;
                   var10000 = childFun1((Continuation)$continuation);
                   if (var10000 == var6) {
                      return var6;
                   }
                   break;
                case 1:
                   ResultKt.throwOnFailure($result);
                   var10000 = $result;
                   break;
                case 2:
                   num1 = ((<undefinedtype>)$continuation).I$0;
                   ResultKt.throwOnFailure($result);
                   var10000 = $result;
                   break label30;
                case 3:
                   num2 = ((<undefinedtype>)$continuation).I$1;
                   num1 = ((<undefinedtype>)$continuation).I$0;
                   ResultKt.throwOnFailure($result);
                   var10000 = $result;
                   break label31;
                default:
                   throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
                }
    
                num1 = ((Number)var10000).intValue();
                ((<undefinedtype>)$continuation).I$0 = num1;
                ((<undefinedtype>)$continuation).label = 2;
                var10000 = childFun2((Continuation)$continuation);
                if (var10000 == var6) {
                   return var6;
                }
             }
    
             num2 = ((Number)var10000).intValue();
             ((<undefinedtype>)$continuation).I$0 = num1;
             ((<undefinedtype>)$continuation).I$1 = num2;
             ((<undefinedtype>)$continuation).label = 3;
             var10000 = childFun3(num1, num2, (Continuation)$continuation);
             if (var10000 == var6) {
                return var6;
             }
          }
    
          int sum = ((Number)var10000).intValue();
          return Boxing.boxInt(sum);
       }
    

    我们发现 parentFun 方法转成的 java 代码,比我们想象中的要多,这个就是协程实现的秘密。

    方法流程分析:

    1. 创建 $continuation 对象, 它是 ContinuationImpl 类的实例,并且储存了方法参数的 $completion 实例,作用是当本协程体(parentFun 方法) 执行完毕之后,会回调 $completion 实例的 resumeWith 方法,唤醒调用方的协程。

    注: 当调用 $continuation 对象的 resumeWith 方法会调用 invokeSuspend 方法,就会再次调用 parentFun 方法。

    1. label 不同的时候,执行的逻辑不同:
      1 . 当 label = 0 时,先将 label 设置成 1 ,并调用 childFun1 方法,参数就是当前协程体 $continuation。如果返回的结果值是 COROUTINE_SUSPENDED ,那么就直接 parentFun 方法退出,实现协程挂起。
      2 . 当 label = 1 时,一定是 childFun1 方法内部通过 $continuationresumeWith 方法回调来的,得到 childFun1 方法异步结果值。我们的例子中,不会走到这一步,因为我们直接返回了结果值。
      3 . 当 label = 2 时, 同上,是 childFun2 方法内部通过 $continuationresumeWith 方法回调来的,得到 childFun2 方法异步结果值。
      4 . 当 label = 3 时, 同上原理。

    即: label = 1 表示调用了 childFun1 方法;label = 2 表示调用了 childFun2 方法;label = 3 表示调用了 childFun3 方法。并等待 resumeWith 回调带来的结果值。

    这种实现方式,我们称之为状态机。

    2.3 协程函数的原理

    通过上面的分析,我们了解到协程是如何实现挂起和恢复的。
    不同于线程的阻塞和唤醒,协程的挂起是方法直接返回,不执行接下来的代码,它的恢复是通过被调用放来实现的。
    以上面的例子为例:

    1. parentFun 方法先调用了 childFun1 协程方法,并将自己的协程体 $continuation 传递给 childFun1 方法
    2. 如果 childFun1 方法返回值是 COROUTINE_SUSPENDED ,那么 parentFun 方法直接退出,不会执行 childFun2 方法;直到 childFun1 方法调用 $continuationresumeWith 方法,就会重新调用 parentFun 方法,并因为 label = 1,直接获取结果值,再调用 childFun2 方法。
    3. 如果 childFun1 方法返回值不是 COROUTINE_SUSPENDED,即不需要挂起,那么就继续调用 childFun2 方法。
    4. childFun2childFun3 方法调用过程和 childFun1 方法逻辑一样。

    因此我们可以总结:

    1. 当在协程函数内部调用其他协程函数时,都会生成一个挂起点,即这里的 childFun1 childFun2childFun3 ,都对应一个 label
    2. 调用其他协程函数的时候,都会将本协程函数的协程体 $continuation 传递给被调用的协程函数,以便被调用的协程函数可以回调恢复本协程函数。
    3. 当被调用的协程函数返回 COROUTINE_SUSPENDED,即表明被调用的协程函数是一个异步操作,希望本协程函数挂起,等待被调用的协程函数执行完成回调它。本协程函数就会从这个挂起点直接返回,不再执行下面的代码,直到被调用的协程函数通过 $continuationresumeWith 方法来恢复本协程函数,并继续执行下面代码,直到遇到下一个挂起点。

    本质上协程也是通过回调实现异步操作的,只不过 kotlin 编译器将协程函数变成状态机。
    也明白了为什么 suspend 函数为什么只能在 suspend 函数内部调用,而不能在普通函数内部执行,因为没有隐藏的 $continuation 对象。

    三. 创建协程

    上面分析了 suspend 函数,但是现在这个函数,没办法执行,因为 suspend 函数都需要 Continuation 实例,那么第一个 Continuation 实例该如何创建呢?
    kotlin 标准库中提供了两个函数来创建 Continuation 实例

    3.1 createCoroutine 方法

    @SinceKotlin("1.3")
    @Suppress("UNCHECKED_CAST")
    public fun <T> (suspend () -> T).createCoroutine(
        completion: Continuation<T>
    ): Continuation<Unit> =
        SafeContinuation(createCoroutineUnintercepted(completion).intercepted(), COROUTINE_SUSPENDED)
    
    @SinceKotlin("1.3")
    @Suppress("UNCHECKED_CAST")
    public fun <R, T> (suspend R.() -> T).createCoroutine(
        receiver: R,
        completion: Continuation<T>
    ): Continuation<Unit> =
        SafeContinuation(createCoroutineUnintercepted(receiver, completion).intercepted(), COROUTINE_SUSPENDED)
    

    通过 suspend 函数去创建 Continuation 对象,返回 SafeContinuation 类的实例,这个类以后我们有机会分析。
    completion 当协程完成之后,会调用它的 resumeWith 方法。

    例如:

    fun main() {
        val coroutine = (::parentFun).createCoroutine(object : Continuation<Int>{
            override val context: CoroutineContext
                get() = EmptyCoroutineContext
    
            override fun resumeWith(result: Result<Int>) {
                println("result:$result")
            }
        })
        // 执行
        coroutine.resume(Unit)
    }
    

    3.2 startCoroutine 方法

    @SinceKotlin("1.3")
    @Suppress("UNCHECKED_CAST")
    public fun <T> (suspend () -> T).startCoroutine(
        completion: Continuation<T>
    ) {
        createCoroutineUnintercepted(completion).intercepted().resume(Unit)
    }
    
    @SinceKotlin("1.3")
    @Suppress("UNCHECKED_CAST")
    public fun <R, T> (suspend R.() -> T).startCoroutine(
        receiver: R,
        completion: Continuation<T>
    ) {
        createCoroutineUnintercepted(receiver, completion).intercepted().resume(Unit)
    }
    

    这个方法不仅创建了 Continuation 对象,并且还直接执行了协程函数。
    例如:

    fun main() {
        (::parentFun).startCoroutine(object : Continuation<Int>{
            override val context: CoroutineContext
                get() = EmptyCoroutineContext
    
            override fun resumeWith(result: Result<Int>) {
                println("result:$result")
            }
        })
    }
    

    3.3 suspendCoroutine 方法

    上面的例子中 childFun1 这些方法,我们并没有实现异步操作,这里有两个难点:

    1. 没有办法返回 COROUTINE_SUSPENDED 值,因为它不是 Int 类型,且外部获取不到这个值。
    2. 获取不到调用方的 Continuation 对象,来恢复调用方。

    针对这种情况,kotlin 提供了 suspendCoroutine 方法来解决这个问题。

    @SinceKotlin("1.3")
    @InlineOnly
    public suspend inline fun <T> suspendCoroutine(crossinline block: (Continuation<T>) -> Unit): T =
        suspendCoroutineUninterceptedOrReturn { c: Continuation<T> ->
            val safe = SafeContinuation(c.intercepted())
            block(safe)
            safe.getOrThrow()
        }
    

    childFun1 方法进行改变

    suspend fun childFun1(): Int {
        Thread.sleep(1000)
        return 10
    }
    
    // 转变成
    val executor = Executors.newScheduledThreadPool(1) 
    
    suspend fun childFun1(): Int = suspendCoroutine { continuation ->
        executor.schedule(fun() { continuation.resume(10) }, 1000, TimeUnit.MILLISECONDS)
    }
    

    这里我们并没有直接返回,而是通过 executor 线程池,延迟 1 秒钟之后再返回值,模拟异步操作。

    转成的 java 代码

       @Nullable
       public static final Object childFun1(@NotNull Continuation $completion) {
          boolean var1 = false;
          boolean var3 = false;
          SafeContinuation var4 = new SafeContinuation(IntrinsicsKt.intercepted($completion));
          Continuation continuation = (Continuation)var4;
          int var6 = false;
          executor.schedule((Runnable)(new CoroutineKt$childFun1$2$1(continuation)), 1000L, TimeUnit.MILLISECONDS);
          Object var10000 = var4.getOrThrow();
          if (var10000 == IntrinsicsKt.getCOROUTINE_SUSPENDED()) {
             DebugProbesKt.probeCoroutineSuspended($completion);
          }
    
          return var10000;
       }
    

    因为 suspendCoroutine 函数时内联函数,因此函数内容直接复制到 childFun1 函数中。

    SafeContinuation 的 getOrThrow 方法

        @PublishedApi
        internal actual fun getOrThrow(): Any? {
            var result = this.result // atomic read
            if (result === UNDECIDED) {
                if (RESULT.compareAndSet(this, UNDECIDED, COROUTINE_SUSPENDED)) return COROUTINE_SUSPENDED
                result = this.result // reread volatile var
            }
            return when {
                result === RESUMED -> COROUTINE_SUSPENDED // already called continuation, indicate COROUTINE_SUSPENDED upstream
                result is Result.Failure -> throw result.exception
                else -> result // either COROUTINE_SUSPENDED or data
            }
        }
    

    可以看出在没有调用 resumeWith 方法时,就返回 COROUTINE_SUSPENDED

    val executor = Executors.newScheduledThreadPool(2)
    
    suspend fun childFun1(): Int = suspendCoroutine { continuation ->
        executor.schedule(fun() { continuation.resume(10) }, 1000, TimeUnit.MILLISECONDS)
    }
    
    suspend fun childFun2(): Int = suspendCoroutine { continuation ->
        executor.schedule(fun() { continuation.resume(20) }, 2000, TimeUnit.MILLISECONDS)
    }
    
    suspend fun childFun3(num1: Int, num2: Int): Int = suspendCoroutine { continuation ->
        executor.schedule(fun() { continuation.resume(num1 + num2) }, 1000, TimeUnit.MILLISECONDS)
    }
    
    suspend fun parentFun() : Int {
        val num1 = childFun1()
        val num2 = childFun2()
        val sum = childFun3(num1, num2)
        return sum
    }
    
    fun main() {
        (::parentFun).startCoroutine(object : Continuation<Int>{
            override val context: CoroutineContext
                get() = EmptyCoroutineContext
    
            override fun resumeWith(result: Result<Int>) {
                println("result:$result")
            }
        })
    }
    

    相关文章

      网友评论

        本文标题:Kotlin Coroutine suspend 原理解析

        本文链接:https://www.haomeiwen.com/subject/fkermltx.html