前言
理解lift原理有什么意义?
- 可以理解Rxjava最核心的原理,看懂了lift你就看懂了Rxjava
lift是Rxjava操作符的基础原理,操作符是Rxjava功能如此丰富和好用的核心,理解了lift也就理解了Rxjava最核心的原理
- 可以理解线程切换的原理,有助于灵活运用线程切换和调试线程相关的问题
线程切换也是用的操作符,所以原理也是lift
RxJava基本用法
- 创建 Observer
Observer<String> observer = new Observer<String>() {
@Override
public void onNext(String s) {
Log.d(tag, "Item: " + s);
}
@Override
public void onCompleted() {
Log.d(tag, "Completed!");
}
@Override
public void onError(Throwable e) {
Log.d(tag, "Error!");
}
};
Observer 需要实现三个方法,相当于定义了三种类型的事件
- Subscriber 与 Observer的关系
Subscriber相当于增加了Subscription(订阅关系管理)功能的Observer
Subscription 包含两个方法:
unsubscribe();(取消订阅)
isUnsubscribed();(查询订阅关系)
Subscriber 还增加了一个onStart()方法:它会在 subscribe 刚开始,而事件还未发送之前被调用,可以用于做一些准备工作
实质上,在 RxJava 的 subscribe 过程中,Observer 也总是会先被转换成一个 Subscriber 再使用,所以为了统一,我们就统一以Subscriber来作为观察者,就不再提Observer了。
- 创建 Observable
Observable observable = Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("Hello");
subscriber.onNext("Hi");
subscriber.onNext("Aloha");
subscriber.onCompleted();
}
});
创建Observable对象时,会传入一个 OnSubscribe 对象,OnSubscribe对象会被存储在生成的 Observable 对象中。
OnSubscribe 对象的作用,就是拿到 subscriber对象,向subscriber 对象发送事件。
这里拿到观察者Subscriber对象,并调用Subscriber实现的三个方法,就是在向观察者发送事件
- Subscribe (订阅)
observable.subscribe(observer);
// 或者:
observable.subscribe(subscriber);
Observable.subscribe(Subscriber) 的内部实现是这样的(仅核心代码):
// 注意:这不是 subscribe() 的源码,而是将源码中与性能、兼容性、扩展性有关的代码剔除后的核心代码。
// 如果需要看源码,可以去 RxJava 的 GitHub 仓库下载。
public Subscription subscribe(Subscriber subscriber) {
subscriber.onStart();
onSubscribe.call(subscriber);
return subscriber;
}
可以看到,subscriber() 做了3件事:
- 调用 Subscriber.onStart() 。这个方法在前面已经介绍过,是一个可选的准备方法。
- 调用 Observable 中的 OnSubscribe.call(Subscriber) 。在这里,事件发送的逻辑开始运行。从这也可以看出,在 RxJava 中, Observable 并不是在创建的时候就立即开始发送事件,而是在它被订阅的时候,即当 subscribe() 方法执行的时候。
- 将传入的 Subscriber 作为 Subscription 返回。这是为了方便进行订阅关系管理,比如 unsubscribe().
操作符-对事件序列进行变换
所谓变换,就是将事件序列中的对象或整个序列进行加工处理,转换成不同的事件或事件序列。概念说着总是模糊难懂的,来看 例子。
举个例子:map()
Observable.just("images/logo.png") // 输入类型 String
.map(new Func1<String, Bitmap>() {
@Override
public Bitmap call(String filePath) { // 参数类型 String
return getBitmapFromPath(filePath); // 返回类型 Bitmap
}
})
.subscribe(new Action1<Bitmap>() {
@Override
public void call(Bitmap bitmap) { // 参数类型 Bitmap
showBitmap(bitmap);
}
});
map(): 对事件对象的直接变换。
变换的原理:lift()
这些变换虽然功能各有不同,但实质上都是针对事件序列的处理和再发送。而在 RxJava 的内部,它们是基于同一个基础的变换方法: lift(Operator)。首先看一下 lift() 的内部实现(仅核心代码):
// 注意:这不是 lift() 的源码,而是将源码中与性能、兼容性、扩展性有关的代码剔除后的核心代码。
// 如果需要看源码,可以去 RxJava 的 GitHub 仓库下载。
public <R> Observable<R> lift(Operator<? extends R, ? super T> operator) {
return Observable.create(new OnSubscribe<R>() {
@Override
public void call(Subscriber subscriber) {
Subscriber newSubscriber = operator.call(subscriber);
newSubscriber.onStart();
onSubscribe.call(newSubscriber);
}
});
}
先不用关心细节,我们先知道 lift是接受一个operator参数,返回一个新的Observable对象。
在分析 lift() 的内部实现之前,我们先看一下加上操作符的一次调用的完整过程
一次包含操作符的调用的完整过程
Observable.just("images/logo.png") // 输入类型 String
.map(new Func1<String, Bitmap>() {
@Override
public Bitmap call(String filePath) { // 参数类型 String
return getBitmapFromPath(filePath); // 返回类型 Bitmap
}
})
.subscribe(new Action1<Bitmap>() {
@Override
public void call(Bitmap bitmap) { // 参数类型 Bitmap
showBitmap(bitmap);
}
});
做了三步:
1.生成observable对象
2.在observable对象上调用了map方法
3.在map方法的返回值上调用了.subscribe方法
问题:1.map方法内部做了什么? 2.map方法的返回值是个什么?
回答两个问题
因为map()内部不是直接调用的lift方法(跟lift的原理一样,只是没有直接使用lift方法),所以我们以take()操作符的源码为例,来看方法调用。
回答两个问题:
- take方法内部很简单,就是调用了lift方法
2.返回值就是lift方法的返回值,是个新new的observable对象。
将示例稍作修改
Observable observable1 = Observable.just("images/logo.png"); // 输入类型 String
Observable observable2 = observable1.map(new Func1<String, Bitmap>() {
@Override
public Bitmap call(String filePath) { // 参数类型 String
return getBitmapFromPath(filePath); // 返回类型 Bitmap
}
});
observable2.subscribe(new Action1<Bitmap>() {
@Override
public void call(Bitmap bitmap) { // 参数类型 Bitmap
showBitmap(bitmap);
}
});
observable2.subscribe 分析
回顾前面subscribe()的内部实现,我们发现observable2里的OnSubscribe对象的call方法会被调用。而observable2就是lift方法返回的Observable对象, observable2里的onSubscribe对象就是lift的核心重点。
observable2里的OnSubscribe对象 —lift的核心重点
observable2对象里面通过observable1对象的onSubscribe.call(newSubscriber)达到通知observable1目的。
因为observable1对象是我们初始的Observable对象,它的onSubscribe.call会发送事件到newSubscriber。
newSubscriber是operator.call(subscriber)返回的,newSubscriber做了两件事:
1.进行事件的变换操作。newSubscriber能拿到初始的事件,可以进行转换操作,这也是操作符发生效力的地方,不同的操作符的作用就是对事件进行不同的转换。
2.转发给subscriber,newSubscriber有subscriber的引用,可以将转换后的事件转发给subscriber,也就是最终的订阅者。
lift方法返回的observable2对象在调用链的中间起到了一个中转的作用,这就是lift原理的核心。
operator.call内部通过传入一个subscriber返回一个newSubscriber,newSubscriber能达到事件转换和转发的目的。
是如何做到的?
我们来举个例子
Operator的一个例子
多个操作符的情况
多个操作符相当于中间经过了多层中转,原理都一样
关于事件发送的触发
结合上面多个操作符的图,强调一下:
事件发送的触发是从调用subscribe()方法后开始的,前面哪怕调用了N多的操作符方法只要还没有调用subscribe()方法其实并没有触发事件的发送。
事件发送之前有一个从下往上通知的过程,当subscribe()方法被调用之后,先是通过observable2对象里调用observable1对象的onSubscribe.call通知observable1对象,如果observable1对象不是最开始发送事件的Observable对象(多个操作符的情况),那么同样的还会继续往上通知,直到通知到初始的Observable对象,才会开始事件的发送。
所以RxJava是 先从下往上通知,然后再从上往下发送事件
参考文献
给 Android 开发者的 RxJava 详解 --扔物线
http://gank.io/post/560e15be2dca930e00da1083
网友评论