美文网首页
kotlin 协程的启动与取消

kotlin 协程的启动与取消

作者: Bfmall | 来源:发表于2023-08-13 11:54 被阅读0次

    一、启动构建器

    launch与async构建器都用来启动新协程:

    1、launch,返回一个Job,并且不附带任何结果值
    2、async,返回一个Deferred,Deferred也是一个Job,可以使用.await()在一个延期的值上得到它的最终结果。
    

    等待一个作业:

    1、线程切换
    
        launch(Dispatchers.Default) {
            println("1")
            withContext(Dispatchers.IO) {
                delay(1000)
                println("2")
            }
            println("3")
        }
    
    withContext 实现`异步线程同步化`,打印顺序是:1 2 3
    
    2、join
    
        val job = launch {
            delay(1000)
            println("1")
        }
        job.join() // 等待job执行完毕
        launch {
            println("2")
        }
    
    使用 join 函数实现等待效果。 以上代码的执行顺序是:1 2
    
    3、await
    
        async {
            delay(1000)
            println("1")
        }.await()// 等待job执行完毕
        launch {
            println("2")
        }
    
      使用 await 函数实现等待效果。 以上代码的执行顺序是:1 2
    
    4、async 组合并发
    
    已知有两个耗时任务:
    
        suspend fun doOne(): Int {
            delay(1000)
            return 1
        }
        
        suspend fun doTwo(): Int {
            delay(1000)
            return 2
        }
    
    现在开始执行这两个任务,计算两个挂起函数的返回值之和:
    
        val time = measureTimeMillis {
            val one = doOne()
            val two = doTwo()
            println("两数之和:" + (one + two))
        }
        println("两个任务耗时:$time")
    
    以上程序的执行是顺序的,先执行 doOne,再执行 doTwo,总耗时为:2037毫秒。
    
    这种做法往往不太可取,因为太过耗时,往往不推荐。
    
    另一种方法是使用 async 实现:
    
        val time = measureTimeMillis {
            val one = async {
                doOne()
            }.await()
            val two = async {
                doTwo()
            }.await()
            println("两数之和:" + (one + two))
        }
        println("两个任务耗时:$time")
    
    但是,两个任务仍然是顺序执行的,耗时和前者差不多。
    
    为了解决两个任务不必要的耗时问题,推荐使用 async 组合并发:
    
        val time = measureTimeMillis {
            val job1 = async {
                doOne()
            }
            val job2 = async {
                doTwo()
            }
            val one = job1.await()
            val two = job2.await()
            println("两数之和:" + (one + two))
        }
        println("两个任务耗时:$time")
    
    最终,耗时为 1068 毫秒。
    

    二、启动模式

    CoroutineStart.DEFAULT:协程创建后,立即开始调度,在调度前如果协程被取消,其将直接进入取消响应的状态;
    CoroutineStart.ATOMIC(原子):协程创建后,立即开始调度,协程执行到第一个挂起点之前不响应取消;
    CoroutineStart.LAZY:只有协程被需要时,包括主动调用协程的start、join或者await等函数时才会开始调度,如果调度前就被取消,那么该协程将直接进入异常结束状态。
    CoroutineStart.UNDIPATCHED:协程创建后立即在当前函数调用栈中执行,直到遇到遇到一个真正被挂起的点。
    
    取消的时机:
    
    DEFAULT:调用cacel后,直接取消调度
    ATOMIC:调用cacel后,当协程执行到第一个挂起函数时才会取消调度,挂起函数一般是耗时操作,
        第一个挂起函数之前是必须执行的代码,如果存在此场景,则使用 ATOMIC 启动模式
    LAZY:当协程被使用的时候才开始调度,可以在调度前取消协程,也可以在调度过程中取消协程
    UNDIPATCHED:不分发,即使指定了调度器,也会默认在当前函数的调用栈执行,而且是立即执行。
    
    private fun test() = runBlocking {
        launch (context = Dispatchers.IO, start = CoroutineStart.UNDISPATCHED) {
            println(Thread.currentThread().name)
        }
    }
    
    runBlocking 在主线程中执行,在协程中,即使指定了 Dispatchers.IO,依然在主线程中执行。
    

    三、作用域构建器

    runBlocking 和 coroutineScope:
    
    1、runBlocking 是常规函数,而 coroutineScope 是挂起函数
    2、它们都会等待其协程体以及所有子协程结束,主要区别在于 runBlocking 方法会阻塞当前线程来等待,
        而 coroutineScope 只是挂起,会释放底层线程用于其他用途
    
    coroutineScope 和 supervisorScope:
    1、coroutineScope 一个协程失败了,所有其它兄弟协程也会被取消
    2、supervisorScope 一个协程失败了,不会影响其它兄弟协程
    
    使用场景:
    
    runBlocking:只可用于调试
    coroutineScope:等待其协程体以及所有子协程结束,并且一个协程取消,其它兄弟协程全部取消的场景
    

    四、Job的生命周期

    协程的生命周期是由Job对象来管理的,job对象获取生命周期的方法有:

            job.isActive // 是否激活
            job.isCancelled // 是否取消
            job.isCompleted // 是否完成
    

    一个协程包括的状态是:新建(New)、激活(Active)、完成中(Completing)、已完成(Completed)、取消中(Cacelling)、已取消(Cacelled)。

    五、协程的取消

    【1】取消作用域会取消它的子协程。

        // 定义一个协程作用域
        val scope = CoroutineScope(Dispatchers.Default)
        scope.launch {
            delay(1000)
            println(1)
        }
        scope.launch {
            delay(1000)
            println(2)
        }
        scope.cancel()
    

    此时,两个协程全部被取消。

    【2】被取消的子协程并不会影响其余兄弟协程

        // 定义一个协程作用域
        val scope = CoroutineScope(Dispatchers.Default)
        val job1 = scope.launch {
            delay(1000)
            println(1)
        }
        val job2 = scope.launch {
            delay(1000)
            println(2)
        }
        job1.cancel()
    

    job1被取消,job2 没有被取消。

    【3】协程通过抛出一个特殊的异常 CancellationException 来处理取消操作

        // 定义一个协程作用域
        val job = launch {
            delay(1000)
            println(1)
        }
        delay(100) // 延迟100毫秒,防止还没指定到delay就被取消
        job.cancel()
        job.join()
    
    delay 是一个挂起函数,当它被取消时,会报 CancellationException 异常,此时协程会直接被取消,我们可以利用 try...catch来捕获次异常:
    
        val job = launch {
            try {
                delay(1000) // 一个挂起函数,处理耗时任务
                println(1)
            } catch (e: Exception) {
                e.printStackTrace()
            }
        }
        delay(100) // 延迟100毫秒,防止还没指定到delay就被取消
        job.cancel()
        job.join()
    }
    

    此时,打印了异常:

    kotlinx.coroutines.JobCancellationException: StandaloneCoroutine was cancelled; job=StandaloneCoroutine{Cancelling}@1b68ddbd
    

    并且,协程被取消。

    六、CPU密集型任务取消

        val job = launch(Dispatchers.Default) {
            var count = 0
            while (count < 1000000000) {
                count += 1
                if (count % 1000 == 0) {
                    println(count)
                }
            }
        }
        delay(10)
        job.cancelAndJoin()
    

    以上代码使用了while循环,执行时CPU高度运作,当协程执行密集型任务时,协程无法被取消。

    这时,需要结合job的生命周期,修改后的代码如下:

        val job = launch(Dispatchers.Default) { // 注意:必须指定调度器,否则CPU密集型协程可能会取消失败
            var count = 0
            while (count < 1000000000 && isActive) {
                count += 1
                if (count % 1000 == 0) {
                    println(count)
                }
            }
        }
        delay(10)
        job.cancelAndJoin()
    

    仅仅在 while 中 添加了 isActive,此时是可以被取消的。

    还有一种方法是添加 ensureActive 实现:

        val job = launch(Dispatchers.Default) {  // 注意:必须指定调度器,否则CPU密集型协程可能会取消失败
            var count = 0
            while (count < 1000000000) {
                ensureActive()
                count += 1
                if (count % 1000 == 0) {
                    println(count)
                }
            }
        }
        delay(10)
        job.cancelAndJoin()
    

    使用 ensureActive 和 使用 isActive, 从效果上是差不多的。

    isActive: 是一个可以被使用在 CoroutineScope 中的扩展属性,检查 Job 是否处于活跃状态。

    ensureActive():如果Job处于非活跃状态,则抛出异常。

    yield 函数会检查所在协程的状态,如果已经取消,则抛出异常。次外,它还会尝试让出线程的执行权,给其他协程协程提供执行机会。

      val job = launch(Dispatchers.Default) { // 注意:必须指定调度器,否则CPU密集型协程可能会取消失败
          var count = 0
          while (count < 1000000000) {
              yield()
              count += 1
              if (count % 1000 == 0) {
                  println(count)
              }
          }
      }
      delay(10)
      job.cancelAndJoin()
    

    yield() 会让出线程执行权,其它线程执行完会继续执行。

    比较:

    1、isActive 和 ensureActive 可以让CPU密集型协程成功取消;
    2、yield 既可以让CPU密集型协程成功取消,也可以让出一部分执行权给其它任务处理(当程序非常密集的时候使用)
    

    七、协程取消的副作用

    协程被取消时,会抛出异常,导致下面代码无法执行到,下面的代码可能是必须执行的逻辑,比如释放资源。

    可以将必须要执行的代码放在 finally 中执行:

        val job = launch(Dispatchers.Default) {
            try {
                delay(1000)
            } catch (e: Exception) {
                e.printStackTrace()
            } finally {
                // 释放资源
                println("release")
            }
        }
        delay(10)
        job.cancelAndJoin()
    

    如果是文件操作,需要在 finally 中关闭资源:

        val readBuffer = BufferedReader(FileReader("D:\\xx.text"))
        readBuffer.apply {
            try {
                var line: String? = null
                while (true) {
                    line = readLine() ?: break
                    println(line)
                }
            } catch (e: Exception) {
                e.printStackTrace()
            } finally {
                close()
            }
        }
    

    使用标准库中的 use 函数,可以简化代码:

        val readBuffer = BufferedReader(FileReader("D:\\xx.text"))
        readBuffer.use {
            var line: String? = null
            while (true) {
                line = readLine() ?: break
                println(line)
            }
        }
    

    use 函数中已经封装了try...catch,以及释放资源。

    八、不能被取消的任务

        val job = launch(Dispatchers.Default) {
            withContext(NonCancellable) {
                delay(1000)
                println("1111111")
            }
            delay(1000)
            println("2222222222")
        }
        delay(10)
        job.cancelAndJoin()
    

    使用 withContext(NonCancellable) ,任务可以不被取消。

    九、超时任务

        val result = withTimeoutOrNull(1000) { // 1秒超时任务
            repeat(1000) {
                println(1111)
                delay(10)
            }
            "Success"
        } ?: "Failed"
        println(result)
    

    使用 withTimeoutOrNull 实现超时任务。

    作者:NoBugException
    链接:https://www.jianshu.com/p/83148dc6b168
    来源:简书
    著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。

    相关文章

      网友评论

          本文标题:kotlin 协程的启动与取消

          本文链接:https://www.haomeiwen.com/subject/kzbfmdtx.html