协程(Coroutine)
协程引入
异步加载图片
-
普通代码:
val view = ... loadImageAsync(url, callback{ bitmap -> uiThread{ view.setImageBitmap(bitmap) } })
异常捕获:
try { val view = ... loadImageAsync(url, callback{ fun onSuccess(bitmap: Bitmap){ uiThread{ try { view.setImageBitmap(bitmap) }catch (error: Exception){ handleError(error) } } } fun onError(error: Throwable){ handleError(error) } }) }catch (error: Exception){ handleError(error) }
-
协程代码:
launch(UI) { val view = ... val deferred = async { loadImage(url) } view.setImageBitmap(deferred.await()) }
异常捕获:
launch(UI) { try { val view = ... val deferred = async { loadImage(url) } view.setImageBitmap(deferred.await()) }catch (error: Exception){ handleError(error) } }
通过上面的例子对比,可以发现使用协程的优点:
把异步的代码转换成同步的写法,免除了回调的问题,并且不会阻塞线程;
请求结果返回时协程自动帮我们切回到主线程;
异常的处理比较简洁,协程内部的异常使用一个try...catch即可捕获;
协程概念
协程是一种非抢占式或者说协作式的计算机程序并发调度的实现,程序可以主动挂起或者恢复执行。
协程和线程,进程不同,它通常不是由操作系统底层直接支持,而是通过编译器和应用层的库实现。
image
Kotlin协程框程架
协程正式版1.0在Kotlin 1.3 上发布,协程的语言支持与 API 已完全稳定。
此后 kotlin 协程不再会被标注为 experimental,在 Kotlin 1.3. 之后的版本就可以使用协程代码了。
引入协程库
dependencies {
implementation 'org.jetbrains.kotlinx:kotlinx-coroutines-core:1.2.2'
}
image
协程基本术语
-
协程(coroutine)
可挂起计算的实例。它在概念上类似于线程,在这个意义上,它需要一个代码块运行,并具有类似的生命周期 —— 它可以被创建与启动,但它不绑定到任何特定的线程。
它可以在一个线程中挂起其执行,并在另一个线程中恢复。而且,像 future 或 promise 那样,它在执行结束时可能返回某种结果(值或异常)。
-
挂起函数(suspending function)
suspend 修饰符标记的函数。它可能会通过调用其他 挂起函数 挂起执行代码,而不阻塞当前执行线程。
挂起函数 不能在常规代码中被调用,只能在其他 挂起函数 或 挂起 lambda 表达式 中。
在 Android Studio 中 挂起函数 旁边会有一个特殊的标记符号,告诉开发者这是一个 挂起函数。
image
-
挂起 lambda 表达式(suspending lambda)
必须在 协程 中运行的代码块。它看起来很像一个普通的 lambda 表达式,但它的函数类型被 suspend 修饰符标记。
挂起 lambda 表达式 是 匿名挂起函数 的短语法形式。它可能会通过调用其他 挂起函数 挂起执行代码,而不阻塞当前执行线程。
比如Kotlin协程库的 launch 函数的最后一个参数就是 挂起 lambda 表达式。
image
suspend CoroutineScope.() -> Unit,这个类型被称之为 挂起函数类型。 (suspending function type)
-
协程构建器(coroutine builder)
使用一些 挂起 lambda 表达式 作为参数来创建一个 协程 的函数。
Kotlin 定义了一些基础的构建 协程 的函数,比如 launch{}、future{} 、 sequence{} 等。
使用这些基础的 建构函数 可以构造其他 协程 实例。
-
挂起点(suspension point)
协程 执行过程中可能被挂起的位置。
从语法上说,挂起点 是对一个 挂起函数 的调用,但实际的挂起在 挂起函数 调用了标准库中的原始 挂起函数 时发生。
-
续体(continuation)
续体 是挂起的 协程 在 挂起点 时的状态。它在概念上代表它在 挂起点 之后的剩余应执行的代码。
这里,每次调用挂起函数 yield()时,协程都会挂起,其执行的剩余部分被视作 续体。sequence { for (i in 1..10) yield(i * i) println("over") }
所以有 10 个续体:循环运行第一次后,i=2,挂起;循环运行第二次后,i=3,挂起……最后一次打印“over”并完结协程。
协程的启动
1.简单以 CoroutineScope 的扩展函数 launch 启动一个协程:
import kotlinx.coroutines.*
fun main() {
GlobalScope.launch { // 在后台启动一个新的协程并继续
delay(1000L) // 非阻塞的等待 1 秒钟(默认时间单位是毫秒)
println("World!") // 在延迟后打印输出
}
println("Hello,") // 协程已在等待时主线程还在继续
Thread.sleep(2000L) // 阻塞主线程 2 秒钟来保证 JVM 存活
}
launch 函数有三个参数:分别是 上下文、启动模式、协程体。
launch 函数的返回值是 Job 类型。
public fun CoroutineScope.launch(
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend CoroutineScope.() -> Unit
): Job {
val newContext = newCoroutineContext(context)
val coroutine = if (start.isLazy)
LazyStandaloneCoroutine(newContext, block) else
StandaloneCoroutine(newContext, active = true)
coroutine.start(start, coroutine, block)
return coroutine
}
-
参数:上下文
上下文,它的接口类型是 CoroutineContext
-
参数:启动模式
启动模式为 CoroutineStart 枚举类型public enum class CoroutineStart { DEFAULT, LAZY, @ExperimentalCoroutinesApi ATOMIC, @ExperimentalCoroutinesApi UNDISPATCHED; }
四个启动模式中,最常用的其实是 DEFAULT 和 LAZY。
启动模式 | 功能特性 |
---|---|
DEFAULT | 立即开始执行协程体 |
LAZY | 只有在需要(start、join、await)时才开始执行 |
ATOMIC | 立即执行协程体,但在第一个挂起点前不能被取消 |
UNDISPATCHED | 立即在当前线程执行协程体,直到第一个挂起点(后面取决于调度器) |
- 参数:协程体
协程体为一个 挂起函数,类型为suspend CoroutineScope.() -> Unit,是个 挂起lambda表达式。
- 返回值:Job
CoroutineScope.launch 函数返回一个 Job 对象,该对象代表了这个刚刚创建的 协程实例。
Job 对象有不同的状态(刚创建的状态、活跃的状态、执行完毕的状态、取消状态等)。
State | isActive | isCompleted | isCancelled |
---|---|---|---|
New (可选的初始状态) | false |
false |
false |
Active (默认的初始状态) | true |
false |
false |
Completing (中间状态) | true |
false |
false |
Cancelling (中间状态) | false |
false |
true |
Cancelled (最终状态) | false |
true |
true |
Completed (最终状态) | false |
true |
false |
一般而言,Job 创建后都处于 active 状态,表示这个 Job 已经被创建并且被启动了。
通过 协程构建器 函数的 start 参数可以修改这个状态。
比如如果使用 CoroutineStart.LAZY 作为 start 参数,则创建的 Job 处于 new 状态,
这个时候需要通过调用 Job 的 start 或者 join 函数来把该 Job 转换为 active 状态。
处于 active 状态的 Job 表示 协程 正在执行。
如果执行过程中抛出了异常则会把该 Job 标记为 cancelling 状态。
除此之外,还可以通过调用 cancel 函数来把 Job 转换为 cancelling 状态。
然后当 Job 完成后就处于 cancelled 状态。
具有多个子 Job 的父 Job 会等待所有子 job 完成(或者取消)后,自己才会执行完成。
示例,调用 cancel 函数取消协程执行。
import kotlinx.coroutines.*
fun main() = runBlocking {
//sampleStart
val job = launch {
repeat(1000) { i ->
println("job: I'm sleeping $i ...")
delay(500L)
}
}
delay(1300L) // 延迟一段时间
println("main: I'm tired of waiting!")
job.cancel() // 取消该任务
job.join() // 等待任务执行结束
println("main: Now I can quit.")
//sampleEnd
}
程序执行后的输出:
job: I'm sleeping 0 ...
job: I'm sleeping 1 ...
job: I'm sleeping 2 ...
main: I'm tired of waiting!
main: Now I can quit.
2.使用 async 启动一个协程:
在概念上,async 就类似于 launch。它启动了一个单独的协程,这是一个轻量级的线程并与其它所有的协程一起并发地工作。
不同之处在于 launch 返回一个Job 并且不附带任何结果值,而 async 返回一个 Deferred。
Deferred 继承自 Job,所以通过 Deferred 也可以和 Job 一样来控制这个 协程。
Deferred 类似于 Java 里面的 future,一个轻量级的非阻塞 future。
通过调用 Deferred 对象的 await() 函数来等待异步结果的返回。
import kotlinx.coroutines.*
fun main() = runBlocking<Unit> {
val one = async { doSomethingUsefulOne() }
val two = async { doSomethingUsefulTwo() }
println("The answer is ${one.await() + two.await()}")
}
suspend fun doSomethingUsefulOne(): Int {
delay(1000L)
return 13
}
suspend fun doSomethingUsefulTwo(): Int {
delay(1000L)
return 29
}
程序执行后的输出:
The answer is 42
协程的调度
上面提到协程上下文作为launch函数的参数。
CoroutineContext 接口的定义:
@SinceKotlin("1.3")
public interface CoroutineContext {
public operator fun <E : Element> get(key: Key<E>): E?
public fun <R> fold(initial: R, operation: (R, Element) -> R): R
public operator fun plus(context: CoroutineContext): CoroutineContext = ...
public fun minusKey(key: Key<*>): CoroutineContext
public interface Key<E : Element>
public interface Element : CoroutineContext {
public val key: Key<*>
...
}
}
CoroutineContext 类似一个集合,它的元素就是源码中看到的 Element。
每一个 Element 都有一个 key,因此它可以作为元素出现,同时它也是 CoroutineContext 的子接口,因此也可以作为集合出现。
通常我们见到的上下文的类型是 CombinedContext 或者 EmptyCoroutineContext
一个表示上下文的组合,另一个表示什么功能都没有的空的上下文。
CoroutineContext 有两个非常重要的元素:Job 和 Dispatcher。
Job 是当前的协程的实例,而 Dispatcher 决定了当前协程体执行的线程。
我们在协程体里面访问到的 coroutineContext 大多是这个 CombinedContext 类型,表示有很多具体的上下文实现的集合。
我们如果想要找到某一个特别的上下文实现,就需要用对应的 Key 来查找:
import kotlinx.coroutines.launch
import kotlinx.coroutines.*
import kotlin.coroutines.ContinuationInterceptor
fun main() {
GlobalScope.launch {
println(coroutineContext[Job]) //StandaloneCoroutine{Active}@4f5a4106
println(coroutineContext[ContinuationInterceptor]) //DefaultDispatcher
}
Thread.sleep(2000)
}
Job 是一个 CoroutineContext.Element 的实现,内部有个伴生对象 Key;
coroutineContext[Job] 这里的 Job 实际上是对它的伴生对象的引用。
public interface Job : CoroutineContext.Element {
/**
* Key for [Job] instance in the coroutine context.
*/
public companion object Key : CoroutineContext.Key<Job> { ... }
...
}
ContinuationInterceptor 表示续体拦截器(以下简称拦截器),也是一个 CoroutineContext.Element 的实现。
@SinceKotlin("1.3")
public interface ContinuationInterceptor : CoroutineContext.Element {
/**
* The key that defines *the* context interceptor.
*/
companion object Key : CoroutineContext.Key<ContinuationInterceptor>
public fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T>
...
}
拦截器可以左右你的协程的执行。
以下是自定义拦截器的例子。让协程代码块运行在自定义的线程中。
suspend fun main() {
val job: Job = GlobalScope.launch(MyContinuationInterceptor()) {
Logger.debug(1)
delay(1000)
Logger.debug(2)
launch {
Logger.debug(3)
}
}
job.join()
}
class MyContinuationInterceptor : ContinuationInterceptor {
override val key = ContinuationInterceptor
override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> {
return MyContinuation(continuation)
}
}
class MyContinuation<T>(val continuation: Continuation<T>) : Continuation<T> {
override val context: CoroutineContext = continuation.context
private val executor = Executors.newSingleThreadExecutor {
Thread(it, "MyThreadExecutor").also { it.isDaemon = true }
}
override fun resumeWith(result: Result<T>) {
Logger.debug(result)
executor.submit {
//切换线程(模拟简单的协程调度器)
continuation.resumeWith(result)
}
}
}
所有协程启动的时候,都会有一次 Continuation.resumeWith 的操作。
对于受MyContinuationInterceptor影响的协程启动时,都会执行MyContinuation.resumeWith
delay 是挂起点,1000ms 之后需要继续调度执行该协程,也会执行MyContinuation.resumeWith
18:01:41:337 [main] MyContinuation.resumeWith(ConsoleMain3.kt:46): Success(kotlin.Unit) // ①
18:01:41:358 [MyThreadExecutor] ConsoleMain3Kt$main$job$1.invokeSuspend(ConsoleMain3.kt:20): 1
18:01:42:371 [kotlinx.coroutines.DefaultExecutor] MyContinuation.resumeWith(ConsoleMain3.kt:46): Success(kotlin.Unit) // ②
18:01:42:372 [MyThreadExecutor] ConsoleMain3Kt$main$job$1.invokeSuspend(ConsoleMain3.kt:22): 2
18:01:42:375 [MyThreadExecutor] MyContinuation.resumeWith(ConsoleMain3.kt:46): Success(kotlin.Unit) // ③
18:01:42:376 [MyThreadExecutor] ConsoleMain3Kt$main$job$1$1.invokeSuspend(ConsoleMain3.kt:24): 3
Kotlin协程库中有专门的拦截器的实现,叫做 CoroutineDispatcher(协程调度器,以下简称调度器)
它确定了相应的协程在执行时使用哪些线程。
public abstract class CoroutineDispatcher :
AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {
...
public abstract fun dispatch(context: CoroutineContext, block: Runnable)
...
}
它本身是协程上下文的子类,同时实现了拦截器的接口, dispatch 方法会在拦截器的方法 interceptContinuation 中调用,进而实现协程的调度。
所以如果我们想要实现自己的调度器,继承这个类就可以了。
不过通常我们都用协程框架提供的,它们定义在 Dispatchers 当中:
public actual object Dispatchers {
@JvmStatic
public actual val Default: CoroutineDispatcher = createDefaultDispatcher()
@JvmStatic
public actual val Main: MainCoroutineDispatcher get() = MainDispatcherLoader.dispatcher
@JvmStatic
public actual val Unconfined: CoroutineDispatcher = kotlinx.coroutines.Unconfined
@JvmStatic
public val IO: CoroutineDispatcher = DefaultScheduler.IO
}
调度器 | JVM |
---|---|
Default | 线程池 |
Main | UI线程 |
Unconfined | 直接执行,直到遇到第一个挂起点 |
IO | 线程池 |
指定调度器启动协程:
suspend fun main() {
GlobalScope.launch(/** Dispatchers.Default **/) {
Logger.debug(1)
launch(Dispatchers.Main) {
Logger.debug(2)
}
}.join()
}
GlobalScope.launch 默认使用 DefaultDispatcher 调度任务。
Dispatchers.Main 来确保 launch 启动的协程在调度时始终调度到 UI 线程。
在 Android 当中,Dispatchers.Main 引用的实例是 HandlerDispatcher 。
执行结果如下:
19:07:31:494 [DefaultDispatcher-worker-1] DispatchKt$main$2.invokeSuspend(Dispatch.kt:23): 1
19:07:31:591 [AWT-EventQueue-0] DispatchKt$main$2$1.invokeSuspend(Dispatch.kt:25): 2
其他创建调度器的方式:
- newSingleThreadContext 创建线程池中只有一个线程的Dispatcher,使用完毕后需要close。
- newFixedThreadPoolContext 创建在私有的线程池中运行的 Dispatcher,使用完毕后需要close。
- Executor.asCoroutineDispatcher 把Executor 对象转换为一个 Dispatcher 使用。
suspend fun main() {
Executors.newSingleThreadExecutor { r -> Thread(r, "MyThread") }
.asCoroutineDispatcher()
.use { dispatcher ->
GlobalScope.launch(dispatcher) {
Logger.debug(1)
}.join()
}
Logger.debug(2)
}
运行结果:
19:01:36:027 [MyThread] DispatchKt$main$3$1.invokeSuspend(Dispatch.kt:13): 1
19:01:36:051 [MyThread] DispatchKt.main(Dispatch.kt:16): 2
协程的异常处理
通过一个在 GlobalScope 中创建协程的示例来看一下协程的异常处理,中间添加了异常捕获。
fun main() = runBlocking {
val job = GlobalScope.launch{
try {
Logger.debug("launch new coroutine")
launch {
Logger.debug("Throwing exception from launch")
throw IndexOutOfBoundsException()
}
} catch (e: Exception) {
Logger.error("Caught launch Exception: $e") //①
}
}
job.join()
Logger.debug("Joined failed job")
val deferred = GlobalScope.async {
Logger.debug("Throwing exception from async")
throw ArithmeticException() // 没有打印任何东西,依赖用户去调用等待
}
try {
deferred.await()
Logger.debug("Unreached")
} catch (e: ArithmeticException) {
Logger.debug("Caught ArithmeticException") //②
}
}
输出结果:
16:05:04:654 [DefaultDispatcher-worker-1 @coroutine#2] ConsoleExceptionDemoKt$main$1$job$1.invokeSuspend(ConsoleExceptionDemo.kt:9): launch new coroutine
16:05:04:671 [DefaultDispatcher-worker-1 @coroutine#3] ConsoleExceptionDemoKt$main$1$job$1$1.invokeSuspend(ConsoleExceptionDemo.kt:11): Throwing exception from launch
Exception in thread "DefaultDispatcher-worker-1 @coroutine#3" java.lang.IndexOutOfBoundsException
at com.coroutine.console.exception.ConsoleExceptionDemoKt$main$1$job$1$1.invokeSuspend(ConsoleExceptionDemo.kt:12)
at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
at kotlinx.coroutines.DispatchedTask.run(Dispatched.kt:238)
at kotlinx.coroutines.scheduling.CoroutineScheduler.runSafely(CoroutineScheduler.kt:594)
at kotlinx.coroutines.scheduling.CoroutineScheduler.access$runSafely(CoroutineScheduler.kt:60)
at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.run(CoroutineScheduler.kt:742)
16:05:04:678 [main @coroutine#1] ConsoleExceptionDemoKt$main$1.invokeSuspend(ConsoleExceptionDemo.kt:19): Joined failed job
16:05:04:680 [DefaultDispatcher-worker-1 @coroutine#4] ConsoleExceptionDemoKt$main$1$deferred$1.invokeSuspend(ConsoleExceptionDemo.kt:22): Throwing exception from async
16:05:04:712 [main @coroutine#1] ConsoleExceptionDemoKt$main$1.invokeSuspend(ConsoleExceptionDemo.kt:29): Caught ArithmeticException
从上面的输出的log,会发现并没有捕获到①处的异常。②处的异常却被捕获到了。
不同的协程构建器函数有不同的异常传递策略,在协程中异常传递分为两种类型。
一种是自动向上传递(launch 和 actor),另外一种是把错误信息暴露给调用者(async 和 produce)。
前者对待异常是不处理的,类似于 Java 的 Thread.uncaughtExceptionHandler ,而后者依赖调用者来最终处理异常。
如果把代码改写成以下就可以捕获到异常:
fun main() = runBlocking {
val handler = CoroutineExceptionHandler { _, exception ->
Logger.error("Caught $exception")
}
val job = GlobalScope.launch(handler) {
Logger.debug("launch new coroutine")
launch {
Logger.debug("Throwing exception from launch")
throw IndexOutOfBoundsException()
}
}
job.join()
Logger.debug("Joined failed job")
val deferred = GlobalScope.async {
Logger.debug("Throwing exception from async")
throw ArithmeticException() // 没有打印任何东西,依赖用户去调用等待
}
try {
deferred.await()
Logger.debug("Unreached")
} catch (e: ArithmeticException) {
Logger.debug("Caught ArithmeticException") //②
}
}
16:03:28:861 [DefaultDispatcher-worker-1 @coroutine#2] ConsoleExceptionDemoKt$main$1$job$1.invokeSuspend(ConsoleExceptionDemo.kt:13): launch new coroutine
16:03:28:877 [DefaultDispatcher-worker-1 @coroutine#3] ConsoleExceptionDemoKt$main$1$job$1$1.invokeSuspend(ConsoleExceptionDemo.kt:15): Throwing exception from launch
16:03:28:881 [DefaultDispatcher-worker-1 @coroutine#3] ConsoleExceptionDemoKt$main$1$invokeSuspend$$inlined$CoroutineExceptionHandler$1.handleException(CoroutineExceptionHandler.kt:82): Caught java.lang.IndexOutOfBoundsException
16:03:28:882 [main @coroutine#1] ConsoleExceptionDemoKt$main$1.invokeSuspend(ConsoleExceptionDemo.kt:20): Joined failed job
16:03:28:883 [DefaultDispatcher-worker-1 @coroutine#4] ConsoleExceptionDemoKt$main$1$deferred$1.invokeSuspend(ConsoleExceptionDemo.kt:23): Throwing exception from async
16:03:28:915 [main @coroutine#1] ConsoleExceptionDemoKt$main$1.invokeSuspend(ConsoleExceptionDemo.kt:30): Caught ArithmeticException
CoroutineExceptionHandler
对于通过 launch 和 actor 构建器创建的协程,如果里面抛出了异常需要通过 CoroutineExceptionHandler 来捕获异常。
CoroutineExceptionHandler 类似于 Thread.uncaughtExceptionHandler 全局异常处理。
CoroutineExceptionHandler 并不算是一个全局的异常捕获,因为它只能捕获对应协程内未捕获的异常。
从上面代码看到 CoroutineExceptionHandler 可以作为 launch 函数的参数,也能猜到其类型属于 CoroutineContext。
CoroutineExceptionHandler 接口定义:
public interface CoroutineExceptionHandler : CoroutineContext.Element {
/**
* Key for [CoroutineExceptionHandler] instance in the coroutine context.
*/
public companion object Key : CoroutineContext.Key<CoroutineExceptionHandler>
/**
* Handles uncaught [exception] in the given [context]. It is invoked
* if coroutine has an uncaught exception.
*/
public fun handleException(context: CoroutineContext, exception: Throwable)
}
异常的传播
异常传播还涉及到 CoroutineScope (协程作用域,以下简称作用域)的概念。
CoroutineScope接口的定义:
public interface CoroutineScope {
public val coroutineContext: CoroutineContext
}
CoroutineScope 是 kotlinx.coroutines 提供的一个抽象的封装,用来管理协程的生命周期。
比如:
在Android程序在上下文中启动了多个协程来为某个activity 进行异步操作来拉取以及更新数据,或作动画等。
当 activity 被销毁的时候这些协程必须被取消以防止内存泄漏。就需要用到 CoroutineScope,
这样当这个 CoroutineScope 被取消的时候,里面所有的子协程也会自动取消。
每个协程构建器都是 CoroutineScope 的扩展函数,并且自动的继承了当前作用域的 coroutineContext。
所以要使用协程必须要先创建一个对应的 CoroutineScope。
协程执行代码块的 this 字段就代表了当前使用的 CoroutineScope 实例。
所以就能够在其内部继续启动子协程,比如下面的嵌套的 launch
-
取消与异常
取消与异常紧密相关。协程内部使用 CancellationException 来进行取消。
这个异常会被所有的处理者忽略,所以那些可以被 catch 代码块捕获的异常 仅仅应该被用来作为额外调试信息的资源。
例子1:fun main() = runBlocking { val job1 = GlobalScope.launch { Logger.debug("1") val job2 = launch { Logger.debug("2") throw IndexOutOfBoundsException() } try { job2.join() }catch (e: Exception){ Logger.error("Caught exception: $e") } delay(100) Logger.debug("3") } job1.join() Logger.debug("4") }
子协程launch抛出异常之后,会把父协程给cancel掉了。
父协程已经被cancel,如果再调用 job2.join 就会抛出 JobCancellationException。
运行结果:17:35:28:135 [DefaultDispatcher-worker-1 @coroutine#2] ConsoleExceptionDemoKt$main$1$job1$1.invokeSuspend(ConsoleExceptionDemo.kt:10): 1 17:35:28:151 [DefaultDispatcher-worker-1 @coroutine#3] ConsoleExceptionDemoKt$main$1$job1$1$job2$1.invokeSuspend(ConsoleExceptionDemo.kt:12): 2 17:35:28:172 [DefaultDispatcher-worker-1 @coroutine#2] ConsoleExceptionDemoKt$main$1$job1$1.invokeSuspend(ConsoleExceptionDemo.kt:18): Caught exception: kotlinx.coroutines.JobCancellationException: StandaloneCoroutine is cancelling; job="coroutine#2":StandaloneCoroutine{Cancelling}@31498115 Exception in thread "DefaultDispatcher-worker-1 @coroutine#2" java.lang.IndexOutOfBoundsException at com.coroutine.console.exception.ConsoleExceptionDemoKt$main$1$job1$1$job2$1.invokeSuspend(ConsoleExceptionDemo.kt:13) at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33) at kotlinx.coroutines.DispatchedTask.run(Dispatched.kt:238) at kotlinx.coroutines.scheduling.CoroutineScheduler.runSafely(CoroutineScheduler.kt:594) at kotlinx.coroutines.scheduling.CoroutineScheduler.access$runSafely(CoroutineScheduler.kt:60) at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.run(CoroutineScheduler.kt:742) 17:35:28:179 [main @coroutine#1] ConsoleExceptionDemoKt$main$1.invokeSuspend(ConsoleExceptionDemo.kt:24): 4
上面例子也说明了异常的传播方向,子协程会把异常传给了父协程,导致父协程被取消。
例子2:fun main() = runBlocking { val job1 = GlobalScope.launch(CoroutineExceptionHandler { _, exception -> Logger.debug("#1 Caught $exception") //① }) { Logger.debug("1") val job2 = launch(CoroutineExceptionHandler { _, exception -> Logger.debug("#2 Caught $exception") //② }) { Logger.debug("2") throw IndexOutOfBoundsException() } job2.join() Logger.debug("3") } job1.join() Logger.debug("4") }
从运行结果来看,②处并没有输出异常信息,①处输出了异常信息。再次证明了子协程会把异常传给了父协程。
18:04:51:688 [DefaultDispatcher-worker-2 @coroutine#2] ConsoleExceptionDemoKt$main$1$job1$2.invokeSuspend(ConsoleExceptionDemo.kt:14): 1 18:04:51:706 [DefaultDispatcher-worker-2 @coroutine#3] ConsoleExceptionDemoKt$main$1$job1$2$job2$2.invokeSuspend(ConsoleExceptionDemo.kt:18): 2 18:04:51:726 [DefaultDispatcher-worker-2 @coroutine#2] ConsoleExceptionDemoKt$main$1$invokeSuspend$$inlined$CoroutineExceptionHandler$1.handleException(CoroutineExceptionHandler.kt:82): #1 Caught java.lang.IndexOutOfBoundsException 18:04:51:727 [main @coroutine#1] ConsoleExceptionDemoKt$main$1.invokeSuspend(ConsoleExceptionDemo.kt:25): 4
例子3:
父协程遇到异常也会把子协程给取消掉。取消异常向下传播fun main() = runBlocking { val job1 = GlobalScope.launch { Logger.debug("1") launch { Logger.debug("2") delay(3000) Logger.debug("3") } delay(1000) throw IndexOutOfBoundsException() } job1.join() Logger.debug("4") }
运行结果:
18:35:03:788 [DefaultDispatcher-worker-2 @coroutine#2] ConsoleExceptionDemoKt$main$1$job1$1.invokeSuspend(ConsoleExceptionDemo.kt:12): 1 18:35:03:806 [DefaultDispatcher-worker-2 @coroutine#3] ConsoleExceptionDemoKt$main$1$job1$1$1.invokeSuspend(ConsoleExceptionDemo.kt:14): 2 Exception in thread "DefaultDispatcher-worker-3 @coroutine#3" java.lang.IndexOutOfBoundsException at com.coroutine.console.exception.ConsoleExceptionDemoKt$main$1$job1$1.invokeSuspend(ConsoleExceptionDemo.kt:19) at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33) at kotlinx.coroutines.DispatchedTask.run(Dispatched.kt:238) at kotlinx.coroutines.scheduling.CoroutineScheduler.runSafely(CoroutineScheduler.kt:594) at kotlinx.coroutines.scheduling.CoroutineScheduler.access$runSafely(CoroutineScheduler.kt:60) at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.run(CoroutineScheduler.kt:742) 18:35:04:856 [main @coroutine#1] ConsoleExceptionDemoKt$main$1.invokeSuspend(ConsoleExceptionDemo.kt:22): 4
上面的例子取消都是双向的:
取消父任务会同时取消所有子任务,而某一个子任务出现的异常被取消,则会导致父任务和所有其他子任务也被同时取消。
还有一种情况,取消是单向的。这种需求很常见。
比如在Android应用中,打开一个界面,在这个Activity 的作用域内启动了多个任务加载不同的数据,
而这些加载不同数据的子任务是相互独立的,某一个失败了不应该影响其他子任务的执行;
而如果这个 Activity 退出被销毁了,则所有请求数据的子任务就没有必要了,需要被取消。
这种行为可以通过 SupervisorJob(监督任务)来实现。
SupervisorJob 它类似于常规的 Job,唯一的区别是 取消 只会向下传播。
示例:
子任务失败,不会取消其他子任务,也不会取消父任务。而父任务能够取消所有子任务的执行。fun main() = runBlocking { val supervisor = SupervisorJob() with(CoroutineScope(coroutineContext + supervisor)) { // 启动第一个子任务 val firstChild = launch(CoroutineExceptionHandler { _, _ -> Logger.error("Caught Exception") }) { Logger.debug("First child is failing") throw AssertionError("First child is cancelled") } // 启动第二个子任务 val secondChild = launch { // 取消了第一个子任务且没有传播给第二个子任务 Logger.debug("First child is cancelled: ${firstChild.isCancelled}, but second one is still active") try { delay(Long.MAX_VALUE) } finally { // 但是取消了监督的传播 Logger.debug("Second child is cancelled because supervisor is cancelled") } } // 等待直到第一个子任务失败且执行完成 firstChild.join() Logger.debug("Cancelling supervisor") supervisor.cancel() secondChild.join() } }
执行结果:
20:34:24:828 [main @coroutine#2] ConsoleExceptionDemoKt$main$1$1$firstChild$2.invokeSuspend(ConsoleExceptionDemo.kt:11): First child is failing 20:34:24:838 [main @coroutine#2] ConsoleExceptionDemoKt$main$1$$special$$inlined$CoroutineExceptionHandler$1.handleException(CoroutineExceptionHandler.kt:82): Caught Exception 20:34:24:839 [main @coroutine#3] ConsoleExceptionDemoKt$main$1$1$secondChild$1.invokeSuspend(ConsoleExceptionDemo.kt:17): First child is cancelled: true, but second one is still active 20:34:24:842 [main @coroutine#1] ConsoleExceptionDemoKt$main$1.invokeSuspend(ConsoleExceptionDemo.kt:27): Cancelling supervisor 20:34:24:864 [main @coroutine#3] ConsoleExceptionDemoKt$main$1$1$secondChild$1.invokeSuspend(ConsoleExceptionDemo.kt:22): Second child is cancelled because supervisor is cancelled
supervisorScope
除了使用 SupervisorJob,也可以使用supervisorScope代替coroutineScope来实现相同的目的。
它与 coroutineScope 的差异在于,它仅单向上传播 取消,并且只有在自身失败时才取消所有孩子。
另外,子任务的执行失败不会传播给它的父任务。每一个子任务应该通过异常处理机制处理自身的异常。
示例:fun main() = runBlocking { val job1 = GlobalScope.launch(CoroutineExceptionHandler { _, exception -> Logger.debug("#1 Caught $exception") //① }) { Logger.debug("1") supervisorScope { launch(CoroutineExceptionHandler { _, exception -> Logger.debug("#2 Caught $exception") //② }) { Logger.debug("2") throw IndexOutOfBoundsException() }.join() Logger.debug("3") launch { Logger.debug("4") } } } job1.join() }
运行结果:
20:24:37:322 [DefaultDispatcher-worker-1 @coroutine#2] ConsoleExceptionDemoKt$main$1$job1$2.invokeSuspend(ConsoleExceptionDemo.kt:11): 1 20:24:37:347 [DefaultDispatcher-worker-1 @coroutine#3] ConsoleExceptionDemoKt$main$1$job1$2$1$2.invokeSuspend(ConsoleExceptionDemo.kt:16): 2 20:24:37:348 [DefaultDispatcher-worker-1 @coroutine#3] ConsoleExceptionDemoKt$main$1$job1$2$1$invokeSuspend$$inlined$CoroutineExceptionHandler$1.handleException(CoroutineExceptionHandler.kt:82): #2 Caught java.lang.IndexOutOfBoundsException 20:24:37:349 [DefaultDispatcher-worker-1 @coroutine#2] ConsoleExceptionDemoKt$main$1$job1$2$1.invokeSuspend(ConsoleExceptionDemo.kt:19): 3 20:24:37:350 [DefaultDispatcher-worker-1 @coroutine#4] ConsoleExceptionDemoKt$main$1$job1$2$1$3.invokeSuspend(ConsoleExceptionDemo.kt:21): 4
总结:
- GlobeScope 启动的协程单独启动一个协程作用域,异常自己消化,不向外部传播。内部的子协程遵从默认的作用域规则。
- coroutineScope 是继承外部 Job 的上下文创建作用域,在其内部的取消操作是双向传播的,子协程未捕获的异常也会向上传递给父协程。它更适合一系列对等的协程并发的完成一项工作,任何一个子协程异常退出,那么整体都将退出。
- supervisorScope 同样继承外部作用域的上下文创建作用域,但其内部的取消操作是单向传播的,父协程向子协程传播,反之则不然,这意味着子协程出了异常并不会影响父协程以及其他兄弟协程。它更适合一些独立不相干的任务,任何一个任务出问题,并不会影响其他任务的工作。
协程的取消
取消是协作的
协程的取消是 协作 的。一段协程代码必须协作才能被取消。
所有 kotlinx.coroutines 中的挂起函数都是 可被取消的。
它们检查协程的取消, 并在取消时抛出 CancellationException 。
示例:
fun main() = runBlocking {
val job1 = launch { // ①
Logger.debug(1)
delay(1000) // ②
Logger.debug(2)
}
delay(100)
Logger.debug(3)
job1.cancel() // ③
Logger.debug(4)
}
执行结果:
14:50:47:604 [main @coroutine#2] ConsoleExceptionDemoKt$main$1$job1$1.invokeSuspend(ConsoleExceptionDemo.kt:9): 1
14:50:47:645 [main @coroutine#1] ConsoleExceptionDemoKt$main$1.invokeSuspend(ConsoleExceptionDemo.kt:14): 3
14:50:47:647 [main @coroutine#1] ConsoleExceptionDemoKt$main$1.invokeSuspend(ConsoleExceptionDemo.kt:16): 4
程序分析:
这段代码 ① 处启动了一个子协程,它内部先输出 1,接着开始 delay, delay 与线程的 sleep 不同,它不会阻塞线程。
你可以认为它实际上就是触发了一个延时任务,告诉协程调度系统 1000ms 之后再来执行后面的代码;
而在这期间,我们在 ③ 处对刚才启动的协程触发了取消,因此在 ② 处的 delay 还没有回调的时候协程就被取消了。
因为 delay 可以响应取消,因此 delay 后面的代码就不会再执行了,因为② 处的 delay 会抛一个 CancellationException。
下面来捕获一下这个Exception:
fun main() = runBlocking {
val job1 = launch { // ①
Logger.debug(1)
try {
delay(1000) // ②
} catch (e: Exception) {
Logger.error(e)
}
Logger.debug(2)
}
delay(100)
Logger.debug(3)
job1.cancel() // ③
Logger.debug(4)
}
15:16:25:924 [main @coroutine#1] ConsoleExceptionDemoKt$main$1.invokeSuspend(ConsoleExceptionDemo.kt:20): 3
15:16:25:926 [main @coroutine#1] ConsoleExceptionDemoKt$main$1.invokeSuspend(ConsoleExceptionDemo.kt:22): 4
15:16:25:943 [main @coroutine#2] ConsoleExceptionDemoKt$main$1$job1$1.invokeSuspend(ConsoleExceptionDemo.kt:15): kotlinx.coroutines.JobCancellationException: Job was cancelled; job="coroutine#2":StandaloneCoroutine{Cancelling}@2be94b0f
15:16:25:943 [main @coroutine#2] ConsoleExceptionDemoKt$main$1$job1$1.invokeSuspend(ConsoleExceptionDemo.kt:17): 2
如果协程正在执行 计算任务,并且没有检查取消的话,那么它是不能被取消的。
fun main() = runBlocking {
//sampleStart
val startTime = System.currentTimeMillis()
val job = launch(Dispatchers.Default) {
var nextPrintTime = startTime
var i = 0
while (i < 5) { // 一个执行计算的循环,只是为了占用 CPU
// 每秒打印消息两次
if (System.currentTimeMillis() >= nextPrintTime) {
Logger.debug("job: I'm sleeping ${i++} ...")
nextPrintTime += 500L
}
}
}
delay(1300L) // 等待一段时间
Logger.debug("main: I'm tired of waiting!")
job.cancelAndJoin() // 取消一个任务并且等待它结束
Logger.debug("main: Now I can quit.")
//sampleEnd
}
输出结果:
15:21:11:418 [DefaultDispatcher-worker-2 @coroutine#2] ConsoleExceptionDemoKt$main$1$job$1.invokeSuspend(ConsoleExceptionDemo.kt:16): job: I'm sleeping 0 ...
15:21:11:856 [DefaultDispatcher-worker-2 @coroutine#2] ConsoleExceptionDemoKt$main$1$job$1.invokeSuspend(ConsoleExceptionDemo.kt:16): job: I'm sleeping 1 ...
15:21:12:356 [DefaultDispatcher-worker-2 @coroutine#2] ConsoleExceptionDemoKt$main$1$job$1.invokeSuspend(ConsoleExceptionDemo.kt:16): job: I'm sleeping 2 ...
15:21:12:665 [main @coroutine#1] ConsoleExceptionDemoKt$main$1.invokeSuspend(ConsoleExceptionDemo.kt:22): main: I'm tired of waiting!
15:21:12:856 [DefaultDispatcher-worker-2 @coroutine#2] ConsoleExceptionDemoKt$main$1$job$1.invokeSuspend(ConsoleExceptionDemo.kt:16): job: I'm sleeping 3 ...
15:21:13:356 [DefaultDispatcher-worker-2 @coroutine#2] ConsoleExceptionDemoKt$main$1$job$1.invokeSuspend(ConsoleExceptionDemo.kt:16): job: I'm sleeping 4 ...
15:21:13:357 [main @coroutine#1] ConsoleExceptionDemoKt$main$1.invokeSuspend(ConsoleExceptionDemo.kt:24): main: Now I can quit.
可以看到它连续打印出了“I'm sleeping” ,甚至在调用取消后, 任务仍然执行了五次循环迭代并运行到了它结束为止。
使计算代码可取消
我们有两种方法来使执行计算的代码可以被取消。
第一种方法是定期调用挂起函数来检查取消。对于这种目的,使用 yield 是一个好的选择。
另一种方法是显式的检查取消状态。让我们试下第二种方法。
fun main() = runBlocking {
//sampleStart
val startTime = System.currentTimeMillis()
val job = launch(Dispatchers.Default) {
var nextPrintTime = startTime
var i = 0
while (isActive) { // 可以被取消的计算循环
// 每秒打印消息两次
if (System.currentTimeMillis() >= nextPrintTime) {
Logger.debug("job: I'm sleeping ${i++} ...")
nextPrintTime += 500L
}
}
}
delay(1300L) // 等待一段时间
Logger.debug("main: I'm tired of waiting!")
job.cancelAndJoin() // 取消一个任务并且等待它结束
Logger.debug("main: Now I can quit.")
//sampleEnd
}
运行结果
15:34:57:214 [DefaultDispatcher-worker-1 @coroutine#2] ConsoleExceptionDemoKt$main$1$job$1.invokeSuspend(ConsoleExceptionDemo.kt:16): job: I'm sleeping 0 ...
15:34:57:649 [DefaultDispatcher-worker-1 @coroutine#2] ConsoleExceptionDemoKt$main$1$job$1.invokeSuspend(ConsoleExceptionDemo.kt:16): job: I'm sleeping 1 ...
15:34:58:149 [DefaultDispatcher-worker-1 @coroutine#2] ConsoleExceptionDemoKt$main$1$job$1.invokeSuspend(ConsoleExceptionDemo.kt:16): job: I'm sleeping 2 ...
15:34:58:460 [main @coroutine#1] ConsoleExceptionDemoKt$main$1.invokeSuspend(ConsoleExceptionDemo.kt:22): main: I'm tired of waiting!
15:34:58:462 [main @coroutine#1] ConsoleExceptionDemoKt$main$1.invokeSuspend(ConsoleExceptionDemo.kt:24): main: Now I can quit.
可以看到,现在循环被取消了。isActive 是一个可以被使用在 CoroutineScope 中的扩展属性。
在 finally 中释放资源
可取消的挂起函数在被取消时会抛出 CancellationException,我们通常使用以下方式处理后续的收尾工作。
-
try {……} finally {……} 表达式
运行结果:fun main() = runBlocking { //sampleStart val job = launch { try { repeat(1000) { i -> println("job: I'm sleeping $i ...") delay(500L) } } finally { println("job: I'm running finally") } } delay(1300L) // 延迟一段时间 println("main: I'm tired of waiting!") job.cancelAndJoin() // 取消该任务并且等待它结束 println("main: Now I can quit.") //sampleEnd }
16:04:37:647 [main @coroutine#2] ConsoleExceptionDemoKt$main$1$job$1.invokeSuspend(ConsoleExceptionDemo.kt:14): job: I'm sleeping 0 ... 16:04:38:162 [main @coroutine#2] ConsoleExceptionDemoKt$main$1$job$1.invokeSuspend(ConsoleExceptionDemo.kt:14): job: I'm sleeping 1 ... 16:04:38:662 [main @coroutine#2] ConsoleExceptionDemoKt$main$1$job$1.invokeSuspend(ConsoleExceptionDemo.kt:14): job: I'm sleeping 2 ... 16:04:38:898 [main @coroutine#1] ConsoleExceptionDemoKt$main$1.invokeSuspend(ConsoleExceptionDemo.kt:22): main: I'm tired of waiting! 16:04:38:948 [main @coroutine#2] ConsoleExceptionDemoKt$main$1$job$1.invokeSuspend(ConsoleExceptionDemo.kt:18): job: I'm running finally 16:04:38:953 [main @coroutine#1] ConsoleExceptionDemoKt$main$1.invokeSuspend(ConsoleExceptionDemo.kt:24): main: Now I can quit.
- Kotlin 的 use 函数
实现了 Closeable 接口的对象可调用use函数
use 函数会自动关闭调用者(无论中间是否出现异常)
可以看出,use 函数内部实现也是通过 try-catch-finally 块捕捉的方式。@RequireKotlin("1.2", versionKind = RequireKotlinVersionKind.COMPILER_VERSION, message = "Requires newer compiler version to be inlined correctly.") public inline fun <T : Closeable?, R> T.use(block: (T) -> R): R { var exception: Throwable? = null try { return block(this) } catch (e: Throwable) { exception = e throw e } finally { when { apiVersionIsAtLeast(1, 1, 0) -> this.closeFinally(exception) this == null -> {} exception == null -> close() else -> try { close() } catch (closeException: Throwable) { // cause.addSuppressed(closeException) // ignored here } } } }
所以不用担心会有异常抛出导致程序退出,无论是正常结束还是出现异常,都能正确关闭调用者。
运行不可取消的代码块
在之前例子中任何尝试在 finally 块中调用挂起函数的代码都会抛出 CancellationException,因为运行此代码的协程被取消了。
所以良好的关闭操作(关闭一个文件、取消一个任务、或是关闭任何一种 通信通道)通常都是非阻塞的,并且不会调用任何挂起函数。
然而,在真实的案例中,当你需要在一个 被取消的协程 中调用挂起函数, 你可以将相应的代码包装在 withContext(NonCancellable) {……} 中。
使用 withContext 函数以及 NonCancellable 上下文,见如下示例所示:
fun main() = runBlocking {
//sampleStart
val job = launch {
try {
repeat(1000) { i ->
Logger.debug("job: I'm sleeping $i ...")
delay(500L)
}
} finally {
withContext(NonCancellable) {
Logger.debug("job: I'm running finally")
delay(1000L) //此处调用挂起函数
Logger.debug("job: And I've just delayed for 1 sec because I'm non-cancellable")
}
}
}
delay(1300L) // 延迟一段时间
Logger.debug("main: I'm tired of waiting!")
job.cancelAndJoin() // 取消该任务并等待它结束
Logger.debug("main: Now I can quit.")
//sampleEnd
}
运行结果:
16:33:36:683 [main @coroutine#2] ConsoleExceptionDemoKt$main$1$job$1.invokeSuspend(ConsoleExceptionDemo.kt:16): job: I'm sleeping 0 ...
16:33:37:199 [main @coroutine#2] ConsoleExceptionDemoKt$main$1$job$1.invokeSuspend(ConsoleExceptionDemo.kt:16): job: I'm sleeping 1 ...
16:33:37:700 [main @coroutine#2] ConsoleExceptionDemoKt$main$1$job$1.invokeSuspend(ConsoleExceptionDemo.kt:16): job: I'm sleeping 2 ...
16:33:37:931 [main @coroutine#1] ConsoleExceptionDemoKt$main$1.invokeSuspend(ConsoleExceptionDemo.kt:28): main: I'm tired of waiting!
16:33:37:990 [main @coroutine#2] ConsoleExceptionDemoKt$main$1$job$1$2.invokeSuspend(ConsoleExceptionDemo.kt:21): job: I'm running finally
16:33:38:991 [main @coroutine#2] ConsoleExceptionDemoKt$main$1$job$1$2.invokeSuspend(ConsoleExceptionDemo.kt:23): job: And I've just delayed for 1 sec because I'm non-cancellable
16:33:39:001 [main @coroutine#1] ConsoleExceptionDemoKt$main$1.invokeSuspend(ConsoleExceptionDemo.kt:30): main: Now I can quit.
超时取消
在大多数情况下,取消协程的执行是因为它执行的时间超过预期的时间了。
虽然,你可以手动获取到协程相应 Job 对象的引用,并启动另外一个协程在延迟一段时间后通过 Job 对象取消那个协程。
然而Kotlin协程库已经为我们准备好 withTimeout 函数来做这件事。
fun main() = runBlocking {
//sampleStart
withTimeout(1300L) {
repeat(1000) { i ->
Logger.debug("I'm sleeping $i ...")
delay(500L)
}
}
//sampleEnd
}
运行结果:
17:02:06:826 [main @coroutine#1] ConsoleExceptionDemoKt$main$1$1.invokeSuspend(ConsoleExceptionDemo.kt:15): I'm sleeping 0 ...
17:02:07:343 [main @coroutine#1] ConsoleExceptionDemoKt$main$1$1.invokeSuspend(ConsoleExceptionDemo.kt:15): I'm sleeping 1 ...
17:02:07:844 [main @coroutine#1] ConsoleExceptionDemoKt$main$1$1.invokeSuspend(ConsoleExceptionDemo.kt:15): I'm sleeping 2 ...
Exception in thread "main" kotlinx.coroutines.TimeoutCancellationException: Timed out waiting for 1300 ms
at kotlinx.coroutines.TimeoutKt.TimeoutCancellationException(Timeout.kt:128)
at kotlinx.coroutines.TimeoutCoroutine.run(Timeout.kt:94)
at kotlinx.coroutines.EventLoopImplBase$DelayedRunnableTask.run(EventLoop.kt:307)
at kotlinx.coroutines.EventLoopImplBase.processNextEvent(EventLoop.kt:116)
at kotlinx.coroutines.DefaultExecutor.run(DefaultExecutor.kt:68)
at java.lang.Thread.run(Thread.java:745)
withTimeout 抛出的 TimeoutCancellationException 是 CancellationException 的子类。
我们之前没有看到它的堆栈跟踪打印在控制台上。
这是因为在取消的协程中,CancellationException 被认为是协程完成的正常原因。
但是,在这个例子中,我们在main函数中使用了 withTimeout。
自定义可取消的挂起函数
delay 可以响应取消操作,看下 delay 函数的实现:
public suspend fun delay(timeMillis: Long) {
if (timeMillis <= 0) return // don't delay
return suspendCancellableCoroutine sc@ { cont: CancellableContinuation<Unit> ->
cont.context.delay.scheduleResumeAfterDelay(timeMillis, cont)
}
}
关键部分是其使用 suspendCancellableCoroutine 这个挂起内联函数给包装了一下。
/**
* Suspends coroutine similar to [suspendCoroutine], but provide an implementation of [CancellableContinuation] to
* the [block]. This function throws [CancellationException] if the coroutine is cancelled or completed while suspended.
*/
public suspend inline fun <T> suspendCancellableCoroutine(
crossinline block: (CancellableContinuation<T>) -> Unit
): T =
suspendCoroutineUninterceptedOrReturn { uCont ->
val cancellable = CancellableContinuationImpl(uCont.intercepted(), resumeMode = MODE_CANCELLABLE)
// NOTE: Before version 1.1.0 the following invocation was inlined here, so invocation of this
// method indicates that the code was compiled by kotlinx.coroutines < 1.1.0
// cancellable.initCancellability()
block(cancellable)
cancellable.getResult()
}
包括其他的可取消的挂起函数都是使用 suspendCancellableCoroutine 进行了封装。
自定义可取消的挂起函数:
使用Retrofit创建一个网络请求接口:
import okhttp3.OkHttpClient
import retrofit2.Call
import retrofit2.converter.gson.GsonConverterFactory
import retrofit2.http.GET
import retrofit2.http.Path
val gitHubServiceApi by lazy {
val retrofit = retrofit2.Retrofit.Builder()
.client(OkHttpClient.Builder().build())
.baseUrl("https://api.github.com")
.addConverterFactory(GsonConverterFactory.create())
.build()
retrofit.create(GitHubServiceApi::class.java)
}
interface GitHubServiceApi {
@GET("users/{login}")
fun getUser(@Path("login") login: String): Call<User>
}
data class User(val id: String, val name: String, val url: String)
普通调用的代码:
fun main() {
val callback = object : Callback<User> {
override fun onFailure(call: Call<User>, t: Throwable) {
Logger.debug("onFailure: $t")
}
override fun onResponse(call: Call<User>, response: Response<User>) {
Logger.debug("onResponse: ${response.code()}")
if (response.isSuccessful) {
Logger.debug(response.body())
}else{
Logger.error(HttpException(response))
}
}
}
gitHubServiceApi.getUser("JakeWharton").enqueue(callback)
Thread.sleep(3000)
}
运行结果:
15:28:33:274 [OkHttp https://api.github.com/...] GitHubServiceKt$main$1.onResponse(GitHubService.kt:36): onResponse: 200
15:28:33:292 [OkHttp https://api.github.com/...] GitHubServiceKt$main$1.onResponse(GitHubService.kt:38): User(id=66577, name=Jake Wharton, url=https://api.github.com/users/JakeWharton)
转换成可取消的挂起函数:
suspend fun getUser(name: String) = suspendCancellableCoroutine<User> { continuation ->
val call = gitHubServiceApi.getUser(name)
continuation.invokeOnCancellation {
Logger.debug("invokeOnCancellation: cancel the request.")
call.cancel()
}
call.enqueue(object : Callback<User> {
override fun onFailure(call: Call<User>, t: Throwable) {
Logger.debug("onFailure: $t")
continuation.resumeWithException(t)
}
override fun onResponse(call: Call<User>, response: Response<User>) {
Logger.debug("onResponse: ${response.code()}")
if (response.isSuccessful) {
response.body()?.let { continuation.resume(it) } ?: continuation.resumeWithException(
NullPointerException("User is Null")
)
} else {
continuation.resumeWithException(HttpException(response))
}
}
})
}
在协程中调用并且取消:
suspend fun main() {
val job = GlobalScope.launch {
Logger.debug(1)
val user = getUser("JakeWharton")
Logger.debug(user)
}
Logger.debug(2)
job.cancelAndJoin()
Logger.debug(3)
}
运行结果:
15:53:07:886 [main] ConsoleCancellableKt.main(ConsoleCancellable.kt:15): 2
15:53:07:886 [DefaultDispatcher-worker-1] ConsoleCancellableKt$main$job$1.invokeSuspend(ConsoleCancellable.kt:11): 1
15:53:08:450 [DefaultDispatcher-worker-1] GetUserKt$getUser$2$1.invoke(GetUser.kt:18): invokeOnCancellation: cancel the request.
15:53:08:453 [DefaultDispatcher-worker-1] ConsoleCancellableKt.main(ConsoleCancellable.kt:17): 3
15:53:08:454 [OkHttp https://api.github.com/...] GetUserKt$getUser$2$2.onFailure(GetUser.kt:24): onFailure: java.io.IOException: Canceled
从日志中看到,取消的回调被调用了,OkHttp 也确实停止了网络请求,并且回调给我们一个 IO 异常,这时候我们的协程已经被取消。
在处于 取消状态的协程 上调用 Continuation.resume 、 Continuation.resumeWithException 或者 Continuation.resumeWith 都会被忽略。
因此 OkHttp 回调中我们收到 IO 异常后调用的 continuation.resumeWithException(e) 不会有任何副作用。
Retrofit 2.6.0中使用了协程,也是用了同样的方式。
suspend fun launchForEach() {
coroutineScope {
launch {
listOf("JakeWharton", "abreslav", "yole", "elizarov")
.forEach {
try {
val user = gitHubServiceApi.getUser(it).await()
Logger.debug(user)
} catch (e: Exception) {
Logger.error(e)
}
delay(1000)
}
}.join()
}
}
14:07:31:568 [OkHttp https://api.github.com/...] GitHubServiceKt$gitHubServiceApi$2$retrofit$1.intercept(GitHubService.kt:19): request: 200
14:07:31:587 [main] Demo0Kt$launchForEach$2$1.invokeSuspend(Demo0.kt:16): User(id=66577, name=Jake Wharton, url=https://api.github.com/users/JakeWharton)
14:07:32:931 [OkHttp https://api.github.com/...] GitHubServiceKt$gitHubServiceApi$2$retrofit$1.intercept(GitHubService.kt:19): request: 200
14:07:32:933 [main] Demo0Kt$launchForEach$2$1.invokeSuspend(Demo0.kt:16): User(id=888318, name=Andrey Breslav, url=https://api.github.com/users/abreslav)
14:07:34:285 [OkHttp https://api.github.com/...] GitHubServiceKt$gitHubServiceApi$2$retrofit$1.intercept(GitHubService.kt:19): request: 200
14:07:34:287 [main] Demo0Kt$launchForEach$2$1.invokeSuspend(Demo0.kt:16): User(id=46553, name=Dmitry Jemerov, url=https://api.github.com/users/yole)
14:07:35:626 [OkHttp https://api.github.com/...] GitHubServiceKt$gitHubServiceApi$2$retrofit$1.intercept(GitHubService.kt:19): request: 200
14:07:35:636 [main] Demo0Kt$launchForEach$2$1.invokeSuspend(Demo0.kt:16): User(id=478679, name=Roman Elizarov, url=https://api.github.com/users/elizarov)
Android中使用协程
引入协程库
dependencies {
implementation 'org.jetbrains.kotlinx:kotlinx-coroutines-android:1.2.2'
}
kotlinx-coroutines-android 依赖的库:
kotlinx-coroutines-android 这个库里面的代码很少,协程的大部分实现都在 kotlinx-coroutines-core 这个库里。
任务调度器
kotlinx-coroutines-android 提供了 Dispatchers.Main 调度器 在Android平台上的实现 HandlerDispatcher。
可以从源码上看到,任务的调度都是通过 Handler 来实现:
UI 生命周期作用域
在Android中, 我们想让发出去的请求能够在当前 UI 或者 Activity 退出或者销毁的时候能够自动取消,就要与其生命周期绑定。
协程有一个很天然的特性能刚够支持这一点,那就是作用域。官方也提供了 MainScope 这个函数。
/**
* Creates the main [CoroutineScope] for UI components.
*
* Example of use:
* ```
* class MyAndroidActivity {
* private val scope = MainScope()
*
* override fun onDestroy() {
* super.onDestroy()
* scope.cancel()
* }
* }
*
* ```
*
* The resulting scope has [SupervisorJob] and [Dispatchers.Main] context elements.
* If you want to append additional elements to the main scope, use [CoroutineScope.plus] operator:
* `val scope = MainScope() + CoroutineName("MyActivity")`.
*/
@Suppress("FunctionName")
public fun MainScope(): CoroutineScope = ContextScope(SupervisorJob() + Dispatchers.Main)
从 MainScope 的源码可以看到,MainScope 是由 SupervisorJob() 和 Dispatchers.Main 共同完成。
使用 SupervisorJob 保证子任务失败不会导致父任务被取消;父任务能够取消所有的子任务。
使用 Dispatchers.Main 让协程体运行在主线程中。因此作用域内除非明确声明调度器,协程体都调度在主线程执行。
示例:
class MainActivity : AppCompatActivity() {
private val mainScope = MainScope()
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
setContentView(R.layout.activity_main)
button.setOnClickListener {
//启动一个协程
mainScope.launch {
Logger.debug(1)
textView.text = withContext(Dispatchers.IO) { //任务调度在IO线程池
Logger.debug(2)
delay(3000) //模拟耗时操作
Logger.debug(3)
"Hello Coroutines"
}
Logger.debug(4)
}
}
}
override fun onDestroy() {
super.onDestroy()
mainScope.cancel() //Activity退出时取消协程
}
}
运行结果:
D/Coroutine: [main] MainActivity$onCreate$1$1.invokeSuspend(MainActivity.kt:19): 1
D/Coroutine: [DefaultDispatcher-worker-1] MainActivity$onCreate$1$1$1.invokeSuspend(MainActivity.kt:21): 2
D/Coroutine: [DefaultDispatcher-worker-2] MainActivity$onCreate$1$1$1.invokeSuspend(MainActivity.kt:23): 3
D/Coroutine: [main] MainActivity$onCreate$1$1.invokeSuspend(MainActivity.kt:26): 4
带有作用域的抽象Activity
尽管我们在上面直接使用 MainScope 可以很方便的控制其作用域范围内的协程的取消,
以及能够无缝将异步任务切回主线程,这都是我们想要的特性,不过写法上还是不够美观。
官方推荐我们定义一个抽象的 Activity,例如:
abstract class ScopedActivity : AppCompatActivity(), CoroutineScope by MainScope() {
override fun onDestroy() {
super.onDestroy()
cancel()
}
}
这样在 Activity 退出的时候,对应的作用域就会被取消,所有在该 Activity 中发起的请求都会被取消掉。
使用时,只需要继承这个抽象类即可:
class CoroutineActivity : ScopedActivity() {
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
setContentView(R.layout.activity_main)
button.setOnClickListener {
//启动一个协程
launch {
Logger.debug(1)
textView.text = withContext(Dispatchers.IO) { //任务调度在IO线程池
Logger.debug(2)
delay(3000) //模拟耗时操作
Logger.debug(3)
"Hello Coroutines"
}
Logger.debug(4)
}
}
}
}
除了在当前 Activity 内部获得 MainScope 的能力外,还可以将这个 作用域 实例传递给其他需要的模块。
例如 Presenter 通常也需要与 Activity保持同样的生命周期,因此必要时也可以将该作用域传递过去:
class CoroutineActivity : ScopedActivity() {
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
setContentView(R.layout.activity_main)
button.setOnClickListener {
//启动一个协程
launch {
Logger.debug(1)
textView.text = withContext(Dispatchers.IO) {
//任务调度在IO线程池
Logger.debug(2)
delay(3000) //模拟耗时操作
Logger.debug(3)
"Hello Coroutines"
}
Logger.debug(4)
}
}
next.text = "Get User Data"
next.setOnClickListener {
CoroutinePresenter(this@CoroutineActivity).getUserData(textView)
}
}
}
class CoroutinePresenter(private val scope: CoroutineScope) : CoroutineScope by scope {
fun getUserData(textView: TextView) {
Logger.debug(1)
launch {
delay(2000)
Logger.debug(3)
textView.text = "Kotlin Coroutines"
}
Logger.debug(2)
}
}
多数情况下,Presenter 的方法也会被 Activity 直接调用,因此也可以将 Presenter 的方法生命成 suspend 方法,
然后用 coroutineScope 嵌套作用域,这样 MainScope 被取消后,嵌套的子作用域一样也会被取消,进而达到取消全部子协程的目的:
next.setOnLongClickListener {
launch {
val data = CoroutinePresenter2().getUserData()
Logger.debug("set User data")
textView.text = data
textView.setTextColor(Color.RED)
}
false
}
class CoroutinePresenter2 {
suspend fun getUserData() = coroutineScope{
Logger.debug("start get user data")
delay(1000)
val deferred = async(Dispatchers.IO) {
delay(2000)
Logger.debug("return user data")
"User Data is Empty"
}
deferred.await()
}
}
输出log:
D/Coroutine: [main] CoroutinePresenter2$getUserData$2.invokeSuspend(CoroutinePresenter2.kt:9): start get user data
D/Coroutine: [DefaultDispatcher-worker-1] CoroutinePresenter2$getUserData$2$deferred$1.invokeSuspend(CoroutinePresenter2.kt:13): return user data
D/Coroutine: [main] CoroutineActivity$onCreate$3$1.invokeSuspend(CoroutineActivity.kt:42): set User data
总结:
在 Android 上使用协程,更多的就是简化异步逻辑,把异步的代码改成同步的写法。
协程为我们提供了 Dispatchers.Main 调度器,让我们的UI 逻辑在 UI 线程中处理。
如果涉及到一些 io 操作,使用 async 将其调度到 Dispatchers.IO 上,结果返回时协程会自动帮我们切回到主线程。
对于一些 UI 不相关的逻辑,通常使用 Dispatchers.Default 就足够使用了。
网友评论