美文网首页
Kotlin 协程

Kotlin 协程

作者: 旺仔_100 | 来源:发表于2022-05-07 19:14 被阅读0次

    一、关于协程

    协程是Kotlin中最重要最难学的一块!

    为什么协程如此重要?

    协程是 Kotlin 对比 Java 的最大优势。Java 也在计划着实现自己的协程:Loom,不过这个毕竟还处于相当初级的阶段。而 Kotlin 的协程,可以帮我们极大地简化异步、并发编程、优化软件架构。通过协程,我们不仅可以提高开发效率,还能提高代码的可读性,由此也就可以降低代码出错的概率。

    要记住协程的几个 API 很容易,困难的是形成一套完整的协程知识体系。其实,学习协程,相当于一次编程思维的升级。协程思维,它与我们常见的线程思维迥然不同,当我们能够用协程的思维来分析问题以后,线程当中某些棘手的问题在协程面前都会变成小菜一碟。因此,我们相当于多了一种解决问题的手段。

    其实,如果要用简单的语言来描述协程的话,我们可以将其称为:“互相协作的程序”。

     ///先了解下携程与普通程序执行的区别
    fun  main() = runBlocking {
            val sequence = getSequence()
            printSequence(sequence)
        }
    
        fun getSequence() = sequence {
            println("Add 1")
            yield(1)
            println("Add 2")
            yield(2)
            println("Add 3")
            yield(3)
            println("Add 4")
            yield(4)
        }
    
        fun printSequence(sequence: Sequence<Int>) {
            val iterator = sequence.iterator()
            val i = iterator.next()
            println("Get$i")
            val j = iterator.next()
            println("Get$j")
            val k = iterator.next()
            println("Get$k")
            val m = iterator.next()
            println("Get$m")
        }
    
    输出结果如下:
    I/System.out: Add 1
    I/System.out: Get1
    I/System.out: Add 2
    I/System.out: Get2
    I/System.out: Add 3
    I/System.out: Get3
    I/System.out: Add 4
    I/System.out: Get4
    
    协程执行顺序图.png

    协程与普通程序的区别:

    • 普通程序在被调用以后,只会在末尾的地方返回,并且只会返回一次,而协程则不受此限制,协程的代码可以在任意 yield 的地方挂起(Suspend)让出执行权,然后等到合适的时机再恢复(Resume)。在这个情况下,yield 是代表了“让步”的意思。
    • 普通程序需要一次性收集完所有的值,然后统一返回;而协程则可以每次只返回(yield)一个值,比如我们前面写的 getSequence() 方法。在这个情况下,yield 既有“让步”的意思,也有“产出”的意思。它不仅能让出执行权,还同时产生一个值,比如前面的 yield(1),就代表产出的值为 1。

    除了 yield 以外,我们也可以借助 Kotlin 协程当中的 Channel 来实现类似的代码模式:

    
    // 看不懂代码没关系,目前咱们只需要关心代码的执行结果
    fun main() = runBlocking {
        val channel = getProducer(this)
        testConsumer(channel)
    }
    
    fun getProducer(scope: CoroutineScope) = scope.produce {
        println("Send:1")
        send(1)
        println("Send:2")
        send(2)
        println("Send:3")
        send(3)
        println("Send:4")
        send(4)
    }
    
    suspend fun testConsumer(channel: ReceiveChannel<Int>) {
        delay(100)
        val i = channel.receive()
        println("Receive$i")
        delay(100)
        val j = channel.receive()
        println("Receive$j")
        delay(100)
        val k = channel.receive()
        println("Receive$k")
        delay(100)
        val m = channel.receive()
        println("Receive$m")
    }
    
    执行结果:
    I/System.out: Send:1
    I/System.out: Receive1
    I/System.out: Send:2
    I/System.out: Receive2
    I/System.out: Send:3
    I/System.out: Receive3
    I/System.out: Send:4
    I/System.out: Receive4
    

    可见,以上代码中的 getProducer() 和 testConsumer() 之间,它们也是交替执行的。

    如何理解 Kotlin 的协程?

    在 Kotlin 当中,协程是一个独立的框架。跟 Kotlin 的反射库类似,协程并不是直接集成在标准库当中的。如果我们想要使用 Kotlin 的协程,就必须手动进行依赖:

    
    implementation 'org.jetbrains.kotlinx:kotlinx-coroutines-core:1.6.0'
    

    业界一直有两种说法:
    一种是Kotlin 协程其实就是一个封装的线程框架。如果我们站在框架的层面来看的话,这种说法也有一定道理:协程框架将线程池进一步封装,对开发者暴露出统一的协程 API。(扔物线)
    另一种是:
    从包含关系上看,协程跟线程的关系,有点像线程与进程的关系,毕竟协程不可能脱离线程运行。所以,协程可以理解为运行在线程当中的、更加轻量的 Task。(朱涛)

    协程的轻量级
    
    // 直接使用线程
    fun main() {
        repeat(1000_000_000) {
            thread {
                Thread.sleep(1000000)
            }
        }
    
        Thread.sleep(10000L)
    }
    
    /*
    输出结果:
    2022-04-24 17:38:15.736 17290-24330/com.example.conroutinesdemo A/conroutinesdem: thread.cc:4192] Unable to create protected region in stack for implicit overflow check. Reason: Out of memory size:  4096
    */
    
    //使用协程
    fun main() =
        runBlocking {
            repeat(10_000_000){
                launch { delay(10000) }
            }
            delay(10000L)
        }
    

    上面这个例子是官方的demo,为了说明协程性能更好,实际上是有漏洞的。上面代码是开了十亿个线程。下面的代码实际上只有一个线程池。所以修改为线程池做对比更好,如下:

    fun  main() {
        val executor = Executors.newSingleThreadExecutor();
        val task = java.lang.Runnable {
            Thread.sleep(1000)
            print(".")
    
        }
        repeat(10_000_000){
            executor.execute(task)
        }
    }
    

    还有就是sleep和delay也是有些区别的,实际上我们要做对比应该是使用线程池newSingleThreadExecutor来做对比,这两个实际上是一样的。如下:

    fun main(){
        val executor = Executors.newSingleThreadScheduledExecutor()
        val task = java.lang.Runnable {
            print(".")
        }
        repeat(10_000_000){
            executor.schedule(task,1,TimeUnit.SECONDS)
        }
    }
    
    

    这样对比之后,性能其实差不多。

    如何理解协程的非阻塞?

    首选聊一下线程阻塞是什么?简单来说,Android主线程会进行轮询执行任务,如果做一些耗时任务就会导致主线程的阻塞,如何不阻塞,那么就是开线程执行任务。协程实际上也是通过线程来不阻塞主线程,只是写法上看不出来切了线程,看起来就像是非阻塞的。这也是协程的特点,使用同步代码来写异步。其原因就是协程可以自动切走和切回线程,这个过程也叫挂起。

    协程的使用

    如何启动一个协程

    一、使用CoroutineScope.launch()
    ///通过launch启动协程
    fun  main(){
        //实际开发不用使用GlobalScope 这里只是讲解基础
        GlobalScope.launch {
            println("协程开始")
            delay(1000)
            println("hello world")
        }
    
        println("按照同步的思维,这应该在协程之后")
        //这里休眠两分钟是因为主线程销毁了,协程也不会执行了
        Thread.sleep(2000)
        println("主程序停止")
    }
    
    

    看下launch的源码

    public fun CoroutineScope.launch(
        context: CoroutineContext = EmptyCoroutineContext,
        start: CoroutineStart = CoroutineStart.DEFAULT,
        block: suspend CoroutineScope.() -> Unit
    ): Job {
     ....
    }
    

    首先是 CoroutineScope.launch(),代表了 launch 其实是一个扩展函数,而它的“扩展接收者类型”是 CoroutineScope。这就意味着,我们的 launch() 会等价于 CoroutineScope 的成员方法。而如果我们要调用 launch() 来启动协程,就必须要先拿到 CoroutineScope 的对象。前面的案例,我们使用的 GlobalScope,其实就是 Kotlin 官方为我们提供的一个 CoroutineScope 对象,方便我们开发者直接启动协程。

    接着是第一个参数:CoroutineContext,它代表了我们协程的上下文,它的默认值是 EmptyCoroutineContext,如果我们不传这个参数,默认就会使用 EmptyCoroutineContext。一般来说,我们也可以传入 Kotlin 官方为我们提供的 Dispatchers,来指定协程运行的线程池。

    然后是第二个参数:CoroutineStart,它代表了协程的启动模式。如果我们不传这个参数,它会默认使用 CoroutineStart.DEFAULT。CoroutineStart 其实是一个枚举类,一共有:DEFAULT、LAZY、ATOMIC、UNDISPATCHED。我们最常使用的就是 DEFAULT、LAZY,它们分别代表:立即执行、懒加载执行。

    第三个参数就是 需要一个无参数,无返回值的挂起函数。

    二、runBlocking 启动协程
    fun  main(){
        runBlocking {
            println("协程开始")
            delay(1000)
            println("hello world")
        }
    
        println("按照同步的思维,这应该在协程之后")
        //    Thread.sleep(2000)
        println("主程序停止")
    }
    
    协程开始
    hello world
    按照同步的思维,这应该在协程之后
    主程序停止
    
    
    
    fun main() {
        runBlocking {
            println("First:${Thread.currentThread().name}")
            delay(1000L)
            println("Hello First!")
        }
        runBlocking {
            println("Second:${Thread.currentThread().name}")
            delay(1000L)
            println("Hello Second!")
        }
        runBlocking {
            println("Third:${Thread.currentThread().name}")
            delay(1000L)
            println("Hello Third!")
        }   
    
       // 删掉了 Thread.sleep    println("Process end!")
    }
    
    First:main
    Hello First!
    Second:main
    Hello Second!
    Third:main
    Hello Third!
    

    从结果就可以看出来,runBlocking是会阻塞主线程的,协程也会阻塞。这种方式一般用来做测试的,代码中尽量少用。

    三、async 启动协程

    async在dart中也有这么一种方式。它能通过返回的句柄拿到协程执行的结果。

    ///需要在as的VM options中配置-Dkotlinx.coroutines.debug才能看到
    fun main() = runBlocking{
        println("In runBlocking : ${Thread.currentThread().name}")
        val deferred: Deferred<String> = async {
            println("In async: ${Thread.currentThread().name}")
            delay(1000)
            return@async "任务完成";
        }
        println("after async : ${Thread.currentThread().name}")
    
        println("${deferred.await()}")
    }
    
    
    In runBlocking : main @coroutine#1
    after async : main @coroutine#1
    In async: main @coroutine#2
    任务完成
    

    async 的协程和runBlocking并不在一个上面,runBlocking本来是阻塞的,但是async的协程在打印语句之后才执行。它是在deferred.await()之后才开始执行的。然后async 可以拿到返回值,这也是async 和launch的区别。

    kotlin 挂起函数的核心
    • 挂起函数可以极大的简化异步编程,让我们以同步的方式写异步。
    • 要定义一个挂起函数,我们只要在普通的函数上面增加一个suspend关键字。
    • 挂起函数拥有挂起和恢复的能力,对于同一行代码来说,=左边和右边的代码在不同的线程上,这些都是由- kotlin编译器在做。
    • 挂起函数的本质是Callback,只是kotlin底层用了一个高大上的名字叫Contiunation.Kotlin编译器把Suspend 变成Continuation的过程叫做CPS。
    • 挂起函数只能在挂起函数中调用,或者是在协程中调用。
    协程的生命周期

    Job是协程的句柄,当我们用launch或async创建协程的时候,会同时创建一个Job并返回。我们通过job来理解协程的生命周期和并发。

    查看launch返回值

    public fun CoroutineScope.launch(
        context: CoroutineContext = EmptyCoroutineContext,
        start: CoroutineStart = CoroutineStart.DEFAULT,
        block: suspend CoroutineScope.() -> Unit
    ): Job {
        val newContext = newCoroutineContext(context)
        val coroutine = if (start.isLazy)
            LazyStandaloneCoroutine(newContext, block) else
            StandaloneCoroutine(newContext, active = true)
        coroutine.start(start, coroutine, block)
        return coroutine
    }
    
    

    查看async的返回值

    public fun <T> CoroutineScope.async(
        context: CoroutineContext = EmptyCoroutineContext,
        start: CoroutineStart = CoroutineStart.DEFAULT,
        block: suspend CoroutineScope.() -> T
    ): Deferred<T> {
        val newContext = newCoroutineContext(context)
        val coroutine = if (start.isLazy)
            LazyDeferredCoroutine(newContext, block) else
            DeferredCoroutine<T>(newContext, active = true)
        coroutine.start(start, coroutine, block)
        return coroutine
    }
    
    
    public interface Deferred<out T> : Job {
    

    通过Job我们能干什么?

    • 监控协程的生命状态
    • 使用Job来操控协程
    ///协程的生命周期
    fun main() = runBlocking{
        val job = launch {
            delay(1000)
        }
        job.log()
        job.cancel()
        job.log()
    }
    
    fun  Job.log(){
        logX("""
           isActive = $isActive
            isCancelled = $isCancelled
            isCompleted = $isCompleted
        """.trimIndent())
    }
    fun logX(any: Any?){
        println("""
            ============================================================
            $any
            Thread:${Thread.currentThread().name}
            ============================================================
        """.trimIndent())
    }
    

    isActive = true 表示协程处于获取阶段
    调用job.cancel()以后,协程任务就取消了,isCancel = true 表示协程任务处于取消状态。job.log其实就是对协程的监控,不过是被动的监控。cancel就是对协程的操作。
    除了cancel操作,还可以job.start,它一般和CoroutineStart.LAZY一起使用。


    协程的生命周期图.png
    协程的监听
    ///协程的监听
    - job.invokeOnCompletion 通过这个api我们可以主动监听协程完成
    - job.join 是一个“挂起函数”,它的作用就是:挂起当前的程序执行流程,等待 job 当中的协程任务执行完毕,然后再恢复当前的程序执行流程.它和await的功能是类似的。
    fun main() = runBlocking {
        suspend fun download() {
            //模拟下载任务
            val  time = (Random.nextDouble() * 1000).toLong()
            logX("Delay time = $time")
            delay(time)
        }
    
        val job = launch(start = CoroutineStart.LAZY) {
            logX("Coroutine start!")
            download()
            logX("Coroutine end!")
        }
        delay(500)
        job.log()
        job.start()
        job.log()
        job.invokeOnCompletion {
            //协程执行完成调用这里代码
            job.log()
        }
        //等待协程完毕执行
        job.join()
        logX("Process end")
    }
    
    

    Job的api

    
    public interface Job : CoroutineContext.Element {
    
        // 省略部分代码
    
        // ------------ 状态查询API ------------
    
        public val isActive: Boolean
    
        public val isCompleted: Boolean
    
        public val isCancelled: Boolean
    
        public fun getCancellationException(): CancellationException
    
        // ------------ 操控状态API ------------
    
        public fun start(): Boolean
    
        public fun cancel(cause: CancellationException? = null)
    
        public fun cancel(): Unit = cancel(null)
    
        public fun cancel(cause: Throwable? = null): Boolean
    
        // ------------ 等待状态API ------------
    
        public suspend fun join()
    
        public val onJoin: SelectClause0
    
        // ------------ 完成状态回调API ------------
    
        public fun invokeOnCompletion(handler: CompletionHandler): DisposableHandle
    
        public fun invokeOnCompletion(
            onCancelling: Boolean = false,
            invokeImmediately: Boolean = true,
            handler: CompletionHandler): DisposableHandle
    
    }
    
    Job与结构化并发

    结构化并发是kotlin协程的第二大优势。简单来说就是带有结构和层级的并发

    
    fun  main() = runBlocking{
        val parentJob : Job
        var job1 : Job? = null
        var job2 : Job? = null
        var job3 : Job? = null
    
        parentJob = launch {
            job1 = launch {
                delay(1000)
            }
            job2 = launch {
                delay(3000)
            }
            job3 = launch {
                delay(5000)
            }
        }
    
        delay(500)
        parentJob.children.forEachIndexed{
            index, job ->
            when (index){
                0 -> println("job1 === job is ${job1 === job}")
                1 -> println("job2 === job is ${job2 === job}")
                2 -> println("job3 === job is ${job3 === job}")
            }
        }
    
        parentJob.join()
        logX("Process end")
    
    }
    
    job1 === job is true
    job2 === job is true
    job3 === job is true
    ============================================================
    Process end
    Thread:main @coroutine#1
    ============================================================
    
    

    上面的结果说明嵌套的协程,是属于父子关系, parentJob.join()会等待子job都执行完,才会恢复挂起。

    
    fun  main() = runBlocking {
        val parentJob: Job
        var job1 : Job? = null
        var job2 : Job? = null
        var job3 : Job? = null
    
        parentJob = launch {
            job1 = launch {
                logX("Job1 start!")
                delay(1000)
                logX("Job1 done") //不会走
            }
    
            job2 = launch {
                logX("Job2 start!")
                delay(1000)
                logX("Job2 done")//不会走
            }
    
            job3 = launch {
                logX("Job3 start!")
                delay(1000)
                logX("Job3 done")//不会走
            }
        }
        delay(500)
        parentJob.children.forEachIndexed{
            index, job ->
            when (index){
                0 -> println("job1 === job is ${job1 === job}")
                1 -> println("job2 === job is ${job2 === job}")
                2 -> println("job3 === job is ${job3 === job}")
            }
        }
    
        parentJob.cancel();
        logX("Process end!")
    }
    
    ============================================================
    Job1 start!
    Thread:main @coroutine#3
    ============================================================
    ============================================================
    Job2 start!
    Thread:main @coroutine#4
    ============================================================
    ============================================================
    Job3 start!
    Thread:main @coroutine#5
    ============================================================
    job1 === job is true
    job2 === job is true
    job3 === job is true
    ============================================================
    Process end!
    Thread:main @coroutine#1
    ============================================================
    
    

    通过上面的运行结果,可以看出来,实际上parentJob.cancel是会取消掉子job的。

    最后,来一个简单的实战优化。

    fun  main() = runBlocking {
        suspend fun  getResult1() : String {
            delay(1000)//模式耗时操作
            return "Result1"
        }
    
        suspend fun  getResult2() : String {
            delay(1000)//模式耗时操作
            return "Result2"
        }
    
        suspend fun  getResult3() : String {
            delay(1000)//模式耗时操作
            return "Result3"
        }
    
        val results = mutableListOf<String>()
    
        val time = measureTimeMillis {
            results.add(getResult1())
            results.add(getResult2())
            results.add(getResult3())
        }
    
        println("$time")
        println(results)
    
    }
    

    通过结果可以看出来,这个是同步在执行。我们通过协程可以修改成异步的。

    fun main() = runBlocking {
        suspend fun getResult1(): String {
            delay(1000)//模式耗时操作
            return "Result1"
        }
    
        suspend fun getResult2(): String {
            delay(1000)//模式耗时操作
            return "Result2"
        }
    
        suspend fun getResult3(): String {
            delay(1000)//模式耗时操作
            return "Result3"
        }
    
    
        val results: List<String>
        val time = measureTimeMillis {
    
            var result1 = async { getResult1() }
            var result2 = async { getResult2() }
            var result3 = async { getResult3() }
            results = listOf(result1.await(), result2.await(), result3.await())
        }
        println("$time")
        println("$results")
    }
    
    1037
    [Result1, Result2, Result3]
    
    CoroutineContext

    万物皆context,学习下kotin的context,CoroutineContext。它的最主要的用处是切换线程池。

    fun main() = runBlocking {
         val user = getUserInfo()
        logX(user)
    }
    
    suspend fun getUserInfo():String{
        logX("Before IO Context")
        withContext(Dispatchers.IO){
            logX("In IO Context")
            delay(1000)
        }
        logX("After IO Context")
        return  "BoyCoder"
    }
    
    
    ============================================================
    Before IO Context
    Thread:main @coroutine#1
    ============================================================
    ============================================================
    In IO Context
    Thread:DefaultDispatcher-worker-1 @coroutine#1
    ============================================================
    ============================================================
    After IO Context
    Thread:main @coroutine#1
    ============================================================
    ============================================================
    BoyCoder
    Thread:main @coroutine#1
    ============================================================
    
    

    通过上面的结果,我们可以发现withContext可以切换到自定的线程池工作,然后后面的代码会自动切回之前的线程。

    讲一下kotlin内置的几个Dispatcher

    • **Dispatchers.Main ** 它只在UI编程平台才有意义,在Android、Swing之类的平台上,一般只有Main线程才能绘制UI。
    • Dispatchers.Unconfined 代表无所谓,当前协程可以运行在任意线程之上。
    • Dispatcher.Default 它代表CPU密集型任务的线程池。一般来说,它内部线程个数跟CPU核心数量保持一致,最小限制是2.
    • Dispatcher.IO 它代表IO密集型任务的线程池。它内部的线程数量一般比较多,比如64.
    CoroutineScope

    在学习launch的时候,我们实际上是有协程作用域的,也就是CoroutineScope。

    public interface CoroutineScope {
        /**
         * The context of this scope.
         * Context is encapsulated by the scope and used for implementation of coroutine builders that are extensions on the scope.
         * Accessing this property in general code is not recommended for any purposes except accessing the [Job] instance for advanced usages.
         *
         * By convention, should contain an instance of a [job][Job] to enforce structured concurrency.
         */
        public val coroutineContext: CoroutineContext
    }
    

    它就一个成员变量CoroutineContext,它是对CoroutineContext的一层封装,主要是用来做批量控制携程。

    fun main()= runBlocking {
        val scope = CoroutineScope(Job())
        scope.launch {
    
            logX("First start")
            delay(1000)
            logX("First end") //不会执行
        }
    
        scope.launch {
            logX("Second start")
            delay(1000)
            logX("Second end")//不会执行
        }
    
        scope.launch {
            logX("Third start")
            delay(1000)
            logX("Third end")//不会执行
        }
    
        delay(500)
        scope.cancel()
        delay(1000)
    }
    
    ============================================================
    Second start
    Thread:DefaultDispatcher-worker-2 @coroutine#3
    ============================================================
    ============================================================
    First start
    Thread:DefaultDispatcher-worker-1 @coroutine#2
    ============================================================
    ============================================================
    Third start
    Thread:DefaultDispatcher-worker-3 @coroutine#4
    ============================================================
    
    Process finished with exit code 0
    
    

    从上面的结果发现: scope.cancel()会直接把三个协程都取消。

    相关文章

      网友评论

          本文标题:Kotlin 协程

          本文链接:https://www.haomeiwen.com/subject/aimtyrtx.html