美文网首页
Kotlin协程

Kotlin协程

作者: Vgecanshang | 来源:发表于2018-08-24 17:45 被阅读0次

    前言

     今年的Google开发者大会已表明将Kotlin作为其正式的语言,现Google大力主推Kotlin, 在GitHub上的官方Demo基本都是用Kotlin编写的,在学习Kotlin的时候,觉得有必要把协程这部分单独拎出来,现对Kotlin协程写篇文章记录。

    Kotlin协程

     协程,即协作代码段。相对线程而言,协程更适合于用来实现彼此熟悉的程序组件。协程提供了一种可以避免线程阻塞的能力,这就是它的核心功能。

     理解:子任务协作运行,优雅的处理异步问题解决方案

     协程的概念其实是很早就被提出的,下面引用BLOG中的一段回答来讲解协程究竟是怎么来的:

    1.一开始大家想要同一时间执行多个代码任务,于是就有了并发。从程序员的角度可以看成是多个独立的逻辑流,内部可以是多CPU并行,也可以是单CPU时间分片。

    2.但是一并发就有上下文切换的问题,干了一半跑去处理另一件事,我这做了一半的东西怎么保存。进程就是这样抽象出来的一个概念,搭配虚拟内存、进程表之类,用来管理独立的程序运行、切换。

    3.后来硬件水平提升了,一台电脑上有了好几个CPU就可以一人跑一进程,就是所谓的并行

    4.但是一并行,进程数一高,大部分系统资源就得用于进程切换的状态保存。后来搞出线程的概念,大致意思就是这个地方阻塞了,但我还有其他地方的逻辑流可以计算,不用特别麻烦的切换页表,刷新TLB,只要把寄存器刷新一遍就行。

    5.如果你嫌操作系统调度线程有不确定性,不知道什么时候开始,什么时候切走,我自己在进程里面手写代码去管理逻辑调度这就是用户态线程

    6.而用户态线程是不可剥夺的,如果一个用户态线程发生了阻塞,就会造成整个进程的阻塞,所以进程需要自己拥有调度线程的能力。而如果用户态线程将控制权交给进程,让进程调度自己,这就是协程

    后来我们的内存越来越大,操作系统的调度也越来越智能,就慢慢没人再去花时间去自己实现用户态线程、协程这些东西了。

     协程把异步编程放入库中来简化这类操作。程序逻辑在协程中顺序表述,而底层的库会将其转换为异步操作。库会将相关的用户代码打包成回调,订阅相关事件,调用其执行到不同的线程(甚至不同的机器),而代码依然像顺序执行那么简单。

    为什么又要用协程了?

     既然上面说协程已经淘汰在历史的长河中了,为什么现在又声势浩大的跑来了?
     前面我们讲由于操作系统的多线程调度越来越智能,硬件设备也越来越好, 这大幅度提升了线程效率,因此正常情况下线程的效率是高于协程的,而且是远高于协程的。
     那么线程在什么情况下效率是最高的?就是在一直run的情况下。但是线程几乎是很难一直run的,比如:线程上下文切换、负责计算阻塞、IO阻塞。
     于是又有人想起了协程,这个可以交给代码调度的东西。

    协程的本质作用

     协程实际上就是极大程度的复用线程,通过让线程满载运行,达到最大程度的利用CPU,进而提升应用性能。
     什么意思呢?

    举一个例子:在Android上发起一个网络请求

    step1:主线程创建一个网络请求的任务。
    step2:通过一个子线程去请求服务端响应。
    step2.1:等待网络传递请求,其中可能包括了TCP/IP的一系列进程。
    step2.2:等待服务器处理,比如你请求一个列表数据,服务器逻辑执行依次去缓存、数据库、默认数据找到应该返回给你的数据,再将数据返回给你。
    step2.3:又是一系列数据回传。
    step3:在子线程中获取到服务器返回的数据。将数据转换成想要的格式。
    step4:在主线程中执行某个回调方法。

     在上面例子中,第2步通常我们会用一个线程池存放一批创建好的线程做复用,防止多次创建线程。
     但是使用了线程池,就会遇到第一个问题,池中预存多少线程才最适合?存少了,后面的任务需要等待有空余的线程才能开始执行;存多了,闲置的线程浪费内存。这个问题实际上海是线程利用率不高的问题。

    上面的例子如果换做协程是这么个流程:

    step1:主线程创建一个协程,在协程中创建网络请求的任务。
    step2:为协程分配一个执行的线程(本例中就是子线程了),在线程中去请求服务端响应。
    step2.1:(接下来会发生阻塞),挂起子线程中的这个协程,等待网络传递请求,其中可能包括了TCP/IP的一系列过程。
    step2.2:协程依旧处理挂起状态,等待服务器处理,比如你请求一个列表数据,服务器逻辑执行依次去缓存、数据库、默认数据找到应该返回给你的数据,再将数据回传给你。
    step2.3:协程依旧处理挂起状态,又是一系列的数据回传。
    step3:获取到服务器返回的数据,在子线程中恢复挂起的协程。将数据转换成想要的格式。
    step4:在主线程中执行某个回调方法。

     在上面的例子中,整个步骤没有发生任何改变,但是因为引入了协程概念。当线程中的协程发生了挂起,线程依旧是可以继续做事的,比如开始执行第二个协程,而协程的挂起是一个很轻的操作(其内在只是一次状态机的变更,就是一个switch语句的分支执行,详细内容后面有)。这就大大提升了多任务并发的效率,同时极大的提升了线程的利用率。

      这就是协程的本质——极大程度的复用线程,通过让线程满载运行,达到最大程度的利用CPU,进而提升应用性能

    协程配置(以AS中为例)

     在app的build.gradle中增加如下配置:

    kotlin {
        experimental {
            coroutines 'enable'
        }
    }
    

     并添加如下依赖:

    implementation 'org.jetbrains.kotlinx:kotlinx-coroutines-core:0.20'
    implementation 'org.jetbrains.kotlinx:kotlinx-coroutines-android:0.20'
    

    配置截图:


    image.png

    经过上面的步骤Corutine的配置就已经完成了。接下来就可以使用Coroutine了。

    Kotlin协程使用

     在Kotlin上, 使用协程只需要知道两个方法和他们的返回类型,就可以很熟悉的用上协程了。分别是:

    fun launch(): Job
    fun async(): Deferred
    

    launch方法

     从方法名就能看出,launch表示启动一个协程。
    下面是launch的源码:

    public fun launch(
        context: CoroutineContext = DefaultDispatcher,
        start: CoroutineStart = CoroutineStart.DEFAULT,
        parent: Job? = null,
        block: suspend CoroutineScope.() -> Unit
    ): Job {
        val newContext = newCoroutineContext(context, parent)
        val coroutine = if (start.isLazy)
            LazyStandaloneCoroutine(newContext, block) else
            StandaloneCoroutine(newContext, active = true)
        coroutine.initParentJob(newContext[Job])
        start(block, coroutine, coroutine)
        return coroutine
    }
    

     launch()方法接收三个参数,通常很少用到第二个参数。第一个参数是一个协程的上下文,CoroutineContext不仅可以用于在协程跳转的时刻传递数据,同时最主要的功能,是用于表明协程运行与恢复时的上下文环境。
     通常Android在用的时候都是传一个UI,就表示在UI线程启动协程,或者传一个CommonPool表示在异步启动协程,还有一个Unconfined表示不指定,在哪个线程调用就在哪个线程恢复。

     下面贴出一个使用实例:

    fun test(){
        
        val UI = HandlerContext(Handler(Looper.getMainLooper()) , "UI")
        launch (UI) {
            val isUIThread = Thread.currentThread() == Looper.getMainLooper().thread
            println("UI::==123=$isUIThread")
        }
    
        launch (CommonPool) {
            val isUIThread = Thread.currentThread() == Looper.getMainLooper().thread
            println("CommonPool::==456=$isUIThread")
        }
    }
    
    //输出
    UI::=123==true
    CommonPool::==456=false
    

    Job对象

     launch()方法会返回一个job对象,job对象常用的方法有三个,叫startjoin , cannel。分别对应了协程的启动、切换至当前协程、取消。

    start()方法使用实例:

    fun test(){
        //当启动类型设置成LAZY时,协程不会立即启动,而是手动调用start()后它才会启动。
        val job = launch (UI , CoroutineStart.LAZY) {
            println("hello lazy")
        }
        job.start()
    }
    

     join()方法就比较特殊,它是一个suspend方法。suspend修饰的方法(或闭包)只能调用被suspend修饰过的方法(或闭包)。方法声明如下:

    public suspend fun join()
    

     因此,join()方法只能在协程体内部使用,跟它的功能:切换至当前协程所吻合。

     fun test(){
    
        val job1 = launch (UI , CoroutineStart.LAZY) {
    
            println("launch test: hello1")
    
        }
    
        val job2 = launch (UI) {
    
            println("launch test: hello2")
    
            job1.join()
    
            println("launch test: hello3")
    
        }
    
    }
    
    //输出
    
    07-23 14:10:33.509 12607-12607/[cy.com.kotlindemo](http://cy.com.kotlindemo/) I/System.out:launch test: hello2
    
    07-23 14:10:33.614 12607-12607/[cy.com.kotlindemo](http://cy.com.kotlindemo/) I/System.out: launch test: hello1
    
    07-23 14:10:33.930 12607-12607/[cy.com.kotlindemo](http://cy.com.kotlindemo/) I/System.out: launch test: hello3
    
    

    async()方法

     async()方法也是创建一个协程并启动,甚至连方法的声明都跟launch()方法一模一样。
     不同的是,方法的返回值,返回的是一个Deferred对象。这个接口是Job接口的子类。
     因此上文介绍的所有方法,都可以用于Deferred的对象。

    Drferred最大的一个用处在于它特有的一个方法await()

    public suspend fun await(): T
    

    await()可以返回当前协程的执行结果,也就是你可以这样写代码:

    fun test(){
    
        val deferred1 = async(CommonPool){
            "heello1"
        }
    
        val deferred2 = async(UI){
            println("hello2")
            println(deferred1.await())
        }
    }
    

     你发现神奇的地方了吗?我让一个工作在主线程的协程,获取到了一个异步协程的返回值

     这意味着,我们以后网络请求、图片加载、文件操作说明的,都可以丢到一个异步的协程中去,然后在同步代码中直接返回值,而不再需要去写回调了。

     这就是我们经常使用的一个最大的特性。

    附上一个使用launch和async的简单例子:

    fun test(){
        //每秒输出两个数字
        val job1 = launch (Unconfined , CoroutineStart.LAZY) {
            var count = 0
             while (true){
                count ++
                //delay()表示将这个协程挂起500ms
                delay(500)
                println("test job1: count::$count")
            }
        }
    
        //job2会立刻启动
        val job2 = async (CommonPool) {
            job1.start()
            "test job2: 第二个job2"
        }
    
        launch(UI){
            delay(3000)
            job1.cancel()
            //await()的规则是:如果此刻job2已经执行完则立刻返回结果,否则等待job2执行
            println(job2.await())
        }
    }
    
    //最终输出6次,job1就被cancel了
    07-23 14:42:30.779 27525-27590/[cy.com.kotlindemo](http://cy.com.kotlindemo/) I/System.out: test job1: count::1
    
    07-23 14:42:31.281 27525-27590/[cy.com.kotlindemo](http://cy.com.kotlindemo/) I/System.out: test job1: count::2
    
    07-23 14:42:31.782 27525-27590/[cy.com.kotlindemo](http://cy.com.kotlindemo/) I/System.out: test job1: count::3
    
    07-23 14:42:32.283 27525-27590/[cy.com.kotlindemo](http://cy.com.kotlindemo/) I/System.out: test job1: count::4
    
    07-23 14:42:32.785 27525-27590/[cy.com.kotlindemo](http://cy.com.kotlindemo/) I/System.out: test job1: count::5
    
    07-23 14:42:33.286 27525-27590/[cy.com.kotlindemo](http://cy.com.kotlindemo/) I/System.out: test job1: count::6
    
    07-23 14:42:33.316 27525-27525/[cy.com.kotlindemo](http://cy.com.kotlindemo/) I/System.out: test job2: 第二个job2
    
    

     协程是通过编码实现的一个任务,它和操作系统或者JVM没有任何关系,它的存在更类似于虚拟的线程。
     下面是一个示例:

    val UI = HandlerContext(Handler(Looper.getMainLooper()) , "UI")
    launch (UI) {
        folder.listFiles().filter{
            it.getName().endsWith(".png")
        }.forEach{
            val job = async (CommonPool) {
                getBitmap(it)
            }
            iamgeLayout.addImage(job.await())
        }
    }
    

     Kotlin的语法会让很多人觉得launch()async()是两个协程方法。其实不然,真正的协程是launch()传入的闭包参数。当launch()调用的时候,会启动一个协程(本质上并不一定是立即启动,后面将会解释)。
     async()方法调用的时候又启动了一个协程,此刻外部协程的状态(包括CPU、方法调用、变量信息)会被暂存,进而切换到async()启动的协程执行。

     在上例中,launch()async()这两个方法都显式传入了两个参数:

    1. 第一个参数是一个协程的上下文,类型是CoroutineContext
        CoroutineContext不仅可用于在协程跳转的时刻传递数据,同时最主要的功能,也是在本例中的作用是用于表明协程运行与恢复时的上下文环境。
      例如launch()方法中的UI参数,它实际上是一个封装了HandleCoroutineContext对象。
    val UI = HandlerContext(Handler(Looper.getMainLooper()) , "UI")
    

    对应的还有Swing,当然在Android中是没有这个对象的,但在Java工程中是有的:

    object Swing : AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor{
        override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> = 
            SwingContinuation(continuation)
    }
    
    1. 第二个参数是一个lambda表达式,也就是协程体。Kotlin语法:当一个lambda是函数的最后一个参数的时候,lambda可以写在圆括号外面

    suspend修饰符

     suspend用于修饰会被暂停的函数。
     一个协程的方法(或闭包)必须被suspend修饰,同时suspend修饰的方法(或闭包)只能被suspend修饰过的方法(或闭包)调用。

     我们知道,Kotlin的闭包(lambda)在被编译后是转换成了内部类对象,而一个被suspend修饰的闭包,就是一个特殊的内部类了。例如下面的例子:

    fun test(){
        launch {
            val job = async {
                "string"
            }
            println("=======${job.await()}")
        }
    }
    

    当它被编译以后,launch()传入的闭包会被编译成下面的样子:

    final class Main$test$1 extends CoroutineImpl implements Function2<CoroutineScope, Continuatin<? super Unit>, Object>{
        public final Continuation<Unit> create(@NotNull coroutineScope $receiver, @NotNull Continuaytion<? super Unit> continuation){
        }
        
        public final Object invoke(@NotNull CoroutineScope $receiver, @NotNull Continuation<? super Unit> continuation){
        }
        
        public final Object doResume(@Nullable Ojbect obj, @Nullable Throwable th){
        }
    }
    

     而如果是一个普通方法被suspend修饰了以后,则只是会多出一个参数,例如一个普通的test()无参内容方法用suspend修饰了以后会被编译成这样:

    public final Object test(Continuation<? super Unit> continuation){
        return Unit.INSTANCE;
    }
    

     可以看到不论怎样,都会具备一个Continuation的对象。而这个Continuation就是真正的Kotlin的协程。

    协程的挂起与恢复

     理解了suspend做的事情后,再来看Kotlin的协程。上面的代码中涉及到一个协程切换的情况。就是在launch()调用的时候,启动一个协程就是suspend修饰的闭包参数。在launch()启动协程内,async()又启动了一个协程。

    实际上协程的切换,就是一个挂起当前协程,启动新协程的过程。

    协程启动流程

     挂起是指什么意思?首先要知道协程的启动流程。
    launch()源码是这样的:

    public fun launch(
        context: CoroutineContext = DefaultDispatcher,
        start: CoroutineStart = CoroutineStart.DEFAULT,
        parent: Job? = null,
        block: suspend CoroutineScope.() -> Unit
    ): Job {
        val newContext = newCoroutineContext(context, parent)
        val coroutine = if (start.isLazy)
        LazyStandaloneCoroutine(newContext, block) else
        StandaloneCoroutine(newContext, active = true)
        coroutine.initParentJob(newContext[Job])
        start(block, coroutine, coroutine)
        return coroutine
    }
    

     我们看到声明,start是一个枚举对象,默认值是DEFAULT,这里实际上是调用了枚举的invoke()方法。
     我们去看下CoroutineStart这个枚举类的源码(关键部分):

    public operator fun <R, T> invoke(block: suspend R.() -> T, receiver: R, completion: Continuation<T>) =
    when (this) {
        CoroutineStart.DEFAULT -> block.startCoroutineCancellable(receiver, completion)
        CoroutineStart.ATOMIC -> block.startCoroutine(receiver, completion)
        CoroutineStart.UNDISPATCHED -> block.startCoroutineUndispatched(receiver, completion)
        CoroutineStart.LAZY -> Unit // will start lazily
    }
    

     看到CoroutineStart.DEFAULT启动了协程block.startCoroutineCancellable(receiver,completion),再看startCoroutineCancellable这个函数:

    internal fun <T> (suspend () -> T).startCoroutineCancellable(completion: Continuation<T>) =
    createCoroutineUnchecked(completion).resumeCancellable(Unit)
    

     最终会调用createCoroutineUnchecked,这是一个扩展方法,它的声明如下:

    @SinceKotlin("1.1")
    @kotlin.jvm.JvmVersion
    public fun <T> (suspend () -> T).createCoroutineUnchecked(
        completion: Continuation<T>
    ): Continuation<Unit> =
        if (this !is kotlin.coroutines.experimental.jvm.internal.CoroutineImpl)
            buildContinuationByInvokeCall(completion) {
                @Suppress("UNCHECKED_CAST")
                (this as Function1<Continuation<T>, Any?>).invoke(completion)
        }
        else
            (this.create(completion) as kotlin.coroutines.experimental.jvm.internal.CoroutineImpl).facade
    

     这段代码中,通过判断this是不是CoroutineImpl来做不同的操作。而this是什么?是一个有suspend修饰的闭包R.()->T,也就是前面launch()的参数传入的闭包。

     还记得前面讲过的suspend修饰的闭包在编译后会变成什么吗?刚好是一个CoroutineImpl类的对象。因此这里是调用了闭包的create()方法,最终将闭包创建成了Continuation对象并返回。
     这也验证了前面讲的:Continuation就是真正的Kotlin的协程

     最后在创建好协程对象后,又会调用协程Continuationresume()方法(代码在上面suspend修饰的的函数编译后),而协程的resume()方法又会调用回编译后suspend闭包转换成的那个类里面的doResume方法(后面有介绍这里)。
     所以绕一圈又回来了。

    协程的挂起

     明白了协程的启动流程以后,再来看挂起就清晰多了。我们看下面的代码:

    public final Object doResume(Object obj, Throwable th) {
        StringBuilder append;
        Object await;
        Deferred job;
        switch (this.label) {
            case 0:
                job = DeferredKt.async$default(null, null, (Function2) new 1(null), 3, null);
                append = new StringBuilder().append("========");
                this.L$0 = job;
                this.L$1 = append;
                this.label = 1;
                await = job.await(this);
                if (await == coroutine_suspended) {
                    return coroutine_suspended;
                }
                break;
            case 1:
                StringBuilder stringBuilder = (StringBuilder) this.L$1;
                job = (Deferred) this.L$0;
                if (th == null) {
                    append = stringBuilder;
                    await = obj;
                    break;
                }
                throw th;
            default:
                throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
        }
        System.out.println(append.append((String) await).toString());
        return Unit.INSTANCE;
    }
    

     在switch()内的lable变量,它就是标识符,典型的状态机设计。当前在执行的协程有多少个可能的状态,就会有多少位。

     首先看label为0时的代码:

     job = DeferredKt.async$default(null, null, (Function2) new 1(null), 3, null);
                append = new StringBuilder().append("========");
                this.L$0 = job;
                this.L$1 = append;
                this.label = 1;
                await = job.await(this);
                if (await == coroutine_suspended) {
                    return coroutine_suspended;
                }
    

     首先是job的声明并返回job对象,它对应的kotlin代码是上面的:

    val job = async{
        "string"
    }
    

     接着是StringBuilder的append(),这个应该就不用说了。
    之后我们看到,有连个临时的变量L$0L$1,它们时用来存储当前协程的临时变量所生成的对象,由语法分析后判断当前协程只需要两个临时变量就能保存所有变量的信息了,所以就只生成了两个。
     再之后,状态就被设置为1了,表示即将进入下一个协程了。
    job.await()启动了一个协程,这个方法返回了一个Objectcoroutine_suspended表示协程还在执行,还没有执行完。

     因此这里的逻辑就是启动一个协程,如果这个协程是可以立即执行完的,那就返回结果;否则直接return结束当前方法,等待下一次状态改变被触发,而这个结束当前方法,处于等待的时刻,就是被挂起的时候。

    内部协程的切换

     在协程方法async()返回的是Deferred接口类型的对象,这个接口也继承了Job接口,是它的子类。
    在前面的例子中,async()返回的实际对象是DeferredCoroutine这个类的对象,它实现了Deferred接口,更重要的是,它实现了await()接口方法。还是看代码:

    @Suppress("UNCHECKED_CAST")
    private open class DeferredCoroutine<T>(
        parentContext: CoroutineContext,
        active: Boolean
    ) : AbstractCoroutine<T>(parentContext, active), Deferred<T> {
        override fun getCompleted(): T = getCompletedInternal() as T
        suspend override fun await(): T = awaitInternal() as T
        override val onAwait: SelectClause1<T>
            get() = this as SelectClause1<T>
    }
    

    await()其实是awaitInternal()的代理,它通过一个lock-free循环,保证一定等到异常或者一个叫startInternal()的方法执行完成才会返回。
    startInternal()方法的作用是在启动类型start=LAZY时,保证协程初始化完成,所以在本例中是没有意义的。在本例中有意义的是紧跟着这个方法后面调用的awaitSuspend()

    protected suspend fun awaitInternal(): Any? {
    // fast-path -- check state (avoid extra object creation)
        while(true) { // lock-free loop on state
            val state = this.state
            if (state !is Incomplete) {
            // already complete -- just return result
                if (state is CompletedExceptionally) throw state.exception
                return state
    
            }
            if (startInternal(state) >= 0) break // break unless needs to retry
        }
        return awaitSuspend() // slow-path
    }
    
    //------>
    
    private suspend fun awaitSuspend(): Any? = suspendCancellableCoroutine { cont ->
        cont.disposeOnCompletion(invokeOnCompletion {
            val state = this.state
            check(state !is Incomplete)
            if (state is CompletedExceptionally)
                cont.resumeWithException(state.exception)
            else
                cont.resume(state)
            })
    }
    

     这个方法中的cont就是调用await()时传入的外部协程的对象。
    disposeOnCompletion()方法会调用invokeOnCompletion()方法返回的DisposableHandle对象的dispose()方法,去等待job中的内容执行完成。但如果job中的代码在invokeOnCompletion()方法返回之前就已经执行完,就会返回一个NonDisposableHandle对象表示不需要再等待了。
     然后执行闭包中的代码,去根据job内的代码是否发生了异常去返回对应的结果,这个结果就是state
     最终,又由外部协程cont调用了父类的resume()方法或者resumeWithException()方法(出异常时)。

    协程的恢复

     最终,与协程的启动流程中提及的一样,Continationresume()方法会调用suspend闭包转换成的类的doResume()方法。

    override fun resume(value: Any?) {
        processBareContinuationResume(completion!!) {
            doResume(value, null)
        }
    }
    

     而这里的参数value,就是协程在恢复时传入的,内部协程执行后的结果。
    这时,看前面提及的状态机中的label1的代码:

      StringBuilder stringBuilder = (StringBuilder) this.L$1;
                job = (Deferred) this.L$0;
                if (th == null) {
                    append = stringBuilder;
                    await = obj;
                    break;
                }
                throw th;
    

     至此,很清晰了,就是恢复之前挂起时保存起来的一系列变量的值,最后的if语句中的obj,就是前面子协程运行后的结果传递到resume的参数中的value

    相关文章

      网友评论

          本文标题:Kotlin协程

          本文链接:https://www.haomeiwen.com/subject/cdxbiftx.html