一、什么是RxJava?
RxJava 在 GitHub 主页上的自我介绍是 "a library for composing asynchronous and event-based programs using observable sequences for the Java VM"(一个在 Java VM 上使用可观测的序列来组成异步的、基于事件的程序的库)。这就是 RxJava ,概括得非常精准。
二、为什么要用RxJava?
1、异步操作很关键的一点是程序的简洁性,因为在调度过程比较复杂的情况下,异步代码经常会既难写也难被读懂。 Android 创造的 AsyncTask 和Handler ,其实都是为了让异步代码更加简洁。RxJava 的优势也是简洁,但它的简洁的与众不同之处在于,随着程序逻辑变得越来越复杂,它依然能够保持简洁,增加了工程的可维护性。
2、轻松实现线程的切换工作,只需调用 subscribeOn() 和 observeOn() 两个方法。
三、RxJava原理概括
这里以RxJava1来阐述原理,RxJava2引入了最新的背压概念,用于解决上流事件发送太快,下流事件处理不及时问题。我们在使用过程中,只需了解RxJava的基本原理即可。最重要的就是理解RxJava为什么可以实现切换线程以及RxJava中流的转换过程。
订阅的实现:
1、RxJava的异步实现,是通过一种扩展的观察者模式来实现的。
RxJava 的观察者模式大致如下图:
![](https://img.haomeiwen.com/i6023277/6f0777d8b254372c.jpg)
2、 RxJava 使用 create() 方法来创建一个被观察者即Observable ,并为它定义事件触发规则
首先创建一个OnSubscribe对象,在这个对象中定义了一些触发规则,然后调用create()方法来新建一个Observable对象。
Observable observable = Observable.create(new Observable.OnSubscribe() {
@Override
public void call(String subscriber) {
subscriber.onNext("Hello");
subscriber.onNext("Hi");
subscriber.onNext("Aloha");
subscriber.onCompleted();
}
});
3、创建观察者Subscriber或者Observer,Subscriber 对 Observer 接口进行了一些扩展,但他们的基本使用方式是完全一样的。不仅基本使用方式一样,实质上,在 RxJava 的 subscribe(订阅) 过程中,Observer 也总是会先被转换成一个 Subscriber 再使用。
注意点:这里的概念仅限于RxJava1,在RxJava2中这个概念是不同的,关于不同点可以查看文章最后的RxJava2的相关文章。
Subscriber subscriber = new Subscriber() {
@Override
public void onNext(String s) {
Log.d(tag, "Item: " + s);
}
@Override
public void onCompleted() {
Log.d(tag, "Completed!");
}
@Override
public void onError(Throwable e) {
Log.d(tag, "Error!");
}
};
4、订阅
创建了 Observable 和 Observer 之后,再用 subscribe() 方法将它们联结起来,整条链子就可以工作了。代码形式很简单
observable.subscribe(observer);
// 或者:
observable.subscribe(subscriber);
我们来看一下subscribe()方法中做了哪些事情?
// 注意:这不是 subscribe() 的源码,而是将源码中与性能、兼容性、扩展性有关的代码剔除后的核心代码。
// 如果需要看源码,可以去 RxJava 的 GitHub 仓库下载。
public Subscription subscribe(Subscriber subscriber) {
subscriber.onStart();
onSubscribe.call(subscriber);
return subscriber;
}
可以看到,subscriber() 做了3件事:
- 调用 Subscriber.onStart() 。这个方法在前面已经介绍过,是一个可选的准备方法。
- 调用 Observable 中的 OnSubscribe.call(Subscriber) 。在这里,事件发送的逻辑开始运行。从这也可以看出,在 RxJava 中, Observable 并不是在创建的时候就立即开始发送事件,而是在它被订阅的时候,即当 subscribe() 方法执行的时候。
- 将传入的 Subscriber 作为 Subscription 返回。这是为了方便 unsubscribe()
![](https://img.haomeiwen.com/i6023277/2a530ae89d676b84.jpg)
线程切换、流的变换实现:
1、在不指定线程的情况下, RxJava 遵循的是线程不变的原则,即:在哪个线程调用 subscribe(),就在哪个线程生产事件;在哪个线程生产事件,就在哪个线程消费事件。如果需要切换线程,就需要用到 Scheduler (调度器)。
在RxJava 中,Scheduler ——调度器,相当于线程控制器,RxJava 通过它来指定每一段代码应该运行在什么样的线程。RxJava 已经内置了几个 Scheduler ,它们已经适合大多数的使用场景:
-
Schedulers.immediate(): 直接在当前线程运行,相当于不指定线程。这是默认的 Scheduler。
-
Schedulers.newThread(): 总是启用新线程,并在新线程执行操作。
-
Schedulers.io(): I/O 操作(读写文件、读写数据库、网络信息交互等)所使用的 Scheduler。行为模式和 newThread() 差不多,区别在于 io() 的内部实现是是用一个无数量上限的线程池,可以重用空闲的线程,因此多数情况下 io() 比 newThread() 更有效率。不要把计算工作放在 io() 中,可以避免创建不必要的线程。
-
Schedulers.computation(): 计算所使用的 Scheduler。这个计算指的是 CPU 密集型计算,即不会被 I/O 等操作限制性能的操作,例如图形的计算。这个 Scheduler 使用的固定的线程池,大小为 CPU 核数。不要把 I/O 操作放在 computation() 中,否则 I/O 操作的等待时间会浪费 CPU。
-
另外, Android 还有一个专用的 AndroidSchedulers.mainThread(),它指定的操作将在 Android 主线程运行。
2、在了解RxJava为什么可以切换线程之前,需要了解一下RxJava的变换
- 首先看一个 map() 的例子
Observable.just("images/logo.png") // 输入类型 String
.map(new Func1() {
@Override
public Bitmap call(String filePath) { // 参数类型 String
return getBitmapFromPath(filePath); // 返回类型 Bitmap
}
})
.subscribe(new Action1() {
@Override
public void call(Bitmap bitmap) { // 参数类型 Bitmap
showBitmap(bitmap);
}
});
这里通过map()函数实现了String->Bitmap的流的转换。
![](https://img.haomeiwen.com/i6023277/d24d87ad061201f6.jpg)
- 如果要实现一对多的转换该怎么办?假设有多个学生,一个学生有多门课程,我要打印每个学生的每门课程该怎么办?使用flatMap()!
Student[] students = ...;
Subscriber subscriber = new Subscriber() {
@Override
public void onNext(Course course) {
Log.d(tag, course.getName());
}
...
};
Observable.from(students)
.flatMap(new Func1() {
@Override
public Observable call(Student student) {
return Observable.from(student.getCourses());
}
})
.subscribe(subscriber);
flatMap() 的原理是这样的:
-
使用传入的事件对象创建一个 Observable 对象。
-
并不发送这个 Observable, 而是将它激活,于是它开始发送事件。
-
每一个创建出来的 Observable 发送的事件,都被汇入同一个 Observable ,而这个 Observable 负责将这些事件统一交给 Subscriber 的回调方法
这三个步骤,把事件拆成了两级,通过一组新创建的 Observable 将初始的对象『铺平』之后通过统一路径分发了下去。
![](https://img.haomeiwen.com/i6023277/210f1f026d0f65ac.jpg)
3、变换的原理:lift()
这些变换虽然功能各有不同,但实质上都是针对事件序列的处理和再发送。而在 RxJava 的内部,它们是基于同一个基础的变换方法: lift(Operator)。首先看一下 lift() 的内部实现(仅核心代码):
// 注意:这不是 lift() 的源码,而是将源码中与性能、兼容性、扩展性有关的代码剔除后的核心代码。
// 如果需要看源码,可以去 RxJava 的 GitHub 仓库下载。
public Observable lift(Operator operator) {
return Observable.create(new OnSubscribe() {
@Override
public void call(Subscriber subscriber) {
Subscriber newSubscriber = operator.call(subscriber);
newSubscriber.onStart();
onSubscribe.call(newSubscriber);
}
});
}
这段代码很有意思:它生成了一个新的 Observable 并返回,而且创建新 Observable 所用的参数 OnSubscribe 的回调方法 call() 中的实现竟然看起来和前面讲过的 Observable.subscribe() 一样!然而它们并不一样哟~不一样的地方关键就在于第二行 onSubscribe.call(subscriber) 中的 onSubscribe 所指代的对象不同(高能预警:接下来的几句话可能会导致身体的严重不适)
- subscribe() 中这句话的 onSubscribe 指的是 Observable 中的 onSubscribe 对象,这个没有问题,但是 lift() 之后的情况就复杂了点。
当含有 lift() 时:
- lift() 创建了一个 Observable 后,加上之前的原始 Observable,已经有两个 Observable 了;
- 而同样地,新 Observable 里的新 OnSubscribe 加上之前的原始 Observable 中的原始 OnSubscribe,也就有了两个 OnSubscribe;
- 当用户调用经过 lift() 后的 Observable 的 subscribe() 的时候,使用的是 lift() 所返回的新的 Observable ,于是它所触发的 onSubscribe.call(subscriber),也是用的新 Observable 中的新 OnSubscribe,即在 lift() 中生成的那个 OnSubscribe;
- 而这个新 OnSubscribe 的 call() 方法中的 onSubscribe ,就是指的原始 Observable 中的原始 OnSubscribe ,在这个 call() 方法里,新 OnSubscribe 利用 operator.call(subscriber) 生成了一个新的 Subscriber(Operator 就是在这里,通过自己的 call() 方法将新 Subscriber 和原始 Subscriber 进行关联,并插入自己的『变换』代码以实现变换),然后利用这个新 Subscriber 向原始 Observable 进行订阅。
![](https://img.haomeiwen.com/i6023277/bb5061e2d561f149.jpg)
4、如果经过多次变换也就是多次lift()之后是什么样的?
如下图所示:
![](https://img.haomeiwen.com/i6023277/f16197960c25f0a9.jpg)
5、变换和线程切换有什么关系?
我们知道subscribeOn() 和 observeOn() 可以实现线程的切换,其实subscribeOn() 和 observeOn() 内部的实现也是lift()。
subscribeOn() 原理图:
![](https://img.haomeiwen.com/i6023277/fbc5c3da04408ffa.jpg)
observeOn() 原理图:
![](https://img.haomeiwen.com/i6023277/5538297d3de31d38.jpg)
从图中可以看出,subscribeOn() 和 observeOn() 都做了线程切换的工作(图中的 "schedule..." 部位)。不同的是, subscribeOn() 的线程切换发生在 OnSubscribe 中,即在它通知上一级 OnSubscribe 时,这时事件还没有开始发送,因此 subscribeOn() 的线程控制可以从事件发出的开端就造成影响;而 observeOn() 的线程切换则发生在它内建的 Subscriber 中,即发生在它即将给下一级 Subscriber 发送事件时,因此 observeOn() 控制的是它后面的线程。
6、最后,我用一张图来解释当多个 subscribeOn() 和 observeOn() 混合使用时,线程调度是怎么发生的(由于图中对象较多,相对于上面的图对结构做了一些简化调整)
![](https://img.haomeiwen.com/i6023277/2d36cb611c3971f0.jpg)
图中共有 5 处含有对事件的操作。由图中可以看出,①和②两处受第一个 subscribeOn() 影响,运行在红色线程;③和④处受第一个 observeOn() 的影响,运行在绿色线程;⑤处受第二个 observeOn() 影响,运行在紫色线程;而第二个 subscribeOn() ,由于在通知过程中线程就被第一个 subscribeOn() 截断,因此对整个流程并没有任何影响。
所以:当使用了多个 subscribeOn() 的时候,只有第一个 subscribeOn() 起作用,作用于 subscribe()函数。每指定一次observeOn()线程则实现了一次线程的切换工作,在observeOn()之后的转换工作或者Subscriber都运行在observeOn()指定的线程中。
关于RxJava相关的拓展
1、原文链接:http://gank.io/post/560e15be2dca930e00da1083
2、Rxjava2:https://www.jianshu.com/p/0cd258eecf60
3、关于背压:https://www.jianshu.com/p/2c4799fa91a4
4、GitHub:https://github.com/ReactiveX/RxJava
网友评论