美文网首页KotlinKotlin
kotlin协程的使用与原理

kotlin协程的使用与原理

作者: android老男孩 | 来源:发表于2019-10-08 09:28 被阅读0次

    协程

    协程是轻量级线程,一个线程中可以有很多协程,协程本质上可以认为是运行在线程上的代码块,协程提供的挂起操作会使协程暂停执行,而不会导致线程阻塞。协程的实现大多都在编译器操作,我们只负责优雅的使用语法糖就行

    协程的优势

    • 优化线程切换带来的资源消耗
    • 提高cpu利用率,一个线程遇到I/O密集型任务会阻塞,cpu不再工作或交给其它线程,这时就涉及到了切线程以及cpu没有工作的问题,协程可以在同一条线程下执行多任务,非阻塞,减少切换线程,并且提高了cpu的利用率
    • 由于在同一个线程中,协程不涉及到同步问题,数据保存与恢复,只要恢复挂起函数,函数回来直接使用线程数据即可
    • 我们可以用顺序编程的方式实现异步以及并发任务,这比响应式流的链式调用更加直观,给了我们同步编程的体验;
    • 但以上的线程切换等优化要放在 IO 密集型任务中,协程的挂起操作相比老式的线程阻塞操作大大提高了性能,在客户端场景中一般用线程池维护,不会有特别大的性能提升

    线程

    那么既然谈到了线程切换的成本,I/O操作,我们先来谈几个概念

    关于IO操作

    简单的理解就是IO所需要的CPU资源非常少。大部分工作是分派给DMA直接内存存取完成的

    CPU计算文件地址 ——> 委派DMA读取文件 ——> DMA接管总线 ——> CPU的A进程阻塞,挂起——> CPU切换到B进程 ——> 
    DMA读完文件后通知CPU(一个中断异常)——> CPU切换回A进程操作文件
    

    计算机硬件上使用DMA来访问磁盘等IO,也就是请求发出后,CPU就不再管了,直到DMA处理器完成任务,再通过中断告诉CPU完成了。所以,单独的一个IO时间,对CPU的占用是很少的,阻塞了就更不会占用CPU了,因为程序都不继续运行了,CPU时间交给其它线程和进程了。虽然IO不会占用大量的CPU时间,但是非常频繁的IO还是会非常浪费CPU时间的,所以面对大量IO的任务,有时候是需要算法来合并IO,或者通过cache来缓解IO压力的

    • 计算密集型 计算圆周率、对视频进行高清解码,务的特点是要进行大量的计算,消耗CPU资源,这种计算密集型任务虽然也可以用多任务完成,但是任务越多,花在任务切换的时间就越多,CPU执行任务的效率就越低,所以,要最高效地利用CPU,计算密集型任务同时进行的数量应当等于CPU的核心数

    • IO密集型 涉及到网络、磁盘IO的任务都是IO密集型任务,这类任务的特点是CPU消耗很少,任务的大部分时间都在等待IO操作完成

    线程的阻塞

    • 等待阻塞:运行(running)的线程执行o.wait()方法,JVM会把该线程放入等待队列(waitting queue)中。
    • 同步阻塞:运行(running)的线程在获取对象的同步锁时,若该同步锁被别的线程占用,则JVM会把该线程放入锁池(lock pool)中。
    • 其他阻塞:运行(running)的线程执行Thread.sleep(long ms)或t.join()方法,或者发出了I/O请求时,JVM会把该线程置为阻塞状态。当sleep()状态超时、join()等待线程终止或者超时、或者I/O处理完毕时,线程重新转入可运行(runnable)状态。

    sleep与wait区别

    • sleep和wait这两个函数被调用之后线程都应该放弃执行权,不同的是sleep不释放锁而wait的话是释放锁。
    • 一个线程调用Sleep之后进入了阻塞状态其它线程没有获取锁依然要等待,当sleep状态超时、join等待线程终止或者超时,线程重新转入可运行(runnable)状态。
    • wait不同的在释放执行权之后wait也把锁释放了进入了线程等待阻塞,它要运行的话还是要和其他的线程去竞争锁,之后才可以获得执行权。
    • sleep(long mills):让出CPU资源,但是不会释放锁资源。
    • wait():让出CPU资源和锁资源

    yield与 join

    • yield 调用此方法的线程,释放当前cpu的执行权,暂停当前正在执行的线程对象,并执行其他线程,它仅能使一个线程从运行状态转到可运行状态,而不是等待或阻塞状态,但可能没有效果
    • join 使得一个线程在另一个线程结束后再执行,也就是说使得当前线程可以阻塞其他线程执行

    线程的yield,sleep,wait都会让出cpu使用权,进入阻塞也是会让出cpu使用权的

    等待池和锁池

    当调用了wait()方法,释放锁资源和Cpu资源,线程进入等待池,当其他线程调用notify()会从等待池任意选择一个线程调入锁池,notifyAll()会调用所有等待池线程进入锁池,锁池里的对象可以竞争锁,优先级高的获得锁的能力更强,获得锁的线程可以进入就绪态继续执行,执行完之后释放锁,然后锁池里的线程再继续竞争

    线程创建与切换

    线程的创建是操作系统内核启动了一个线程,线程切换涉及到程序计数器,CPU寄存器状态

    程序计数器

    简单理解行号指示器,线程私有内存

    程序计数器是一块较小的内存空间,它可以看作是当前线程所执行字节码的行号指示器。在虚拟机的概念模型里(仅是概念模型,各种虚拟机可能会通过一些更高效的方式去实现),字节解释器工作时就是通过改变这个计数器的值来选取下一条需要执行的字节码指令。且由于java虚拟机的多线程是通过线程轮流切换并分配器执行时间的方式来实现的,在任何一个确定的时刻,一个处理器(对于多核处理器是一个内核)都只会执行一条线程中的指令,因为为了线程切换后能恢复到正确的执行位置,每条线程都需要有一个独立的程序计数器,各条线程之间计数器互不影响,我们称这类区域为“线程私有”的内存

    CPU寄存器

    其实就是将原有线程的数据标记保存到寄存器,以便恢复

    线程切换时需要知道在这之前当前线程已经执行到哪条指令了,所以需要记录程序计数器的值,另外比如说线程正在进行某个计算的时候被挂起了,那么下次继续执行的时候需要知道之前挂起时变量的值时多少,因此需要记录CPU寄存器的状态。所以一般来说,线程上下文切换过程中会记录程序计数器、CPU寄存器状态等数据。

    线程的栈

    进栈出栈也是为了实现函数的调用,多线程如果公用一个栈,那么会出现很多同步问题和错误,成本太高,这里的同步问题不光光是值,很可能导致函数地址返回错误,函数调用错误等。

    思维扩展,不同的程序计数器,维护一个栈?
    好,思路看起来可以,但是栈保存的是对象地址,地址也是需要赋值的,数据恢复时?清空整个栈,改成线程对应的值?成本太高,而且地址错误等会带来较多的问题

    • 直接内存与堆内存不是一块内存
    • 堆用来存储new出来的对象和数组,可以动态的分配内存大小,生命周期不确定,速度慢
    • 栈用来存储基本类型变量和对象的引用变量的地址,存在栈中的数据大小和生命周期必须是明确的,速度快
    • 堆是多线程共享的资源
    那么为什么线程需要私有栈,协程不需要

    协程的状态机(后边会详细讲协程的挂起和恢复)就是相当于私有栈,只是轻量级一些,因为只是单纯的代码块,作为一个对象存储在堆上,而且协程的数据恢复直接使用同一个线程的数据,没有同步问题

    协程创建原理

    我们在扩展库中通常使用 launch、async 等协程构建器来开启一个协程,但是实际上真正启动协程的函数应该是什么呢?
    标准库提供了一个 startCoroutine 函数来启动协程,它是一个扩展函数,接收者是一个挂起函数类型

    fun <T> (suspend  () -> T).startCoroutine(completion: Continuation<T>){
         createCoroutineUnintercepted(completion).intercepted().resume(Unit)
    }
    

    startCoroutine 的内部实现调用了 createCoroutineUnintercepted, 这个函数是一个协程内建函数
    协程内建函数的实现没法用 Kotlin 来表达,它是黑科技,所以编译器其实是认识它们的,会在编译时直接替换它们的实现

    @SinceKotlin("1.3")
    @InlineOnly
    @Suppress("UNUSED_PARAMETER", "RedundantSuspendModifier")
    public suspend inline fun <T> suspendCoroutineUninterceptedOrReturn(crossinline block: (Continuation<T>) -> Any?): T =
        throw NotImplementedError("Implementation of suspendCoroutineUninterceptedOrReturn is intrinsic")
    
    

    它在kotlin中的实现只是抛了一个error,但是不能用协程内建函数来解释挂起和协程工作的原理,我们接下来仔细分析一下挂起的实现

    协程挂起原理

    CPS 变换

    CPS 变换是对挂起函数的函数签名进行的一种变换:
    挂起函数 await 的函数签名如下所示

    suspend fun <T> CompletableFuture<T>.await(): T
    

    在编译期发生 CPS 变换之后

    fun <T> CompletableFuture<T>.await(continuation: Continuation<T>): Any?
    
    续体与续体拦截器

    续体是一个较为抽象的概念,简单来说它包装了协程在挂起之后应该继续执行的代码;在编译的过程中,一个完整的协程被分割切块成一个又一个续体。

    例如

    sequence {
        for (i in 1..10) yield(i * i)
        println("over")
    } 
    

    这里,每次调用挂起函数 yield()时,协程都会挂起,其执行的剩余部分被视作续体,所以有 10 个续体:循环运行第一次后,i=2,挂起;循环运行第二次后,i=3,挂起……最后一次打印“over”并完结协程

    对于已经创建,但尚未启动的协程,由它的初始续体表示,这由它的整个执行组成,类型为 Continuation<Unit> 。

    挂起函数在恢复的时候,理论上可能会在任何一个线程上恢复,有时我们需要限定协程运行在指定的线程,而这个工作就是续体拦截器负责, ContinuationInterceptor(续体拦截器),续体拦截器负责拦截恢协程协程在恢复后应执行的代码(即续体)并将其在指定线程或线程池恢复。

    挂起函数不一定会真正挂起协程

    在挂起函数的编译中,每个挂起函数都会被编译为一个实现了 Continuation 接口的匿名类,而续体拦截器会拦截真正挂起协程的挂起点的续体。这句话是什么意思?在协程中调用挂起函数,挂起函数不一定会真正挂起协程,例如下面这个例子:

    launch {
        val deferred = async {
            // 发起了一个网络请求
            ......
        }
        // 做了一些操作
        ......
        deferred.await()
        // 后续的一些操作
        ......
    }
    

    在 deferred.await() 这行执行的时候,如果网络请求已经取得了结果,那 await 函数会直接取得结果,而不会事实上的挂起协程,相反,如果网络请求还未产生结果,await 函数就会使协程挂起

    续体拦截器只拦截真正发生挂起的挂起点后的续体,对于未发生挂起的挂起点,续体会被直接调用 resumeWith 这一类的函数而不需要续拦截器对它进行操作。除此之外,续体拦截器还会缓存拦截过的续体,并且在不再需要它的时候调用 releaseInterceptedContinuation 函数释放它。

    状态机

    协程在编译挂起函数时会将函数体编译为状态机,这样做的好处在于避免创建过多的类和对象,是出于一种性能上的考虑。

    val a = a()
    val y = foo(a).await() // 挂起点 #1
    b()
    val z = bar(a, y).await() // 挂起点 #2
    c(z)
    

    这一段挂起函数内部的代码,它拥有两个挂起点;在编译后生成如下的伪 Java 字节码

    lass <anonymous_for_state_machine> extends SuspendLambda<...> {
        // 状态机当前状态
        int label = 0
        
        // 协程的局部变量
        A a = null
        Y y = null
        
        void resumeWith(Object result) {
            if (label == 0) goto L0
            if (label == 1) goto L1
            if (label == 2) goto L2
            else throw IllegalStateException()
            
          L0:
            // 这次调用,result 应该为空
            a = a()
            label = 1
            result = foo(a).await(this) // 'this' 作为续体传递
            if (result == COROUTINE_SUSPENDED) return // 如果 await 挂起了执行则返回
          L1:
            // 外部代码传入 .await() 的结果恢复协程 
            y = (Y) result
            b()
            label = 2
            result = bar(a, y).await(this) // 'this' 作为续体传递
            if (result == COROUTINE_SUSPENDED) return // 如果 await 挂起了执行则返回
          L2:
            // 外部代码传入 .await() 的结果恢复协程
            Z z = (Z) result
            c(z)
            label = -1 // 没有其他步骤了
            return
        }          
    } 
    

    现在,当协程开始时,我们调用了它的 resumeWith() —— label 是 0,然后我们跳去 L0,接着我们做一些工作,将 label 设为下一个状态—— 1,调用 .await(),如果协程执行挂起就返回。当我们想继续执行时,我们再次调用 resumeWith(),现在它继续执行到了 L1,做一些工作,将状态设为 2,调用 .await(),同样在挂起时返回。下一次它从 L3 继续,将状态设为 -1,这意味着"结束了,没有更多工作要做了"。

    虽然 Java 中没有 goto 语句,但是 class 字节码中支持 goto

    上边只是从源码和设计额角度解释了协程的挂起流程和恢复,但是协程挂起的本质是什么?挂起的实现到底是怎么实现?,到底谁来调用续体的resum 函数来恢复执行?也就是说必定要有线程一直跟踪这个任务,这样它才能在任务完成时恢复续体的执行。协程的挂起和 Java 的 NIO 机制是类似的,我们在一个线程中执行了一个原本会阻塞线程的任务,但是这个调用者线程没有发生阻塞,这是因为它们有一个专门的线程来负责这些任务的流转,也就是说,当我们发起多个阻塞操作的时候,可能只会阻塞这一个专门的线程,它一直在等待,谁的阻塞结束了,它就把回调再分派过去,这样就完成了阻塞任务与阻塞线程的多对一,而不是以前的一对一,所以挂起也好,NIO 也好,本质上都没有彻底消灭阻塞,但是它们都使阻塞的线程大大减少,从而避免了大量的线程上下文状态切换以及避免了大量线程的产生,从而在 IO 密集型任务中大大提高了性能

    协程基础api

    • GlobalScope.launch 创建协程非阻塞 默认不指定线程,不会在主线程中工作,会默认调用Dispatchers.Default,为什么会这样?官方文档不是这么写的?
      官方文档中默认会在main线程,但Dispatchers.Main 事实上是一个多平台化的 API,在 Android、JavaFX、Swing 等场景下实现的细节都不同,由于android机制中的主线程不能做耗时操作,所以在实现上做了一些改变,另外Dispatchers.Main将其它线程的结果调度到UI线程中,就是通过handler实现的
    GlobalScope.launch { // 运行在父协程的上下文中,即 runBlocking 主协程
          Log.d("1111", "main runBlocking      : I'm working in thread ${Thread.currentThread().name}")
    }
    GlobalScope.launch(Dispatchers.Unconfined) { // 不受限的——将工作在主线程中
          Log.d("1111", "Unconfined            : I'm working in thread ${Thread.currentThread().name}")
    }
    GlobalScope.launch(Dispatchers.Default) { // 将会获取默认调度器
         Log.d("1111", "Default               : I'm working in thread ${Thread.currentThread().name}")
    }
    GlobalScope.launch(newSingleThreadContext("MyOwnThread")) { // 将使它获得一个新的线程
         Log.d("1111", "newSingleThreadContext: I'm working in thread ${Thread.currentThread().name}")
    }
    
    输出结果
    10-06 10:40:13.118 5073-5099/com.wuba.testkotlin D/1111: main runBlocking      : I'm working in thread DefaultDispatcher-worker-1
    10-06 10:40:13.118 5073-5073/com.wuba.testkotlin D/1111: Unconfined            : I'm working in thread main
    10-06 10:40:13.122 5073-5099/com.wuba.testkotlin D/1111: Default               : I'm working in thread DefaultDispatcher-worker-1
    10-06 10:40:13.128 5073-5102/com.wuba.testkotlin D/1111: newSingleThreadContext: I'm working in thread MyOwnThread
    
    • async,通过await得到一个返回值,返回的是Deferred类
     val result = async {           
         "我是字符串"
     }
     val await = result.await()
    
    • runBlocking 阻塞
    • withcontext
    launch(Dispatchers.Main) {
           withContext(Dispatchers.IO) {
            }
           withContext(Dispatchers.IO) {
           }
    }
    
    • Dispatchers 线程调度器
      • Dispatchers.Default : 将会获取默认调度器,使用共享的后台线程池
      • Dispatchers.IO : 运行在IO线程中
      • Dispatchers.Main : 运行在主线程中
    launch(Dispatchers.Main) {...}
    launch(Dispatchers.IO) {...}
    
    • CommonPool 共享协程池
    launch(CommonPool) {...}
    
    • suspend 挂起函数
      只能在协程内部执行或者同样被suspend修饰的函数调用

    协程

    • 协程在不挂起时,Dispatchers.Default下,协程与协程之间,协程内部的代码会并发执行,在android中是这样因为交给了不同的线程,线程的dispather.main在多平台上有不同的实现,实现的依然是线程并发
    GlobalScope.launch{
                repeat(8) {
                    Log.d("1111", "第1个协程${i++},${Thread.currentThread().name}")
                }
            }
    
            GlobalScope.launch{
                repeat(8) {
                    Log.d("1111", "第2个协程${r++},${Thread.currentThread().name}")
                }
            }
    输出结果
    10-07 09:37:14.399 13951-13991/com.wuba.testkotlin D/1111: 第1个协程0,DefaultDispatcher-worker-1
    10-07 09:37:14.399 13951-13992/com.wuba.testkotlin D/1111: 第2个协程0,DefaultDispatcher-worker-2
    10-07 09:37:14.399 13951-13992/com.wuba.testkotlin D/1111: 第2个协程1,DefaultDispatcher-worker-2
    10-07 09:37:14.399 13951-13991/com.wuba.testkotlin D/1111: 第1个协程1,DefaultDispatcher-worker-1
    10-07 09:37:14.399 13951-13992/com.wuba.testkotlin D/1111: 第2个协程2,DefaultDispatcher-worker-2
    10-07 09:37:14.399 13951-13991/com.wuba.testkotlin D/1111: 第1个协程2,DefaultDispatcher-worker-1
    10-07 09:37:14.399 13951-13992/com.wuba.testkotlin D/1111: 第2个协程3,DefaultDispatcher-worker-2
    10-07 09:37:14.399 13951-13991/com.wuba.testkotlin D/1111: 第1个协程3,DefaultDispatcher-worker-1
    10-07 09:37:14.399 13951-13992/com.wuba.testkotlin D/1111: 第2个协程4,DefaultDispatcher-worker-2
    10-07 09:37:14.399 13951-13991/com.wuba.testkotlin D/1111: 第1个协程4,DefaultDispatcher-worker-1
    10-07 09:37:14.399 13951-13992/com.wuba.testkotlin D/1111: 第2个协程5,DefaultDispatcher-worker-2
    10-07 09:37:14.399 13951-13991/com.wuba.testkotlin D/1111: 第1个协程5,DefaultDispatcher-worker-1
    10-07 09:37:14.399 13951-13991/com.wuba.testkotlin D/1111: 第1个协程6,DefaultDispatcher-worker-1
    10-07 09:37:14.399 13951-13992/com.wuba.testkotlin D/1111: 第2个协程6,DefaultDispatcher-worker-2
    10-07 09:37:14.399 13951-13991/com.wuba.testkotlin D/1111: 第1个协程7,DefaultDispatcher-worker-1
    10-07 09:37:14.399 13951-13992/com.wuba.testkotlin D/1111: 第2个协程7,DefaultDispatcher-worker-2
    
    • 协程在不挂起时,但如果指定了同一个线程,两个协程是顺序执行的,第一个协程的代码会执行完成,第二个协程继续执行,协程与线程也一样
    GlobalScope.launch(Dispatchers.Unconfined) {
                for (i in 1..6) {
                    Log.d("1111", "协程任务打印第$i 次")
                }
            }
            for (i in 1..8) {
                Log.d("1111", "主线程打印第$i 次")
            }
    输出结果
    10-07 09:29:06.112 13051-13051/com.wuba.testkotlin D/1111: 协程任务打印第1 次
    10-07 09:29:06.112 13051-13051/com.wuba.testkotlin D/1111: 协程任务打印第2 次
    10-07 09:29:06.112 13051-13051/com.wuba.testkotlin D/1111: 协程任务打印第3 次
    10-07 09:29:06.112 13051-13051/com.wuba.testkotlin D/1111: 协程任务打印第4 次
    10-07 09:29:06.112 13051-13051/com.wuba.testkotlin D/1111: 协程任务打印第5 次
    10-07 09:29:06.112 13051-13051/com.wuba.testkotlin D/1111: 协程任务打印第6 次
    10-07 09:29:06.114 13051-13051/com.wuba.testkotlin D/1111: 主线程打印第1 次
    10-07 09:29:06.114 13051-13051/com.wuba.testkotlin D/1111: 主线程打印第2 次
    10-07 09:29:06.114 13051-13051/com.wuba.testkotlin D/1111: 主线程打印第3 次
    10-07 09:29:06.114 13051-13051/com.wuba.testkotlin D/1111: 主线程打印第4 次
    10-07 09:29:06.114 13051-13051/com.wuba.testkotlin D/1111: 主线程打印第5 次
    10-07 09:29:06.114 13051-13051/com.wuba.testkotlin D/1111: 主线程打印第6 次
    10-07 09:29:06.114 13051-13051/com.wuba.testkotlin D/1111: 主线程打印第7 次
    10-07 09:29:06.114 13051-13051/com.wuba.testkotlin D/1111: 主线程打印第8 次
    
    • 协程内部碰到挂起时,协程就不会执行了,等待挂起完成才能继续后面的部分
    suspend fun getToken(): String {
            delay(300)
            Log.d("AA", "getToken 开始执行,时间")
            return "ask"
        }
    
        suspend fun getResponse(token: String): String {
            delay(100)
            Log.d("AA", "getResponse 开始执行")
            return "response"
        }
    
        fun setText(response: String) {
            Log.d("AA", "setText 执行,时间")
        }
    
    // 运行时
        GlobalScope.launch(Dispatchers.Main) {
            Log.d("AA", "协程 开始执行")
            val token = getToken()
            val response = getResponse(token)
            setText(response)
        }
    输出结果
    10-07 09:22:48.358 12108-12108/? D/1111: 协程 开始执行
    10-07 09:22:48.660 12108-12108/? D/1111: getToken 开始执行
    10-07 09:22:48.760 12108-12108/? D/1111: getResponse 开始执行
    10-07 09:22:48.760 12108-12108/? D/11111: setText 开始执行
    

    在 getToken 方法将协程挂起时,getResponse 函数永远不会运行,只有等 getToken 挂起结速将协程恢复时才会运行
    但是如果将上边的代码改造成这样

     GlobalScope.launch(Dispatchers.Unconfined){
              var token = GlobalScope.async(Dispatchers.Unconfined) {
                    return@async getToken()
                }
    
                var response  = GlobalScope.async(Dispatchers.Unconfined) {
                    return@async getResponse("token")
                }
    
                setText("setText")
                Log.d("1111", "setText${Thread.currentThread().name}")
                token.await()
                response.await()
            }
    
    
    10-07 09:52:24.575 16174-16174/? D/1111: setText 开始执行
    10-07 09:52:24.575 16174-16174/? D/1111: setTextmain
    10-07 09:52:24.687 16174-16200/? D/1111: getResponsekotlinx.coroutines.DefaultExecutor
    10-07 09:52:24.687 16174-16200/? D/1111: getResponse 开始执行
    10-07 09:52:24.882 16174-16200/? D/1111: getTokenkotlinx.coroutines.DefaultExecutor
    10-07 09:52:24.882 16174-16200/? D/1111: getToken 开始执行
    

    getResponse 函数会先在getToken函数前边运行,所以getResponse实际上是在获取
    token.await()结果的时候被阻塞了,getResponse获取不到getToken的执行结果就会一直阻塞
    可见协程是不阻塞的,但是可以通过官方的api控制它们的调用关系,await方法就是获取这个协程的结果,即使注释掉这个方法,这个函数本身依然会执行

    参考

    https://www.jianshu.com/p/76d2f47b900d
    http://dy.163.com/v2/article/detail/E2O7K6TA0511HSJK.html
    https://my.oschina.net/LucasZhu/blog/1518275
    https://www.jianshu.com/p/c655e0a944ae
    https://blog.csdn.net/zhaojunwei666/article/details/96433488
    https://blog.csdn.net/suyimin2010/article/details/91125803
    https://www.jianshu.com/p/76d2f47b900d
    官方设计文档

    相关文章

      网友评论

        本文标题:kotlin协程的使用与原理

        本文链接:https://www.haomeiwen.com/subject/fyzhyctx.html