美文网首页android
Android平台的Kotlin协程

Android平台的Kotlin协程

作者: denny_z | 来源:发表于2020-05-05 22:33 被阅读0次

    目录

    [toc]

    1、协程是什么

    如果我们去维基百科,可以找到一段类似的话:

    协程是一种非抢占式或者说协作式的计算机程序并发调度的实现,程序可以主动挂起或者恢复执行。

    又如果你看了网上的很多文章,它们也可能这么说:

    • 协程是轻量级的线程。
    • 协程没有用户态到内核态的切换。
    • 协程的调度是协作式的,而线程是抢占式的。

    我们先不说上面的这些说法对不对。反正,相信无论是那种方式,看了之后你还是一脸懵逼。

    事实上,协程是一种脱离语言的概念。从本质上说,协程就是一段程序,它能够被挂起,待会儿再恢复执行。这里的挂起和恢复执行都是程序主动控制的(所以叫协作式),而不是像线程那样是由操作系统调度的。每个语言对协程都有自己的实现(也可能压根儿就不支持协程,如Java)。这也是我们对协程这个概念感到模糊和混乱的原因,因为协程本身的定义就是模糊的,而每个语言的实现又都不太一样。

    这篇文章,就以Kotlin中的协程为例。我们先介绍协程的基本用法,再看下它是怎么实现的。

    2、为什么要用协程

    我们先来看一个例子。假设有一个耗时的方法,用于获取一个Account对象。耗时的原因,可能因为是需要IO操作,或者是需要大量CPU资源,又或者本身就是需要延时一段时间后执行。

    那么,在Android中,为了防止它阻塞主线程造成ANR,我们可以这么写:

    getAccount(object : Callback<Account> {
        override fun onFail(code: Int, message: String) {
            // show error
        }
    
        override fun onSuc(response: Account) {
            // Do something with Account
        }
    })
    

    这就是典型的回调式的写法。在getAccount方法的内部,会把获取Account的逻辑放到后台线程中异步执行,并在结果返回后通过回调的形式返回给上层。
    这种写法的一大问题是,代码杂乱不堪,我们很容易陷入到回调地狱中。例如,如果在接口访问失败后需要增加重试的逻辑,要怎么写?如果在成功后,还需要请求另一个接口,又要怎么写?一层又一层的回调,直觉告诉你就是在写bug。

    幸运的是,我们有Rxjava。利用RxJava,我们可以这么写:

    Observable.create { emitter: ObservableEmitter<Account> -> emitter.onNext(getAccount()) }
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe {
                    // Do something with Account
                }
    

    RxJava的链式调用,使得代码简洁了很多。但是RxJava也有自身的问题,它的操作符太抽象了,以至于RxJava的学习成本很大。例如说,还是上面的例子,如果我们想要增加出错重试的逻辑,要怎么做?如果不熟悉retry这个操作符,你可能要花点时间了。

    但是,在使用Kotlin的协程版本中,我们的代码是这个样子的:

    coroutineScope.launch { // 启动一个协程
        val account = withContext(Dispatchers.IO) {
            // getAccount的逻辑会在IO线程中执行
            getAccount()
        }
        // 自动切换回原来的线程
        doSomethingWithAccount(account)
    }
    

    上面代码最神奇的地方在于withContext(Dispatchers.IO),这行代码使得getAccount的逻辑会自动在IO线程执行。而更加神奇的地方在于,getAccount执行完毕后会自动切换回原来的线程执行doSomethingWithAccount。在这里,我们可以执行UI刷新的逻辑。这一切,都是编译器帮助我们完成的。整个过程中我们没有用到回调(至少没有显性地看到)。
    这就是协程最大的一个好处。简单的说,就是用同步的方式写异步的代码。这个特性,让我们的代码变得无比简洁。

    3、协程应用指北

    3.1、Gradle配置

    在Android中,如果我们要使用协程,首先需要在app的build.gradle中,添加依赖:

    dependencies {
        implementation "org.jetbrains.kotlinx:kotlinx-coroutines-core:1.3.3"
        implementation "org.jetbrains.kotlinx:kotlinx-coroutines-android:1.3.3"
    }
    

    这里引入的两个依赖分别是协程的核心库与平台库。kotlin是一个跨平台的语言。例如在js本身就是单线程的事件循环,这与Android有较大的差别。平台库的作用,就是这一层差别的抽象。

    3.2、创建第一个协程

    协程的创建方法有3种,我们一个一个来看。

    3.2.1、使用runBlocking顶层函数

    runBlocking {
        print("Hello World!")
    }
    

    这个顶层函数会阻塞线程,直至runBlocking中的内容执行完毕。所以,这个方法一般也不会用在业务开发中。毕竟,我们使用协程就是因为它并发的能力。

    3.2.2、GlobalScope

    GlobalScope.launch {
        print("Hello World!")
    }
    

    GlobalScope是一个全局的作用域,使用这种方法创建的协程,生命周期与APP的生命周期一致。
    因为不能控制生命周期,这种方式可能会造成内存泄漏,一般在业务开发中不太常见。

    3.2.3、使用CoroutineScope对象

    val context = SupervisorJob() + Dispatchers.Main
    val coroutineScope = CoroutineScope(context)
    coroutineScope.launch { 
        print("Hello World!")
    }
    

    这个也是官方推荐的方式,使用一个CoroutineScope对象来创建协程。创建CoroutineScope需要一个Context对象(这个Context对象和Android中常见的Context不是一个概念)。
    更简单的,我们也可以直接使用MainScope这个对象。
    上面例子中的GlobalScope本质也是一个CoroutineScope

    val scope = MainScope()
    scope.launch {
        print("Hello World!")
    }
    

    使用这种方式创建的协程,可以在必要的时候取消。

    scope.cancel()
    

    这在一些场合十分有用。例如,在Activity销毁的时候取消所有的网络请求。

    3.2.4、小结:创建协程的关键参数

    我们以launch为例,看一下协程启动都有那些参数。下面给出了launch函数的定义。

    public fun CoroutineScope.launch(
        context: CoroutineContext = EmptyCoroutineContext,
        start: CoroutineStart = CoroutineStart.DEFAULT,
        block: suspend CoroutineScope.() -> Unit
    ): Job
    

    可以看到,创建一个协程总共有5个关键的参数,分别是:

    • CoroutineScope 协程作用域
    • CoroutineContext 协程的上下文
    • CoroutineStart 协程的启动模式
    • block 协程体
    • Job 作业

    协程体没什么好说的,就像你调用Handle().post(Runnable{ })时传入的Runnable对象一样,是你希望交给协程去执行的东西。
    下面的篇幅,我们介绍剩下的4个参数。为了便于理解,介绍的顺序会略有调整。

    3.3、协程启动模式

    协程的启动模式定义在CoroutineStart中,是一个枚举类:

    public enum class CoroutineStart {
        DEFAULT,    // 立即执行协程体
        LAZY,   // 只有在有必要的情况下才执行协程体
        @ExperimentalCoroutinesApi
        ATOMIC, // 立即执行协程体,但在开始运行之前无法取消
        @ExperimentalCoroutinesApi
        UNDISPATCHED;   //  立即在当前线程执行协程体,直到第一个suspend调用
    }
    

    DEFAULTLAZY是我们最常用的模式。LAZY只有在有必要的情况下才执行协程体。可以看一下下面这个例子:

    val deferred = async(start = CoroutineStart.LAZY) {
        print("coroutine run")
        "Hello World"
    }
    print(deferred.await())   // 1⃣️
    

    例如,上面的代码,我们设置了启动模式为LAZY。因此,直到y1⃣️处的代码被调用,async所创建的协程才会开始执行。
    ATOMICUNDISPATCHED还是实验性质的API,这里暂不介绍了。

    3.4、Job

    Joblaunch方法的返回值,通常的理解是作业。Job表示了一个协程的工作任务。Job是一个接口,我们看一下它的定义:

    public interface Job : CoroutineContext.Element
    

    而这个CoroutineContext.Element又继承自CoroutineContext。也就是说,Job本质上还是个CoroutineContext
    Job是用来干嘛的?看一下Job的函数就明白了。

    public val isActive: Boolean
    public val isCompleted: Boolean
    public val isCancelled: Boolean
    
    public fun start(): Boolean
    public fun cancel(cause: CancellationException? = null)
    public suspend fun join()
    

    3.4.1、launchasync

    前面我们举了个例子,使用launch创建并启动了一个协程。
    launch定义在Builders中。事实上,在Builders中,除了launch,还有另一个常用的创建协程的方法async

    public fun CoroutineScope.launch(...): Job
    public fun <T> CoroutineScope.async(...): Deferred<T> {
    

    单看这两个函数的定义,除了返回值不同也没有什么太大的区别了。
    这个Deferred本质上就是继承自Job,最重要的一点就是多了一个await的函数。故名思义,我们可以通过这个函数来获取协程执行结果。
    配合上文中提到的CoroutineStart.LAZY,就可以实现只有在await被调用的时候才能才会开始执行的异步任务。
    launch通常用于那些不关心结果的耗时任务,而async通常用于那些需要返回值的耗时任务,如网络请求、读取数据库等。

    3.5、CoroutineScope

    CoroutineScope表示一个协程的作用域。CoroutineScope是一个接口,定义如下:

    public interface CoroutineScope {
        public val coroutineContext: CoroutineContext
    }
    

    可以看到,CoroutineScope的定义非常简单。每个CoroutineScope都持有CoroutineContext对象。
    CoroutineScope的一个作用就是在一些场合方便地取消所有已启动的协程。
    在创建协程的时候,我们可以在父协程内创建子协程。这个时候,父协程会限制子协程的生命周期, 子协程则继承父协程的上下文。
    GlobalScope就是CoroutineScope的一个子类。
    作用域还和异常的传播有关,这个我们放到后面再说。

    3.6、CoroutineContext

    协程创建过程中,接收CoroutineContext对象作为协程的上下文。CoroutineContext本质上是一个接口,它有很多的实现。协程上下文是协程创建过程中很重要的一个参数。

    在实际使用过程中,可以创建多个上下文,并使用+操作符将其连在一起。相同类型的Context,右边覆盖左边的。

    Dispatchers.Main + CoroutineName("Get AccountInfo Coroutine")
    

    如果把CoroutineContext的接口和List进行对比,你会觉得两者出奇地相似!

    public operator fun plus(context: CoroutineContext): CoroutineContext
    public fun minusKey(key: Key<*>): CoroutineContext
    public operator fun <E : Element> get(key: Key<E>): E?
    

    CoroutineContext可以说就是一个以Key为索引的List。所以,本质上,CoroutineContext就是一个数据结构而已。我们也可以通过Key来查找某个具体的CoroutineContext,如:

    coroutineContext[CoroutineName]
    

    接下来介绍一些常见的CoroutineContext的实现。

    3.6.1、CoroutineName

    CoroutineName的作用是为协程命名,类似于java中为线程添加名称:

    new Thread().setName("Get AccountInfo Thread");
    

    在协程中,通过下面的代码为创建的协程添加名称:

    launch(CoroutineName("Get AccountInfo Coroutine")) { 
    }
    
    2.6.2、Dispatchers 调度器

    Dispatchers中有4个默认的调度器可供使用,他们都继承自CoroutineDispatcher

    • Dispatchers.Unconfined 不指定线程, 如果子协程切换线程那么接下来的代码也运行在该线程上
    • Dispatchers.IO 适用于IO读写,底层用线程池实现
    • Dispatchers.Main 根据平台不同而有所差, Android平台上的实现为HandlerContext(这里也说明了,在Android平台上,把一个协程绑定到主线程上执行,最终还是回到了Android平台的Handler的那一套)
    • Dispatchers.Default 默认调度器, 在线程池中执行协程体, 适用于计算操作

    3.7、异常处理

    3.7.1、全局的异常处理

    无论是线程还是RxJava,都有默认的异常处理器。例如,我们可以为线程设置一个默认的异常处理:

    Thread.setDefaultUncaughtExceptionHandler { t: Thread, e: Throwable ->
        println("Thread '${t.name}' throws an exception with message '${e.message}'")
    }
    

    我们也可以为RxJava设置默认的异常处理器:

    RxJavaPlugins.setErrorHandler(e -> {
        println("Throws an exception with message '${e.message}'")
    });
    

    同样,协程也可以添加默认的异常处理器。

    // 创建异常处理器,本质上仍然是 coroutineContext
    val exceptionHandler = CoroutineExceptionHandler { coroutineContext, throwable ->
        print("Throws an exception with message: ${throwable.message}")
    }
    MainScope().launch(exceptionHandler) {
        throw IllegalArgumentException("wrong argument")
    }
    

    上面这段程序在运行的时候并不会崩溃,抛出的IllegalArgumentException的异常被我们自定义的异常处理器捕捉到了。

    3.7.2、asyncawaitjoin

    上面我们以launch为例介绍了CoroutineExceptionHandler。但是值得注意的是,使用async时,我们发现异常默默地消失了。即使没有定义异常处理器,程序也不会崩溃。

    MainScope().async {
        throw IllegalArgumentException("wrong argument")
    }
    

    这是因为async的设计思路与launch不同。对async来说,异常只有在调用await的时候才会消费。这也很好理解嘛,当我期望获取async的结果时,程序已经发生了异常,没有办法给出正确的结果。此时,只好抛出一个异常。
    但是另一个方法join则不一样了。join只关心协程是否执行完成,但并不关心是异常结束还是正常结束。即使我们用launch替代async,结果也是一样的。在join的调用处并不会抛出异常。

    总结一下:

    • 对于async返回的deferred来说,它有两个方法。当发生异常时,join并不关心是否发生异常,只关心协程是否结束。而await则会在调用处抛出异常。
    • launch中未捕获的异常与async的处理方式不同,launch会直接抛出给父协程,如果没有父协程或父协程不响应(如supervisorScope,下面会介绍),那么就交给上下文中指定的 CoroutineExceptionHandler处理,如果没有指定,那传给全局的CoroutineExceptionHandler等等,而async则要等await来消费。
    • 不管是哪个启动器,在应用了作用域之后,都会按照作用域的语义进行异常扩散,进而触发相应的取消操作,对于 async来说就算不调用await来获取这个异常,它也会在coroutineScope当中触发父协程的取消逻辑。
      看下面这个例子:
    MainScope().launch {    // 1⃣️
        val deferred  = async(exceptionHandler) {
            throw IllegalArgumentException("wrong arg") // 2⃣️
        }
    
        try {
            // 尽管这里捕获了异常,但是在2⃣️处抛出的异常仍然会扩散到1⃣️处,导致崩溃
            deferred.await()
        } catch (e : IllegalArgumentException) {
            print("catch exception: ${e.message}")
        }
    }
    
    3.7.3、作用域

    异常的传播还和协程的作用域有关。
    在前面我们已经看到,子协程出现异常会导致父协程同时被取消。那么有没有什么办法,把错误限制在子协程中呢?
    答案就是使用supervisorScope

    • coroutineScope内部的取消操作是双向传播的,子协程未捕获的异常也会向上传递给父协程。如果一个子协程异常退出,那么父协程也会退出。同样,父协程的异常也会导致所有子协程异常终止。同时,这也是协程内部再启动子协程的默认作用域。
    • supervisorScope内部的取消操作是单向传播的,父协程向子协程传播,子协程的错误不会传播给父协程和它的兄弟协程。它更适合一些独立不相干的任务。

    同样,用几个例子来说明一下:
    我们先定义一个错误处理器:

    val exceptionHandler = CoroutineExceptionHandler { coroutineContext, throwable ->
        print("Throws an exception, ${coroutineContext[CoroutineName]}, message: ${throwable.message}")
    }
    

    接着,在coroutineScope作用域中启动一个协程:

    MainScope().launch(exceptionHandler + CoroutineName("coroutine 1")) {   // 1⃣️
        coroutineScope {
            launch(exceptionHandler + CoroutineName("coroutine 2")) {   // 2⃣️
                throw IllegalArgumentException("oops!")
            }
        }
    }
    

    最终的结果是:

    Throws an exception, CoroutineName(coroutine 1), message: oops!

    如果把上面这个例子中的coroutineScope换成supervisorScope,那么结果就变成了:

    Throws an exception, CoroutineName(coroutine 2), message: oops!

    在上面这个🌰中,如果我们使用supervisorScope,协程2⃣️发生的错误首先尝试向上传播到1⃣️,但是因为作用域的限制,父协程不响应异常处理,因此最终交给2⃣️处的异常处理器处理。
    但是,需要注意的是,supervisorScope只作用域其直接子协程。也就是说,在supervisorScope的子协程中再创建的子协程,遵守默认的作用域,也就是coroutineScope

    MainScope().launch(exceptionHandler + CoroutineName("coroutine 1")) {   // 1⃣️
        supervisorScope {
            launch(exceptionHandler + CoroutineName("coroutine 2")) {   // 2⃣️
                launch(exceptionHandler + CoroutineName("coroutine 3")) {   // 3⃣️
                    throw IllegalArgumentException("oops!")
                }
            }
        }
    }
    

    这段代码运行的结果是:

    Throws an exception, CoroutineName(coroutine 2), message: oops!

    这里的协程3⃣️是supervisorScope的子协程的子协程,因此,在3⃣️处发生的异常仍然会向上传播,最终被2⃣️捕捉到。

    3.8、互斥

    协程本质上仍是在线程上运行的。既然是在线程上,那么一定会有同步的问题。常见的解决线程之间同步问题的工具有:

    • synchronized 关键字
    • ReentrantLock等java.util.concurrent.locks包中的锁
    • AtomicInteger 等java.util.concurrent.atomic包中的原子类
    • ConcurrentHashMap等线程安全的集合
      除此之外,kotlin还为我们提供了两个工具:
    3.8.1、Mutex

    synchronized等关键字,在获取不到锁的时候会阻塞线程。而Mutex通过挂起函数,在没有获取锁就挂起协程,获取后再恢复协程,协程挂起时线程并没有阻塞就可以执行其他逻辑。

    mutex.withLock {
        counter++
    }
    
    3.8.2、ThreadLocal

    Java提供了ThreadLocal用来保存线程局部数据,每个线程中的值都是单独的。协程中同样可以通过.asContextElement实现协程版的局部数据。

    val threadLocal = ThreadLocal<String>()
    GlobalScope.launch(threadLocal.asContextElement("initial value")) {
        print("thread: ${Thread.currentThread()} threadLocal: ${threadLocal.get()}")
        yield()
        print("thread: ${Thread.currentThread()} threadLocal: ${threadLocal.get()}")
    }
    

    结果如下:

    thread: Thread[DefaultDispatcher-worker-1,5,main] threadLocal: initial value
    thread: Thread[DefaultDispatcher-worker-3,5,main] threadLocal: initial value

    可以看到,虽然线程切换了,但是从threadLocad中获取的值并没有发生改变。

    3.9、Android最佳实践

    Kotlin官网上给出了一个Android的最佳实践的例子,我们可以定义一个抽象的Activity,通过重写onDestroy方法,在退出的时候取消所有的协程,避免内存泄漏:

    abstract class ScopedActivity: Activity(), CoroutineScope by MainScope(){
        override fun onDestroy() {
            super.onDestroy()
            cancel()
        }
    }
    

    想要启动一个协程,我们直接调用launch方法就好。

    launch {
        // do something you want
    }
    

    4、协程源码解析

    4.1、suspend修饰符

    suspend是一个修饰符,含义是挂起,可以用在任何函数上。在开篇我们举例的的时候曾提到过:

    suspend fun backupLazy() = {
        // here for you to get LazyInfo
    }
    

    然而实际上,suspend修饰符并不能实现挂起的操作。这个关键字,并不会帮助我们切换线程。它仅仅是提示函数的调用者,这是一个耗时的函数,因此需要放到协程中执行,仅此而已。

    4.2、Continuation续体

    我们来看一下kotlin中协程的接口是怎么写的:

    public interface Continuation<in T> {
        public fun resumeWith(result: Result<T>)
    }
    

    这个resumeWith可能还不是很明显,但是如果看到下面两个扩展函数:

    public inline fun <T> Continuation<T>.resume(value: T): Unit =
        resumeWith(Result.success(value))
    
    public inline fun <T> Continuation<T>.resumeWithException(exception: Throwable): Unit =
        resumeWith(Result.failure(exception))
    

    与我们常见的callback接口的定义对比一下:

    public interface Callback<T> {
        void onFail(int code, String message);
        
        void onSuc(T response);
    }
    

    是不是感觉一模一样!onSuconFail分别于resumeWithresumeWIthException相互对应。所以,有些人说协程本质上是回调,确实也是这么一回事。

    假设我们在主线程创建了一个协程并绑定到IO线程,那么在协程启动后,这个协程就从主线程上剥离了。主线程会继续执行剩下的代码,而与IO线程绑定的这个协程执行完后,就通过这个resumeWith的方法,重新绑定到主线程上执行。

    在协程官方文档上,把这个Continuation称之为续体。在编译的过程中,一个完整的协程被分割切块成一个又一个续体。每一次挂起之后,都会对应着一次resumeWith恢复。

    4.3、CPS 续体传递风格

    CPS的全称是Continuation-Passing-Style,翻译为续体传递风格,这是个有点抽象的概念。
    我们先看一下挂起的函数声明:

    suspend fun <T> CompletableFuture<T>.await(): T
    

    在经过所谓的CPS变换后,它的函数签名变成了这样:

    fun <T> CompletableFuture<T>.await(continuation: Continuation<T>): Any?
    

    这种变换,就成为CPS变换。主要的变化有两点:

    • 续体作为参数传入await方法
    • 返回值变成了Any?
      其中,第二点变化是因为,CPS变换后,这个函数除了要返回它本身的返回值,还要返回一个标记——COROUTINE_SUSPENDED(我们会在下文看到),这个标记返回时,表示表示这个挂起函数会发生事实上的挂起操作。
      什么叫事实上的挂起操作?我们举个例子:
    val deferred = async {  // 1⃣️
        // do something
    }
    delay(1000)
    deferred.await()    // 2⃣️
    

    上述代码执行到标记为2⃣️的地方,是否挂起取决于协程(1⃣️处创建)是否执行完成。如果已经执行完成了,那么2⃣️直接把结果拿过来用就行。如果没有完成,那么2⃣️就会发生事实上的挂起,等待协程执行完毕。

    4.4、ContinuationInterceptor 续体拦截器

    上面已经介绍了协程续体和CPS变换。我们创建的协程,在每一个挂起点,都对应着一次resumeWith的操作。那协程又是怎么绑定到线程上执行的?
    这就涉及到了ContinuationInterceptor
    ContinuationInterceptor是拦截器,拦截器可以做的事情有很多,切换线程只是它可以实现的目的之一。我们也可以实现自己的拦截器,用来打印一些日志。
    照例先看一下ContinuationInterceptor的定义:

    public interface ContinuationInterceptor : CoroutineContext.Element {
        public fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T>
        public fun releaseInterceptedContinuation(continuation: Continuation<*>)
    }
    

    我列举了两个最重要的函数。其中,interceptContinuation会在ContinuationImpl中被调用,从而实现拦截的目的。

    val intercepted = continuation.context[ContinuationInterceptor]?.interceptContinuation(continuation) ?: continuation
    
    4.4.1、HandlerContext

    还记得前文提到的Dispatchers.Main吗?在Android平台,它的实现就是HandlerContextHandlerContext也是继承自ContinuationInterceptor。这里,用HandlerContext来举个例子,说明在Android平台,协程是怎么切换到主线程中执行的。
    大段的代码就不列举了,我们精简一下,只看它的一个函数:

    override fun dispatch(context: CoroutineContext, block: Runnable) {
        handler.post(block)
    }
    

    好嘛!万变不离其宗,最终还是回到了我们熟悉的Handler
    这也说明了,协程本质上只是编译器的一层封装。它底层所依赖的,仍旧是Java的线程池(例如Dispatchers.IO),或是平台的一些特性(如Handler)。

    4.5、状态机

    出于性能的考虑,协程在编译挂起函数时会将函数体编译为状态机,这样可以避免创建过多的类和对象。
    我们看一个例子:

    val a = a()
    val y = foo(a).await() // 挂起点 #1
    b()
    val z = bar(a, y).await() // 挂起点 #2
    c(z)
    

    上面的代码有2个挂起点。它在编译为java的字节码后,大致是这样的:

    class <anonymous_for_state_machine> extends SuspendLambda<...> {
        // 状态机当前状态
        int label = 0
        
        // 协程的局部变量
        A a = null
        Y y = null
        
        void resumeWith(Object result) {
            if (label == 0) goto L0
            if (label == 1) goto L1
            if (label == 2) goto L2
            else throw IllegalStateException()
            
          L0:
            // 这次调用,result 应该为空
            a = a()
            label = 1
            result = foo(a).await(this) // 'this' 作为续体传递
            if (result == COROUTINE_SUSPENDED) return // 如果 await 挂起了执行则返回
          L1:
            // 外部代码传入 .await() 的结果恢复协程 
            y = (Y) result
            b()
            label = 2
            result = bar(a, y).await(this) // 'this' 作为续体传递
            if (result == COROUTINE_SUSPENDED) return // 如果 await 挂起了执行则返回
          L2:
            // 外部代码传入 .await() 的结果恢复协程
            Z z = (Z) result
            c(z)
            label = -1 // 没有其他步骤了
            return
        }          
    }    
    

    注1:上面的代码是伪代码。
    注2:以上代码摘自Kotlin的官方设计文档

    一个挂起函数会被编译成一个匿名类,这个匿名类中的resumeWith函数实现了状态机。成员变量label代表了当前状态机的状态,每一个续体(即挂起点中间的部分以及挂起点与函数头尾之间的部分)都各自对应了一个状态,当函数运行到每个挂起点时,label的值都受限会发生改变,并且当前的续体(也就是代码中的this)都会作为实参传递给发生了CPS变换的挂起函数,如果这个挂起函数没有发生事实上的挂起,函数继续运行,如果发生了事实上的挂起,则函数直接return
    由于label记录了状态,所以,在协程恢复的时候,可以根据状态使用goto 语句直接跳转至上次的挂起点并向后执行,这就是协程挂起的原理。
    顺便提一句,虽然Java中没有goto语句,但是class字节码中支持goto

    5、协程高级应用

    Kotlin中的协程还提供了ChannelFlow 等API可供调用。
    ChannelSelect的结合可以实现协程版的NIO,Flow则是Kotlin协程与响应式编程模型结合的产物,同样分为发射数据的上游、接收数据的下游以及连接上下游的操作符三个部分。

    6、总结

    通过上面的源码分析,我们可以发现,协程是基于线程实现的一层更加上层的API,这个API能够帮助我们用同步的方式写出异步执行的代码。
    除此之外,它好像也没有什么特别神秘的东西了。本质上,协程仍旧是基于线程的,它并不是什么空中楼阁,可以凭空存在。

    6.1、协程真的更加高效吗

    还剩下的一个问题是,协程真的更加高效吗?
    在Kotlin的官方文档上,有这样一个例子,创建了10w个协程与10w个线程进行对比。此时,协程并没有什么问题,而线程则直接发生了OOM。

    repeat(100_000) {
        launch {
            delay(1000L)
            print(".")
        }
    }
    
    repeat(100_000) {
        thread {
            Thread.sleep(1000L)
            print(".")
        }
    }
    

    然而,通过前面的分析,我们已经明白,协程本质上是一层更上的API而已。如果考虑API的开销,协程可能会比直接使用原生的Handler或者Java的线程池更慢,并不存在性能上的优势。
    官网的例子的误导性在于,10w个协程最终都是绑定到线程池上的,并不是真正创建了10w个线程。如果不是创建10w个线程而是使用线程池,那么就不会发生OOM了。

    使用下面两个测试程序,在小米MIX 2进行测试,使用JAVA 线程池的API的性能是协程的2~3倍。这部分的开销。可见,协程对比原生的线程池,并没有什么性能上的优势。

    private const val REPEATE_TIMES = 100_000
    private const val DELAY_TIME = 1000L
    
    // 3800ms
    private fun coroutinesTest() {
        val startTime = System.currentTimeMillis()
        val count = AtomicInteger()
        val dispatcher = Executors.newSingleThreadScheduledExecutor().asCoroutineDispatcher()
    
        GlobalScope.launch {
            repeat(REPEATE_TIMES) {
                launch(dispatcher) {
                    delay(DELAY_TIME)
                    doIf(count.incrementAndGet() == REPEATE_TIMES) {
                        val endTime = System.currentTimeMillis()
                        print(endTime - startTime)
                    }
                }
            }
        }
    }
    // 1400ms
    private fun threadTest() {
        val startTime = System.currentTimeMillis()
        val count = AtomicInteger()
        val executors = Executors.newSingleThreadScheduledExecutor()
        repeat(REPEATE_TIMES) { executors.schedule({
            doIf(count.incrementAndGet() == REPEATE_TIMES) {
                val endTime = System.currentTimeMillis()
                print(endTime - startTime)
            }
        }, DELAY_TIME, TimeUnit.MICROSECONDS) }
    }
    

    相关文章

      网友评论

        本文标题:Android平台的Kotlin协程

        本文链接:https://www.haomeiwen.com/subject/vkcbghtx.html