注意
文章中说的被观察者和观察者可以看------RxJava的基本执行流程。比如Observable.create传入的参数是被观察者,而subscribe传入的参数是观察者,因为前者是事件的发射地,而后者是接收事件的地方,事件发射地的变化,我们都能第一时间得知,就好像我们在观察前者一样。
Compose
通过对其应用特定操作。比如在网络请求中,需要在观察者接收通知前对返回的数据做一定的处理后,再通知观察者。
FlatMap
FlatMap将一个发射数据的Observable变换为多个Observables,然后将它们发射的数据合并后放进一个单独的Observable。
FlatMap功能由
ObservableFlatMap
ObservableMapNotification
ObservableInternalHelper
FlatMapWithCombinerOuter
ObservableFlatMapCompletableCompletable
ObservableFlattenIterable
FlatMapIntoIterable
ObservableFlatMapMaybe
ObservableFlatMapSingle
实现
ObservableFlatMap
首先调用ObservableFlatMap的subscribeActual方法,并观察者实例。
public void subscribeActual(Observer<? super U> t) {
if (ObservableScalarXMap.tryScalarXMapSubscribe(source, t, mapper)){return;}
source.subscribe(new MergeObserver<>(t, mapper, delayErrors, maxConcurrency, bufferSize));
}
tryScalarXMapSubscribe方法
如果被观察者(比如Observable.create传入的参数)是Supplier类型,返回true。
直接调用Supplier的get方法获取事件数据,然后调用FlatMap传入的数据转换包装实例的apply方法,得到一个包装后,类型为ObservableSource的c。
如果得到的数据包装实例也是Supplier类型,者直接调用Supplier的get方法获取新的数据,新数据可以被转换,也可以没有转换。然后调用观察者的onSubscribe方法,紧接着调用观察者的onNext,onComplete等方法。
如果得到的数据包装实例不是Supplier类型,这直接调用数据包装实例的subscribe方法,并把观察者实例传过去。这样,可以由数据包装实例对数据转换后自己给观察者发出通知。
source.subscribe(new MergeObserver<>(t, mapper, delayErrors, maxConcurrency, bufferSize))
调用被观察者subscribe方法,传入MergeObserver实例,看上去是不是像是又创建了一个新的被观察者和观察者关系。在观察者subscribe里实现事件发射逻辑,在MergeObserver接收事件。下面重点看一下onNext方法。
-
onNext
首先获取数据包装实例,如果数据个数大于最大允许的个数,将不在往队列里面插入,直接返回。 -
subscribeInner
如果数据包装实例是Supplier类型,者从队列取出数据包装实例,调用Supplier的get方法获取新的转换数据,然后通知观察者。如果不是Supplier类型,创建InnerObserver实例,传入上面创建的MergeObserver实例,添加到,然后调用数据包装实例的subscribe方法,就好像数据包装实例做被观察者,InnerObserver作为观察者。发射的数据会被保存在InnerObserver中的队列queue中,然后一一发射给观察者。queue是线程可见的队列,这样可以运用到多线程中。
RxJava很多操作符都对多线程进行了支持和处理,如果想在多线程中使用,可以自己查看一下源码。也不是很难。
ObservableMapNotification
这个类大概就是通过传入的数据包装实例对数据做转换,然后通知观察者,具体逻辑可以自己查看源码,源码很少很简单。
FlatMapWithCombinerOuter
简单看了一下代码,大概功能是,将得到的两个包装数据实例进行合并,前一个包装数据实例会被传入到第二包装实例里面,然后返回最终的包装数据。
当有两个数据需要合并成一个新的数据实例时,可以用这个。
ObservableFlatMapCompletableCompletable
将值序列映射到CompletableSources中,并等待其终止。代码不多,自己查看。
ObservableFlattenIterable
将序列映射为Iterable并发出其值。然后遍历迭代器发射数据。
flatMapIntoIterable
功能和ObservableFlattenIterable差不多,只是被观察者本身就是一个迭代器。
ObservableFlatMapMaybe
将上游值映射到MaybeSources中,并将其信号合并为一个序列。
ObservableFlatMapSingle
将上游值映射到SingleSources并将其信号合并为一个序列。
GroupBy
将一个Observable分拆为一些Observables集合,它们中的每一个发射原始Observable的一个子序列,GroupBy操作符将原始Observable分拆为一些Observables集合,它们中的每一个发射原始Observable数据序列的一个子序列。哪个数据项由哪一个Observable发射是由一个函数判定的,这个函数给每一项指定一个Key,Key相同的数据会被同一个Observable发射。
只有ObservableGroupBy实现类。在ObservableGroupBy维护着 groups(Map<Object, GroupedUnicast<K, V>>)这样一个数组,当发射数据是,首先获取key,然后从groups获取key对应的GroupedUnicast实例,GroupedUnicast中有 State<T, K> state实例,而state维护着queue(SpscLinkedArrayQueue)数组来存放发射的数据。
group.onNext(v);
if (newGroup) {
downstream.onNext(group);
if (group.state.tryAbandon()) {
cancel(key);
group.onComplete();
}
}
如果不是新创建的GroupedUnicast实例,那么可以推断直接已经发射过,所以在GroupedUnicast中已经存在观察者实例。如果是新创建的,执行downstream.onNext(group)。进入后面的流程。
GroupJoin
由ObservableGroupBy实现。将不同数据来源的数据进行合并,合并规则自己查看源码。
Map
由ObservableMap实现。对Observable发射的每一项数据应用一个函数,执行变换操作。就相当于FlatMap中的获取包装数据,然后直接通知观察者。源码很简单,自己查看。
Scan
实现类有
ObservableScan
ObservableScanSeed
连续地对数据序列的每一项应用一个函数,然后连续发射结果。Scan操作符对原始Observable发射的第一项数据应用一个函数,然后将那个函数的结果作为自己的第一项数据发射。它将函数的结果同第二项数据一起填充给这个函数来产生它自己的第二项数据。它持续进行这个过程来产生剩余的数据序列。这个操作符在某些情况下被叫做accumulator。
源码很简单,自己查看。
Window
这个就带大家分析源码了,有兴趣的可以自己查看,如果只是想知道用法,请参考Window
总结
其实网上有很多对RxJava操作符的讲解,还有对官网文档的翻译,我觉得,要想对RxJava用好,还是要自己去查看源码。其实RxJava虽然很多,但是每个操作符都有独立的实现,分析源码其实也不难。只要你了解了被观察者和观察者的调用关系,很多操作符只不过是在中间有添加了被观察者和观察者过程。相当于嵌套。如果你了解这个执行过程,你可以很快找到关键的执行代码。
网友评论