美文网首页
kotlin--协程的启动和取消

kotlin--协程的启动和取消

作者: aruba | 来源:发表于2021-08-30 14:28 被阅读0次

    一、协程的启动

    1.launch与async构建器都用来启动新协程

    launch:我们之前已经使用过了GlobalScope的launch来启动协程,它返回一个Job
    async:返回一个Deferred,它也是一个Job,但是可以使用await函数获得运行的结果
    除了之前结构化并发中介绍的几种指定CoroutineScope的API外,我们还可以使用runBlocking函数来指定CoroutineScope,他会使用主线程来转换成协程
    launch和async内如果有子协程,那么该协程会等待子协程执行结束

    fun `test coroutine build`() = runBlocking {
        val job1 = launch {
            delay(200)
            println("job1 finished")
        }
    
        val job2 = async {
            delay(200)
            println("job2 finished")
            "job2 result"
        }
    
        println(job2.await())
    }
    
    
    fun main() {
        `test coroutine build`()
    }
    

    runBlocking 作用域内就是main函数的主线程,会等到所有子协程运行完毕后才结束

    2.join

    Job的join函数,会让后面的协程等待当前的协程执行完毕后再执行

    fun `test join build`() = runBlocking {
        val job1 = launch {
            delay(200)
            println("job1 finished :${System.currentTimeMillis()}")
            delay(2000)
        }
        job1.join()
        println("job1 finished")
    
        val job2 = launch {
            delay(200)
            println("job2 finished:${System.currentTimeMillis()}")
        }
    
        val job3 = launch {
            delay(200)
            println("job2 finished:${System.currentTimeMillis()}")
        }
    
    }
    
    
    fun main() {
        `test join build`()
    }
    
    3.Deferred的await函数,也能达到同样的效果
    fun `test async build`() = runBlocking {
        val job1 = async {
            delay(200)
            println("job1 finished :${System.currentTimeMillis()}")
            delay(2000)
        }
        job1.await()
        println("job1 finished")
    
        val job2 = async {
            delay(200)
            println("job2 finished:${System.currentTimeMillis()}")
        }
    
        val job3 = async {
            delay(200)
            println("job2 finished:${System.currentTimeMillis()}")
        }
    }
    
    fun main() {
        `test async build`()
    }
    
    4.async组合并发

    首先来看下下面代码:

    suspend fun doOne(): Int {
        delay(1000)
    
        return 10
    }
    
    suspend fun doTwo(): Int {
        delay(1000)
    
        return 20
    }
    
    fun `test sync`() = runBlocking {
        val time = measureTimeMillis {//统计时间函数
            val doOne = doOne()
            val doTwo = doTwo()
    
            println("sum : ${doOne + doTwo}")
        }
    
        println("time : $time")
    }
    

    结果:
    sum : 30
    time : 2032

    doOne执行完才会执行doTwo,如果想要它们可以同时执行,又能获取返回结果,可以使用async:

    fun `test combine async`() = runBlocking {
        val time = measureTimeMillis {
            //统计时间函数
            val doOne = async { doOne() }
            val doTwo = async { doTwo() }
    
            println("sum : ${doOne.await() + doTwo.await()}")
        }
    
        println("time : $time")
    }
    
    fun main() {
        `test combine async`()
    }
    

    结果:
    sum : 30
    time : 1085

    5.协程启动模式

    kotlin的协程有4中启动模式
    1.DEFAULT:协程创建后,立即开始调度,在调度前如果被取消,直接进入取消响应状态
    2.ATOMIC:协程创建后,立即开始调度,协程执行到第一个挂起点之前不响应取消
    3.LAZY:只有协程被需要时,包括主动调用协程的start、join或await等函数时才会开始调度,如果调度前被取消,那么该协程将直接进入异常结束状态
    4.UNDISPATCHED:协程创建后立即在当前函数调用栈中执行,直到遇到第一个真正挂起的点

    1-3是需要调度器的,还记得我们前面学习过的调度器么,如果调度器中线程池满了,那么不会马上执行,4则不依赖调度器

    1.DEFAULT:取消后,将不会再执行

    fun `test start mode`() = runBlocking {
        val job = launch(start = CoroutineStart.DEFAULT) {
            delay(10000)
            println("job finished")
        }
    
        delay(1000)
        job.cancel()
        println("finish")
    }
    

    结果:
    finish

    2.ATOMIC:取消了,会先执行第一个挂起函数之前的代码

    fun `test start mode`() = runBlocking {
        val job = launch(start = CoroutineStart.ATOMIC) {
            println("第一个挂起函数之前")
    
            //delay是一个挂起函数
            delay(10000)
            println("job finished")
        }
    
        delay(1000)
        job.cancel()
        println("finish")
    }
    

    结果:
    第一个挂起函数之前
    finish

    3.LAZY:可以先定义好,然后在先要执行的地方调用相应函数执行

    fun `test start mode`() = runBlocking {
        val job = launch(start = CoroutineStart.LAZY) {
            println("第一个挂起函数之前")
    
            //delay是一个挂起函数
            delay(10000)
            println("job finished")
        }
    
        delay(1000)
    //    job.cancel()
        job.start()
        println("finish")
    }
    

    4.UNDISPTATCHED:挂起函数之前为主线程

    fun `test start mode`() = runBlocking {
        val job1 = async(context = Dispatchers.IO, start = CoroutineStart.UNDISPATCHED) {
            println("Thread ${Thread.currentThread()}")
        }
    }
    

    结果:
    Thread Thread[main,5,main]

    6.协程的作用域构建器

    1.runBlocking与coroutineScope
    runBlocking是常规函数,它会阻塞主线程,而coroutineScope是挂起函数,它们都会等待其协程体和子协助执行结束,作用域构建器使用的是父协程的上下文,协程的上下文会在下一篇中重点介绍

    fun `test coroutineScope`() = runBlocking {
        coroutineScope {
            val job1 = launch {
                delay(2000)
                println("job finished")
            }
    
            val job2 = launch {
                delay(1000)
                println("job2 finished")
                throw IllegalArgumentException()
            }
        }
    }
    

    coroutineScope作用域中,如果一个子协程出错了,那么其所有子协程都会退出并结束

    2.coroutineScope与supervisorScope
    supervisorScope中,一个子协程出错了,不会影响其他子协程

    fun `test supervisorScope`() = runBlocking {
        supervisorScope {
            val job1 = launch {
                delay(2000)
                println("job finished")
            }
    
            val job2 = launch {
                delay(1000)
                println("job2 finished")
                throw IllegalArgumentException()
            }
        }
    
        println("finished")
    }
    

    结果:

    job2 finished
    Exception in thread "main" java.lang.IllegalArgumentException
        at com.aruba.mykotlinapplication.coroutine.RunCoroutineKt$test supervisorScope$1$1$job2$1.invokeSuspend(runCoroutine.kt:148)
        at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
        at kotlinx.coroutines.ResumeModeKt.resumeMode(ResumeMode.kt:67)
        at kotlinx.coroutines.DispatchedKt.resume(Dispatched.kt:309)
        at kotlinx.coroutines.DispatchedKt.dispatch(Dispatched.kt:298)
        at kotlinx.coroutines.CancellableContinuationImpl.dispatchResume(CancellableContinuationImpl.kt:250)
        at kotlinx.coroutines.CancellableContinuationImpl.resumeImpl(CancellableContinuationImpl.kt:260)
        at kotlinx.coroutines.CancellableContinuationImpl.resumeUndispatched(CancellableContinuationImpl.kt:332)
        at kotlinx.coroutines.EventLoopImplBase$DelayedResumeTask.run(EventLoop.kt:298)
        at kotlinx.coroutines.EventLoopImplBase.processNextEvent(EventLoop.kt:116)
        at kotlinx.coroutines.BlockingCoroutine.joinBlocking(Builders.kt:80)
        at kotlinx.coroutines.BuildersKt__BuildersKt.runBlocking(Builders.kt:54)
        at kotlinx.coroutines.BuildersKt.runBlocking(Unknown Source)
        at kotlinx.coroutines.BuildersKt__BuildersKt.runBlocking$default(Builders.kt:36)
        at kotlinx.coroutines.BuildersKt.runBlocking$default(Unknown Source)
        at com.aruba.mykotlinapplication.coroutine.RunCoroutineKt.test supervisorScope(runCoroutine.kt:138)
        at com.aruba.mykotlinapplication.coroutine.RunCoroutineKt.main(runCoroutine.kt:156)
        at com.aruba.mykotlinapplication.coroutine.RunCoroutineKt.main(runCoroutine.kt)
    job finished
    finished
    
    Process finished with exit code 0
    
    7.Job对象

    对于每一个创建的协程(通过launch或async),会返回一个Job对象,它是协程的唯一标识,并且复制管理协程的生命周期
    一个任务可以包含一系列状态:新创建(New)、活跃(Active)、完成中(Completing)、已完成(Completed)、取消中(Canceling)和已取消(Canceled)。虽然无法直接访问这些状态,但我们可以通过访问Job的属性:isActive、isCanceled和isCompleted

    8.协程的生命周期

    如果协程处于活跃状态,协程运行出错或取消都会将该协程置为取消中状态(isActive=false,isCanceled=true)。当所有子协程都完成后,协程会进入已取消状态,此时isCompleted=true。


    二、协程取消

    1.协程的取消

    1.取消作用域会取消它的子协程,CoroutineScope是创建一个全新的协程上下文,和coroutineScope作用域是不同的,作用域构建器使用的是父协程的上下文

    fun `test cancel`() = runBlocking {
        //自定义一个作用域
        val scope = CoroutineScope(Dispatchers.Default)
    
        scope.launch {
            delay(1000)
            println("job finished")
        }
    
        scope.launch {
            delay(1000)
            println("job2 finished")
        }
    
        delay(100)
        scope.cancel()
    
        //主线程等待1200ms,因为runBlocking不会等自定义作用域执行完毕
        delay(1200)
        scope
    }
    

    2.被取消的子协程,不影响其他兄弟协程

    fun `test cancel`() = runBlocking {
        //自定义一个作用域
        val scope = CoroutineScope(Dispatchers.Default)
    
        val job = scope.launch {
            delay(1000)
            println("job finished")
        }
    
        val job2 =scope.launch {
            delay(1000)
            println("job2 finished")
        }
    
        delay(100)
        job.cancel()
    
        //主线程等待1200ms,因为runBlocking不会等自定义作用域执行完毕
        delay(1200)
        scope
    }
    

    结果:
    job2 finished

    3.协程是通过抛出一个特殊的异常:CancellationException 来处理取消操作

    fun `test cancellationException`() = runBlocking {
        val job = GlobalScope.launch {
            try {
                delay(1000)
                println("finished")
            } catch (e: Exception) {
                e.printStackTrace()
            }
        }
    
        delay(100)
        job.cancel()
    //    可以自定义
    //    job.cancel(CancellationException("取消了"))
        job.join()
    }
    
    fun main() {
        `test cancellationException`()
    }
    

    结果:
    kotlinx.coroutines.JobCancellationException: Job was cancelled; job=StandaloneCoroutine{Cancelling}@759ebb3d

    4.所有kotlinx.coroutines中的挂起函数都是可以取消的

    2.CPU密集型任务取消

    1.isActive是一个可以被使用在CorountineScope中的扩展属性,检查Job是否处于活跃状态

    fun `test cancel cpu`() = runBlocking {
        //指定别的调度器,不然会阻塞runBlocking
        val job = launch (Dispatchers.Default){
            var count = 0
            var time = System.currentTimeMillis()
    
            while (count < 10 && isActive) {//调用cancel后,isActive会变成false
                if (System.currentTimeMillis() >= time) {//500ms打印一次
                    println("sleep:${count++}")
                    time += 500
                }
            }
        }
        
        delay(1000)
        println("start cancel")
        job.cancelAndJoin()
        println("finished")
    }
    

    结果:
    sleep:0
    sleep:1
    sleep:2
    start cancel
    finished

    2.ensureActive函数,如果job处于非活跃状态,那么它会抛出异常

    fun `test cancel cpu ensure`() = runBlocking {
        //指定别的调度器,不然会阻塞runBlocking
        val job = launch(Dispatchers.Default) {
            var count = 0
            var time = System.currentTimeMillis()
    
            while (count < 10) {
                //调用ensureActive,非活跃会抛出取消的异常
                ensureActive()
                if (System.currentTimeMillis() >= time) {//500ms打印一次
                    println("sleep:${count++}")
                    time += 500
                }
            }
        }
    
        delay(1000)
        println("start cancel")
        job.cancelAndJoin()
        println("finished")
    }
    

    结果和1.相同

    3.yield函数,它会检查所在协程状态,如果已取消,则抛出取消的异常。此外,它还会尝试让出线程的执行权,给其他协程提供执行机会

    fun `test cancel cpu yield`() = runBlocking {
        //指定别的调度器,不然会阻塞runBlocking
        val job = launch(Dispatchers.Default) {
            var count = 0
            var time = System.currentTimeMillis()
    
            while (count < 10) {
                //调用yield,取消的协程会抛出取消的异常
                yield()
                if (System.currentTimeMillis() >= time) {//500ms打印一次
                    println("sleep:${count++}")
                    time += 500
                }
            }
        }
    
        delay(1000)
        println("start cancel")
        job.cancelAndJoin()
        println("finished")
    }
    

    结果也是相同的

    3.协程取消的资源释放

    1.可以捕获取消异常,然后在finally中释放
    2.use函数,该函数只能被实现了closeable的对象使用,程序结束时,会自动调用close方法,适合文件对象

    4.不能取消的任务

    处于取消中状态的协程不能被挂起,如果想要协程取消后,还能调用挂起函数,我们需要将清理的代码放入NoCancellable CoroutineContext中
    这样会挂起运行中的代码,并保持协程的取消中状态,直到任务处理完成

    fun `test cancel cpu ensure finally`() = runBlocking {
        //指定别的调度器,不然会阻塞runBlocking
        val job = launch(Dispatchers.Default) {
            var count = 0
            var time = System.currentTimeMillis()
    
            try {
                while (count < 10) {
                    //调用ensureActive,非活跃会抛出取消的异常
                    ensureActive()
                    if (System.currentTimeMillis() >= time) {//500ms打印一次
                        println("sleep:${count++}")
                        time += 500
                    }
                }
            } finally {
                withContext(NonCancellable) {
                    println("isCanceled")
                    delay(1000)
                    println("delay 1000ms finished")
                }
            }
        }
    
        delay(1000)
        println("start cancel")
        job.cancelAndJoin()
        println("finished")
    }
    
    fun main() {
        `test cancel cpu ensure finally`()
    }
    

    结果:
    sleep:0
    sleep:1
    sleep:2
    start cancel
    isCanceled
    delay 1000ms finished
    finished

    5.超时任务

    很多情况下,取消一个协程的原因可能是超时
    1.withTimeout:指定时间,如果规定时间内没有完成,就抛出一个异常

    fun `test cancel withtimeout`() = runBlocking { 
        withTimeout(2000){
            repeat(100){
                println("delay:$it")
                delay(1000)
            }
        }
    }
    
    fun main() {
        `test cancel withtimeout`()
    }
    

    结果:

    delay:0
    delay:1
    Exception in thread "main" kotlinx.coroutines.TimeoutCancellationException: Timed out waiting for 2000 ms
        at kotlinx.coroutines.TimeoutKt.TimeoutCancellationException(Timeout.kt:128)
        at kotlinx.coroutines.TimeoutCoroutine.run(Timeout.kt:94)
        at kotlinx.coroutines.EventLoopImplBase$DelayedRunnableTask.run(EventLoop.kt:307)
        at kotlinx.coroutines.EventLoopImplBase.processNextEvent(EventLoop.kt:116)
        at kotlinx.coroutines.DefaultExecutor.run(DefaultExecutor.kt:68)
        at java.lang.Thread.run(Thread.java:745)
    

    2.withTimeoutOrNull:如果超时则返回null,否则返回lambda结果

    fun `test cancel withTimeoutOrNull`() = runBlocking {
        val withTimeoutOrNull = withTimeoutOrNull(2000) {
            repeat(100) {
                println("delay:$it")
                delay(1000)
            }
    
            "Done"
        }
        
        println(withTimeoutOrNull)
    }
    

    相关文章

      网友评论

          本文标题:kotlin--协程的启动和取消

          本文链接:https://www.haomeiwen.com/subject/vkpwiltx.html