前言
上一篇文章,对Rx进行了梳理,大致了解了什么是Rx,并对RxJava的基本流程进行了跟踪,如忘记其中关键点,及时回顾。
关于链式
从Rx的机制上来说,事件的持续进行运转于调用链之上,也就是说,调用链上的每一环节,承载了各自的任务,并对事件的最终完成或异常状态提供了对应出口。那分门别类的任务,是如何嵌入调用链中,并得以完成的呢? 此篇文章,将对此进行解析。
链式
回顾
在解析之前,先对基本流程进行回顾:
- 拿到Observable
- Observable.subscribe()与观察者签订
- subscribeActual()获得执行时机,执行具体逻辑,通知观察者已签订
- 数据或事件到来,观察者进行响应
案例
Observable<Integer> ob = Observable.fromArray(new Integer[]{1,2,3,4,5,6}); // 数据
ob.filter(new Predicate<Integer>() { // 过滤
@Override
public boolean test(Integer integer) throws Exception {
return integer % 2 == 0;
}
}).map(new Function<Integer, String>() { // 数据转换
@Override
public String apply(Integer integer) throws Exception {
return integer.toString();
}
}).subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(String s) { // 响应
Log.d(TAG, "onNext: " + s);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() { // 响应
Log.d(TAG, "onComplete: ");
}
});
从上游向下游发射了6个数据,过滤掉不满足过滤条件 integer % 2 == 0 的数据,并将过滤后的数据转换为String类型,下游最终拿到3个数据,进行了4次响应。
这一调用链中,进行了两个额外任务——过滤、转换,但却能保证最终到达下游,那,如何完成?
上游的构建
从基本流程看,第一步,拿到Observable。从每次调用的返回看,包括Observable.fromArray(),Observable.filter(),Observable.map()均会拿到Observable,但,Observable会有不同。分别进行跟踪,在当前情况下,拿到的Observable依次如下:
// Observable.filter()
public final Observable<T> filter(Predicate<? super T> predicate) {
ObjectHelper.requireNonNull(predicate, "predicate is null");
return RxJavaPlugins.onAssembly(new ObservableFilter<T>(this, predicate));
}
// Observable.map()
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
ObjectHelper.requireNonNull(mapper, "mapper is null");
return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
}
// Observable.fromArray()
public ObservableFromArray(T[] array) {
this.array = array;
}
Observable.filter()
public ObservableFilter(ObservableSource<T> source, Predicate<? super T> predicate) {
super(source);
this.predicate = predicate;
}
Observable.map()
public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
super(source);
this.function = function;
}
除了fromArray()外,均接收一个ObservableSource参数,并返回一个Observable。而接收到的ObservableSource,就是调用当前接口的Observable , 显然,Observable被装饰了,并且是层层装饰。 注意到,在Rx提供的类似fromArray(),比如前文的Observable.create()的一系列方法,接收参数并不遵守这一规则。原因是,这一系列接口所提供的Observable所处位置比较特别,这类Observable处于事件的的发源地,作为接收数据的源头,而数据的来源方式,千差万别,因此这类Observable的构造方式也大不相同。
这里需插播一个信息,即Observable.map()与Observable.filter()这一类操作所拿到的Observable被标示为AbstractObservableWithUpstream,取map()来看,如下
public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U>
abstract class AbstractObservableWithUpstream<T, U> extends Observable<U> implements HasUpstreamObservableSource<T>
AbstractObservableWithUpstream的作用在于,指明此类Observable,将会对所包装的Observable进行额外处理。 其实,Observer也会有相应的行为,下面再谈。
回到当前,说到经过一系列的调用之后,形成了一条调用链,而拿到的Observable是被层层包装的,因此,实际拿到的是一条包装链,如下图:
上游链条.jpg
案例中,到Observable.map()为止,上游链条建立完毕,由外至内为ObservableMap -> ObservableFilter -> ObservableFromArray。紧接着,与观察者签订。
下游的构建
与上游对应的,其实在签订之后,下游也会形成链条的。
再次回忆之前所说的基本流程:
- 拿到Observable
- Observable.subscribe()与观察者签订
- subscribeActual()获得执行时机,执行具体逻辑,通知观察者已签订
- 数据或事件到来,观察者进行响应
此刻,拿到了最外层为ObservableMap的上游链条(最后调用为Observable.map()),因此,实际与案例里定义的观察者签订的Observable为ObservableMap。在第一篇讲解中说过,subscribe()会将Observer交给subscribeActual(),由subscribeActual()来执行具体的逻辑。
// map()拿到的ObservableMap
@Override
public void subscribeActual(Observer<? super U> t) {
source.subscribe(new MapObserver<T, U>(t, function));
}
// filter()拿到的ObservableFilter
@Override
public void subscribeActual(Observer<? super T> s) {
source.subscribe(new FilterObserver<T>(s, predicate));
}
// fromArray()拿到的ObservableFromArray
@Override
public void subscribeActual(Observer<? super T> s) {
FromArrayDisposable<T> d = new FromArrayDisposable<T>(s, array);
s.onSubscribe(d);
if (d.fusionMode) {
return;
}
d.run();
}
可以看到,从最外层的Observable开始,不断地执行subscribe() -> subscribeActual() -> subscribe() -> subscribeActual() ... 直到达到最内层Observable的 subscribeActual()。 期间通过subscribe()交接的Observer也经历了对应的包装变化。Observer的包装次序为 自定义的Observer <- MapObserver <- FilterObserver 。 除最内层的Observer外,其他的Observer均以另一个Observer为构造参数,以便装饰,可通过跟踪任意Observer源码看出,此不贴出。
当前,下游也形成了一条调用链。上下游的调用链如图:
运行时调用链.jpg
在到达最内层的Observable之后,Observer链构建完毕,并且最内的Observable将Observer链的最外层以Disposable形式交给API的使用者, 以上执行此承操作的代码为
s.onSubscribe(d)。
以上上下游链条的构建过程需要仔细琢磨,很重要。
事件的处理
到此处为止,RxJava的调用链式构造完毕了的。那么,为什么要这样做呢?
一句话回答:“处理差异性需求。”
在Rx中,创建、过滤、变换、结合、辅助操作等,提供的以操作符形式的让得以以链条形式完成一系列工作的行为,均是为了处理差异性需求,在本文案例中表现为fromArray(),filter(),map()。而对于种种的差异性需求来说,先以上游或下游为大环境,再以其中某一节点为具体场景进行处理。因此需要分别在上下游分别建立相应的调用链,并构建出场景明确的链条节点,以此达到Rx完成需求的目的。
在有了双链并确定了节点场景之后,处理差异性需求就简单了,仅需在当前场景处理响应事件之后,让事件继续流动向下一节点皆可。如图
节点需求.jpg
当事件或数据来到当前节点时,一般会遵循如图顺序流动。
回到案例。当前情况,已在上下游构建好相应的调用链,而在最内的Observable里,发射了数据
// ObservableFromArray
@Override
public void subscribeActual(Observer<? super T> s) {
FromArrayDisposable<T> d = new FromArrayDisposable<T>(s, array);
s.onSubscribe(d);
if (d.fusionMode) {
return;
}
// 发射数据
d.run();
}
// FromArrayDisposable
void run() {
T[] a = array; // 要发射的数据
int n = a.length;
for (int i = 0; i < n && !isDisposed(); i++) {
T value = a[I];
if (value == null) {
actual.onError(new NullPointerException("The " + i + "th element is null"));
return;
}
// actual为最外的Observer,为FilterObserver
actual.onNext(value);
}
if (!isDisposed()) {
actual.onComplete();
}
}
在数据开始推送后,下游链的每个节点,依次接收到数据事件的到来,并进行相应的处理。根据之前的分析,当前案例的下游链为自定义的FilterObserver -> MapObserver -> 自定义的Observer 。 对应代码如下
// FilterObserver
@Override
public void onNext(T t) {
if (sourceMode == NONE) {
......
if (b) {
// 将满足过滤条件的数据发送下一节点,为MapObserver
actual.onNext(t);
}
} else {
actual.onNext(null);
}
// MapObserver
@Override
public void onNext(T t) {
......
try {
// 将数据t进行转换
v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
} catch (Throwable ex) {
fail(ex);
return;
}
// 将数据v发送给下一节点,为自定义的Observer
actual.onNext(v);
}
当事件到达调用链的某一节点,此节点将进行对应的处理,并根据需求决定事件应如何收发,而在最终的节点里,将会收到经历了所有变迁的事件。
注意到,在Observable.fromArray()、Observable.filter()、Observable.map()等操作符里,需要携带对应的参数,如Function、Predicate等。此类参数的作用为,携带链上节点所需信息,协助构建节点或协助处理事件,但不为本文重点,故不提及。
总结
Rx中,为了在链式表达中完整一系列事件需求,分别在上下游构建了对应的调用链。
- 上游 : 对于上游来说,链的形成通过相应操作符的操作构建
- 下游 : 对于下游来说,链通过Observable.subscribe()与Observable.subscribeActual()构建。
事件到来时,节点将会进行响应,并根据自身特点在合适的时机将事件推送给下一节点。通过上下游调用链的运行机制,能让Rx得以以链条不断裂的前提下,完成复杂的事件交互,将事件的流程主干铺陈出来,剥离异常与分支流程。而Rx可插拔的运作方式也体现在节点的删减中。
顺带提一句,节点拥有处理事件然后推送给下一节点的过程,而其中经历的时间与具体的处理是变化无穷,因此,节点是多维度的,仔细体会,自行想象。
网友评论