概述
在上一篇博客中,我们学习了如何在后台执行耗时的运算任务。倘若我们想终止运算呢?
这篇文章就是关于取消执行任务的。
取消执行任务意味着什么呢?
我们想要能够取消一个已经被 RxJava 或者 Coroutines 创建的运算执行。这个运算或同步或异步。
在 Android 开发中,这是一个很重要的用例,最常见的场景可能是当 View 正被销毁的时候。在这个场景下,我们可能通常想要取消真正执行的任务,比如:网络请求、一个耗时的对象初始化,等等。
RxJava
正如上一篇博客一样,我们打算暂且忽略 RxJava 传输流的能力。仅仅思考我们如何使用 RxJava 取消执行呢?(注:这里其实有点推锅的意思,大体意思可以理解为,文章里的代码不是标准的范例,甚至是违背了 RxJava 的编码特质的,文章的聚焦点主要是对比 RxJava 和 协程。)
让我们想像我们使用 间隔操作符 创建一个 定时器 ,代码如下:
Observable.interval(1, TimeUnit.SECONDS)
当你订阅这个 observable ,定时器将被触发,然后它将每秒发射一个事件到订阅者。
你如何取消这个定时器呢?
当订阅定时器(调用 .subscribe() ),它将返回一个 Disposable 对象,代码如下:
val disposable: Disposable =
Observable.interval(1, TimeUnit.SECONDS).subscribe()
你能调用 Disposable 的 dispose() 来取消执行。这个 Observable 将结束发射,如下:
disposable.dispose()
注意事项
如果你手动的创建 Observable,没有使用任何创建类运算符(如:interval),那么你不需要处理取消运算。(注:通常自定 Observable 是不推荐的,最好都要使用 RxJava 库已有运算符创建 Observable,当然大神除外,推荐大家可以通过阅读文章 关于RxJava的Tips & Tricks 和 Common Mistakes in RxJava )
如果这个 Observable 可以被取消,我们必须在被订阅的发射器调用之前,检查发射器是否被处理掉了,如下:
Observable.create<Int> { emitter ->
for (i in 1..5) {
if (!emitter.isDisposed) {
emitter.onNext(i)
} else {
break
}
}
emitter.onComplete()
}
上面的代码可以看到,如果订阅者被销毁,我们能跳过这次多余的发射。根据 Observable.create 的源码,若我们不跳过,这代码会继续执行下去,并且 Observable 会忽略 emitter.onNext(i) 的调用。(注:其实在平时开发中,取消正在执行的 RxJava 异步任务,由于业务逻辑复杂,通常不止一个 Observable ,更多的还是用到 CompositeDisposable)
协程
协程本身就是一个运算实例。取消协程意味着停止它的挂起 lambda 表达式的执行任务。
我们能够使用 Coroutine Job 来取消执行任务,它是 Coroutine Context 的一部分。
Coroutine Job 暴露了一个取消协程任务的方法。当我们期望取消时,就可以调用 cancel() 方法。
例如,Coroutine Builder launch 就返回一个协程的 Job 接口实例。
val job = launch(CommonPool) {
// my suspending block
}
...
job.cancel()
如上所示,我们赋值到一个变量中,然后就可以调用取消了。
以上就是从协程获取 Job 然后取消它的例子了。那么还能有另一种方式实现吗?当然,还能通过指定一个 Job 给协程。这样可以实现更复杂的业务逻辑。
一些协程构造者(如:launch 和 async)能接收一个名为 parent 的参数,你能设置它作为协程创建的 Job ,代码如下:
val parentJob = Job()
async(CommonPool, parent = parentJob) {
// my suspending block
}
parentJob.cancel()
上面实现途径的好处之一就是,你能共享上面的 parentJob 实例给多个协程,这样当你调用 parentJob.cancel() 时,你就能取消所有持有 parentJob 的协程。
这个方式有点类似于 RxJava 的 CompositeDisposable,调用一次就可以销毁多个订阅者。
val parentJob = Job()
val deferred1 = async(CommonPool, parent = parentJob) {
// my suspending block
}
val deferred2 = async(CommonPool, parent = parentJob) {
// my suspending block
}
parentJob.cancel() // Cancels both Coroutines
当共享 Job 给不同的协程时,一定要注意:当你取消一个 Job 后,你必须重新创建它,你不可以再将已取消的 Job 分配给另一个协程。
有另一种实现方式,那就是通过组合 Coroutine Context。你能使用加号运算符实现它。
val parentJob = Job()
launch(parentJob + CommonPool) {
// my suspending block
}
parentJob.cancel()
在上面的例子,协程的上下文结果就是由 parentJob 和 CommonPool 组合而成。这个线程化策略被 CommonPool 定义然后 Job 的值来源于 parentJob。
如果你想了解更多关于组合上下文的内容,你可以阅读 Kotlin 协程文档的 这一章节: lifecycle-and-coroutine-parent-child-hierarchy
注意事项
正如 RxJava 一样,你必须认真考虑在协程取消这一场景。
val job = launch(CommonPool) {
for (i in 1..5) {
heavyComputation()
}
}
job.cancel()
如果我们尝试执行这段代码,它将重复执行 5 次耗时运算,由于这段代码并未做好被取消的准备。
我们要如何改进它呢?
如同在 RxJava 中检查订阅者是否存在一样,我们需要检查协程是否活跃。
val job = launch(CommonPool) {
for (i in 1..5) {
if (!isActive) { break }
heavyComputation()
}
}
job.cancel()
isActive 是 Job 实例的一个内置变量,它能在协程内被访问(coroutineContext 是另一个变量)。
一些挂起函数 (suspending functions) 存在于协程标准库中,为我们处理取消协程。让我们来看看 delay 这个函数。
val job = launch(CommonPool) {
doSomething()
delay(300) // It’s going to cancel at this point
doSomething()
}
job.cancel()
Delay 是一个挂起函数,它能为我们处理取消任务。然而,如果你使用 Thread.sleep 代替 delay ,由于它是阻塞线程的并且没有挂起协程,所以它不会被取消。
val job = launch(CommonPool) {
doSomething()
Thread.sleep(300) // It’s NOT going to cancel execution
doSomething()
}
job.cancel()
Thread.sleep 不会让我们取消任务。它甚至都不是一个挂起函数! 即使我们调用 job.cancel(),协程也不会被取消。
在上面的例子中 Thread.sleep 你并不会使用它。如果你非常非常需要,那么在阻塞的前后都要检查协程是否活跃,如下所示:
val job = launch(CommonPool) {
doSomething()
if (!isActive) return
Thread.sleep(300) // It’s NOT going to cancel execution
if (!isActive) return
doSomething()
}
job.cancel()
网友评论