5. 线程控制:Scheduler (二)
除了灵活的变换,RxJava 另一个牛逼的地方,就是线程的自由控制。
5.1 Scheduler 的 API (二)
前面讲到了,可以利用subscribeOn()
结合observeOn()
来实现线程控制,让事件的产生和消费发生在不同的线程。可是在了解了map() flatMap()
等变换方法后,有些好事的(其实就是当初刚接触 RxJava 时的我)就问了:能不能多切换几次线程?
答案是:能。因为observeOn()
指定的是Subscriber
的线程,而这个 Subscriber
并不是(严格说应该为『不一定是』,但这里不妨理解为『不是』)subscribe()
参数中的Subscriber
,而是 observeOn()
执行时的当前Observable
所对应的Subscriber
,即它的直接下级 Subscriber
。换句话说,observeOn()
指定的是它之后的操作所在的线程。因此如果有多次切换线程的需求,只要在每个想要切换线程的位置调用一次observeOn()
即可。上代码:
Observable.just(1, 2, 3, 4) // IO 线程,由 subscribeOn() 指定
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.newThread())
.map(mapOperator) // 新线程,由 observeOn() 指定
.observeOn(Schedulers.io())
.map(mapOperator2) // IO 线程,由 observeOn() 指定
.observeOn(AndroidSchedulers.mainThread)
.subscribe(subscriber); // Android 主线程,由 observeOn() 指定
如上,通过observeOn()
的多次调用,程序实现了线程的多次切换。不过,不同于observeOn()
,subscribeOn()
的位置放在哪里都可以,但它是只能调用一次的。又有好事的(其实还是当初的我)问了:如果我非要调用多次subscribeOn()
呢?会有什么效果?
这个问题先放着,我们还是从 RxJava 线程控制的原理说起吧。
5.2 Scheduler 的原理(二)
其实,subscribeOn()
和observeOn()
的内部实现,也是用的lift()
。具体看图(不同颜色的箭头表示不同的线程):

observeOn()
原理图:

从图中可以看出,subscribeOn()
和observeOn()
都做了线程切换的工作(图中的 "schedule..." 部位)。不同的是,subscribeOn()
的线程切换发生在OnSubscribe
中,即在它通知上一级OnSubscribe
时,这时事件还没有开始发送,因此subscribeOn()
的线程控制可以从事件发出的开端就造成影响;而observeOn()
的线程切换则发生在它内建的 Subscriber
中,即发生在它即将给下一级Subscriber
发送事件时,因此observeOn()
控制的是它后面的线程。
最后,我用一张图来解释当多个 subscribeOn() 和 observeOn() 混合使用时,线程调度是怎么发生的(由于图中对象较多,相对于上面的图对结构做了一些简化调整):

图中共有 5 处含有对事件的操作。由图中可以看出,①和②两处受第一个 subscribeOn() 影响,运行在红色线程;③和④处受第一个observeOn()
的影响,运行在绿色线程;⑤处受第二个onserveOn()
影响,运行在紫色线程;而第二个subscribeOn()
,由于在通知过程中线程就被第一个 subscribeOn()
截断,因此对整个流程并没有任何影响。这里也就回答了前面的问题:当使用了多个subscribeOn()
的时候,只有第一个 subscribeOn()
起作用。
5.3 延伸:doOnSubscribe()
然而,虽然超过一个的subscribeOn()
对事件处理的流程没有影响,但在流程之前却是可以利用的。
在前面讲Subscriber
的时候,提到过Subscriber
的onStart()
可以用作流程开始前的初始化。然而onStart()
由于在subscribe()
发生时就被调用了,因此不能指定线程,而是只能执行在subscribe()
被调用时的线程。这就导致如果onStart()
中含有对线程有要求的代码(例如在界面上显示一个ProgressBar
,这必须在主线程执行),将会有线程非法的风险,因为有时你无法预测subscribe()
将会在什么线程执行。
而与Subscriber.onStart()
相对应的,有一个方法 Observable.doOnSubscribe()
。它和Subscriber.onStart()
同样是在subscribe()
调用后而且在事件发送前执行,但区别在于它可以指定线程。默认情况下,doOnSubscribe()
执行在subscribe()
发生的线程;而如果在doOnSubscribe()
之后有subscribeOn()
的话,它将执行在离它最近的subscribeOn()
所指定的线程。
示例代码:
Observable.create(onSubscribe)
.subscribeOn(Schedulers.io())
.doOnSubscribe(new Action0() {
@Override
public void call() {
progressBar.setVisibility(View.VISIBLE); // 需要在主线程执行
}
})
.subscribeOn(AndroidSchedulers.mainThread()) // 指定主线程
.observeOn(AndroidSchedulers.mainThread())
.subscribe(subscriber);
如上,在doOnSubscribe()
的后面跟一个subscribeOn()
,就能指定准备工作的线程了。
RxJava 的适用场景和使用方式
1. 与 Retrofit 的结合
Retrofit 是 Square 的一个著名的网络请求库。没有用过 Retrofit 的可以选择跳过这一小节也没关系,我举的每种场景都只是个例子,而且例子之间并无前后关联,只是个抛砖引玉的作用,所以你跳过这里看别的场景也可以的。
2. RxBinding
RxBinding 是 Jake Wharton 的一个开源库,它提供了一套在 Android 平台上的基于 RxJava 的 Binding API。所谓 Binding,就是类似设置 OnClickListener、设置 TextWatcher这样的注册绑定对象的 API。
3. 各种异步操作
前面举的 Retrofit 和 RxBinding 的例子,是两个可以提供现成的 Observable 的库。而如果你有某些异步操作无法用这些库来自动生成 Observable,也完全可以自己写。例如数据库的读写、大图片的载入、文件压缩/解压等各种需要放在后台工作的耗时操作,都可以用 RxJava 来实现,有了之前几章的例子,这里应该不用再举例了。
4. RxBus
RxBus 名字看起来像一个库,但它并不是一个库,而是一种模式,它的思想是使用 RxJava 来实现了 EventBus ,而让你不再需要使用 Otto 或者 GreenRobot 的 EventBus。至于什么是 RxBus,可以看这篇文章。顺便说一句,Flipboard 已经用 RxBus 替换掉了 Otto ,目前为止没有不良反应。
最后
对于 Android 开发者来说, RxJava 是一个很难上手的库,因为它对于 Android 开发者来说有太多陌生的概念了,可是它真的很牛逼。希望此文能给始终搞不明白什么是 RxJava 的人一些入门的指引,或者能让正在使用 RxJava 但仍然心存疑惑的人看到一些更深入的解析。无论如何,只要能给各位同为 Android 工程师的你们提供一些帮助,该系列的目的就达到了。希望大家多多支持,关注我后续还有更多Android开发经验分享。
【附录】

需要资料的朋友可以加入Android架构交流QQ群聊:513088520
点击链接加入群聊【Android移动架构总群】:加入群聊
获取免费学习视频,学习大纲另外还有像高级UI、性能优化、架构师课程、NDK、混合式开发(ReactNative+Weex)等Android高阶开发资料免费分享。
网友评论