美文网首页Kotlin程序员Kotlin编程
深入浅出Kotlin Coroutine的状态机和线程调度

深入浅出Kotlin Coroutine的状态机和线程调度

作者: 不老猫 | 来源:发表于2017-11-19 23:13 被阅读621次

    【本文乃原创,未经授权,拒绝转载,谢谢!】
    Kotlin的Coroutine特性,虽然还只是实验阶段,但却大受欢迎,网上大量的资料介绍它。但网上的资料,分为两种类型,一种是介绍怎么使用,这个官方资料已经提供了大量的例子(参见引用3)。但看了之后大家会对它后面的实现存在很多疑虑。另一种是介绍其实现的,却介绍得比较晦涩,对于一些关键点也没有介绍清楚。比如编译器生产的状态机究竟长什么样子,挂起函数什么情况下会使协程挂起,线程调度器的切入时机是怎样的。所以在大量阅读代码和资料的基础上,总结下面这篇文章,希望能深入浅出地为大家解答这些问题。

    术语

    • Coroutine:协程,即可以被挂起的运算,挂起时会暂停运算,但不会阻塞当前的线程。线程可以被空闲出来去干其他事情。
    • Suspending function:挂起函数,是指函数的运行可以被中止,运行它的线程可以去干其他事情,而随后某个时候,函数可以在中止的地方继续执行。再次执行的线程可以不是之前的线程。Kotlin编译器在编译时,会做两件事:一是会为每个“挂起函数”增加一个参数,参数的类型是Continuation, 二是调用它的地方,会形成可以中止和恢复的挂起点。
    • Continuation:它是一个Interface,实现这个接口的类的对象可以作为协程调用“挂起函数”时的回调。“挂起函数”执行结束后可以使用它唤醒调用者。

    先谈谈何谓挂起

    可以把一个协程当成一个任务(Task),任务它运行在某个线程之上,任务它是可以中止的,之后又可以恢复执行。挂起(suspending)就是指任务的中止,它不会阻塞(blocking)当前的线程。一个协程并不是在任意的指令都可以挂起,只有在协程调用“挂起函数”时,才可以被挂起。所以协程这种挂起有着巨大的好处,它类似于线程,可以用于异步化流程,但当它挂起时又不会阻塞有限的线程资源。另外,不同协程切换不需要系统调用,是成本极度的操作。

    基本原理

    Kotlin协程是使用编译技术实现的,无需依赖于VM和OS的实现,使用了”代码转换“实现挂起技术。"挂起函数"会被转化成状态机,每个挂起调用都会被处理成一个状态。在挂起之前,本地变量会被保存起来,同时会把状态机的下一个状态保存起来。当状态机被唤醒时,所有变量会被恢复,同时状态机从上一次保存的状态开始执行。挂起时,保存这些信息的状态机,是Continuation类型的对象,可以被程序保存或者传递。当协程调用一个"挂起函数"时,就会把当前的状态机传递给“挂起函数”,让"挂起函数"在完成任务后可以唤醒它。这整个过程,下面我们会通过例子重点分析。

    神秘的状态机

    在”基本原理“部分我们谈到”挂起函数“是由状态机来实现的,而且是由编译器进行”代码转换“得到的。也即我们写一个“挂起函数”时,它是和普通函数长得八九不离十的,但生成的字节码和普通函数的字节码是有天壤之别的。搞懂Kotlin如何实现协程,重中之重就是搞清楚这个神秘的状态机是长什么样子的。

    首先什么是状态机?它由状态和行为构成。它每一时刻只会处于一种状态下面,可以在不同状态之间切换。行为可以触发状态的切换。状态机可以处于任意合法的状态,这可以看作它有不同的入口。状态机一般可以使用状态图来表示:

    state_machine.jpg

    接下来,我们来看一下具体的例子。这里使用生产Fibonacci序列的Lambda来分析。

    
    val fibonacciSeq = buildSequence {
        var a = 1
        var b = 1
    
        yield(1)
    
        while (true) {
            yield(b)
    
            val tmp = a + b
            a = b
            b = tmp
        }
    }
    

    在上面代码中,buildSequenceyield的定义如下:

    public fun <T> buildSequence(builderAction: suspend SequenceBuilder<T>.() -> Unit): Sequence<T>
    
    public abstract suspend fun yield(value: T)
    

    buildSequence的参数使用的是suspend lambdayield函数也定义为suspend function。对于这两种类型,Kotlin编译器在编译时会为它们各自生成一个状态机,此状态机实现了Continuation接口。

    对于Fibonacci序列例子中的Lambda,其编译后字节码的伪代码大致是这个样子的:

    class fibonacciSeq$1 extends CoroutineImpl
    {
      Continuation complete
      int label // 1
      int a     // 2
      int b     // 3
      void doResume(Object param, Throwable e) // 4
      {
        tableswitch(lable)  // 5
          case 0: L0
          case 1: L1
          case 2: L2
          else: L3
        L0:
          int a = 1
          int b = 1
          this.a = a   // 6
          this.b = b   // 7
          label = 1    // 8
          Object r = SequenceBuilder.yield(1, this)  // 9
          if(r == COROUTINE_SUSPENDED) return COROUTINE_SUSPENDED  // 10
        L1:
          a = this.a   // 11
          b = this.b   // 12
          if (e != null) throw e  // 13
        L4:  // while begin
          this.a = a
          this.b = b
          label = 2   // 14
          Object r = SequenceBuilder.yield(b, this)
          if(r == COROUTINE_SUSPENDED) return COROUTINE_SUSPENDED
        L2:
          a = this.a
          b = this.b
          if (e != null) throw e
          int tmp = a + b
          a = b
          b = tmp
          goto L4  // while end
        L3:   // 15
          throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine")
      }
      ...
    }
    

    现在让我们好好分析它。为了方便分析,我把关键代码使用注释作了标记。可以看到编译为我们生成的是一个状态机,其状态保存在lable标签上,这个变量是在其父类中定义的,为了方便理解在这里我们把它定义在子类中。

    state_marchine_kotlin.png

    这个状态机最重要的函数是doResume,它包含了所有状态对应的代码。状态机被唤醒执行代码时,它就是入口。只是对于当前不同的状态,会执行不同的代码片段。对于上面这个例子,它一共有3个状态。在函数的入口处就是一个tableswitch,根据不同状态跳转不同的代码。如果状态不正确,它会抛出一个IllegalStateException异常。

    我们可以看到状态发生变化是在代码1和14,它们都是在调用“挂起函数”之前进行的。只有在调用“挂起函数”之前,状态机才会改变状态。

    同时本地变量也被保存了起来,请看代码6、7。所以调用“挂起函数”之前,最主要是做了这两件事。

    调用“挂起函数”yield之时,因为这个类实现了Continuatin接口,所以this被传为最后一个参数传了进去,作为回调用。“挂起函数”执行完了之后可以通过它来使当前的状态机执行lable保存的状态。我们看一下Continuation的定义就明白为什么它可以作为回调:

    public interface Continuation<in T> {
    
        public val context: CoroutineContext
    
        public fun resume(value: T)
    
        public fun resumeWithException(exception: Throwable)
    }
    

    如果“挂起函数”确实挂起(suspending)了,那它最后会通过这个回调来唤醒当前的状态机,继续执行下一段代码。我们的例子中,标签10那行代码yield会挂起协程,在调用它之前label已经被赋值1,当它唤醒当前状态机时,状态机会再次执行doResume,从tableswitch中会根据label跳转到L1执行代码。

    如果“挂起函数”并没有挂起,那它会返回正确的结果(而非COROUTINE_SUSPENDED),此时协程不会挂起,回调对象也不会被调用。标签10的代码的判断条件也不会成立,所以协程会继续往下走。

    线程调度

    我们可以指定协程在单个线程上运行,也可以让它运行在线程池分配的线程上。那协程是如何被调度的?在使用上,我们在调用协程构建器时可以进行设置,比如,我们想运行在Kotlin Lib提供的线程池,则调用async(CommonPool){ ... };如果需要运行在单独的线程,则可以:

    var t = newSingleThreadContext("my thread")
    async(t){ ... }
    

    对于这个设置设置器的参数,如果你不提供,那默认是使用CommonPool。你可能好奇里面的调度功能是怎么实现的?可以分三步来分析:

    第一步,在创建Coroutine时,Kotlin会先判断你有没有提供调度器,如果有的话,它会把Coroutine包装在DispatchedContinuation中,后面使用的是这个包装类对象。

    第二步,当Coroutine被resume时,调用到的是包装类对象的resume,这时包装类对象就把Couroutine放到一个Runnable中,并把它放到调度器中排队,等待执行。

    第三步,调度器就会在自己的线程中执行,当Runnable被执行时,它的run()中就执行真正的resume

    在第一步中,Kotlin使用了Interceptor技术来实现。因为这个流程比较难理解,所以我们对着Kotlin Lib源码一步一步拆解。在创建Coroutine时:

    public fun <T> (suspend () -> T).createCoroutineUnchecked(completion: Continuation<T>): Continuation<Unit> = ... 
      (this.create(completion) as kotlin.coroutines.experimental.jvm.internal.CoroutineImpl).facade
    

    调用了facade,其实现是:

    val facade: Continuation<Any?> get() {
      if (_facade == null) _facade = interceptContinuationIfNeeded(_context!!, this)
            return _facade!!
    }
    

    interceptContinuationIfNeeded的实现是:

    internal fun <T> interceptContinuationIfNeeded(...) = context[ContinuationInterceptor]?.interceptContinuation(continuation) ?: continuation
    

    而调度器都会继承自CoroutineDispatcher,它就是一个Interceptor。

    // Source Code: CoroutineDispatcher.kt
    class CoroutineDispatcher : ContinuationInterceptor {
      public override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> = DispatchedContinuation(this, continuation)
    }
    

    第二步和第三步,相关源码都在CoroutineDispatcher.kt中,大家可以自己研究研究。

    以上内容为本人的理解,能力有限,如有错漏,欢迎批评指出。

    欢迎关注本人公众号,与你分享移动开发的技术心得!:


    image.png

    参考文献:

    1. https://github.com/Kotlin/kotlin-coroutines/blob/master/kotlin-coroutines-informal.md
    2. https://kotlinlang.org/docs/tutorials/coroutines-basic-jvm.html
    3. https://github.com/Kotlin/kotlinx.coroutines/blob/master/coroutines-guide.md
    4. http://tutorials.jenkov.com/java-concurrency/synchronized.html
    5. https://github.com/aCoder2013/blog/issues/11
    6. https://www.kotlindevelopment.com/deep-dive-coroutines/

    相关文章

      网友评论

        本文标题:深入浅出Kotlin Coroutine的状态机和线程调度

        本文链接:https://www.haomeiwen.com/subject/xjqamxtx.html