在上一篇响应式编程开源库 RxJava2——Stream API中主要介绍了Java 8的Stream API,理解了什么是流,以及为什么要用流。它的实现基本上运用了前面part1、part2中学习过的重要概念。我们准备了这么久,下面将真正的进入RxJava的学习。
在前面已经介绍过RxJava的官方定义,在Java VM上使用可观察序列(即ReactiveX中提到的可观测流)编写异步和基于事件的程序的库。前面我们也对RxJava的基本使用进行了简单解析。RxJava可以使用Observable把某个对象转变为一个可观测的序列(可观测流),对其进行相应操作后作出相应的响应。
下面就从操作符入手,来学习使用RxJava。
操作符
Observable类里提供了很多操作符(intermediate operators)我们从里面不难发现有的操作符在前面介绍过的Stream API中也是有的。所以对于编程来说,用万变不离其宗,殊途同归来说的话一点不夸张。
1.create()
在前面part1我们已经初步学习过该操作符。它可以把某对象转化为可观测的序列,简单来说就是把某个对象转变为被观察者,并具有流的特性。
2.just()单个参数
源码中共有十个just重载方法。他们能接收不同数量的参数。目前最大支持10个参数。
可以从下面这种图看出他的含义,在前面已经说明了这种图所代表的意义。它能将多个参数转变到同一个可观测源中并且一次性发送。
@SuppressLint("CheckResult")
public static void main(String[] args){
Observable.just("s",2,true)
.subscribe(s->System.out.println(s));
}
上段代码运行结果如下。
还是按照惯例从源码再一次进行分析。先看看
just(T item)
只有一个参数的方法。
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Observable<T> just(T item) {
ObjectHelper.requireNonNull(item, "The item is null");
return RxJavaPlugins.onAssembly(new ObservableJust<T>(item));
}
其中onAssembly
方法返回的就是ObservableJust
对象。那么just
方法实际上返回的对象就是ObservableJust
对象。从源码中看到它是Observable
的子类,public final class ObservableJust<T> extends Observable<T>
。
@NonNull
public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
Function<? super Observable, ? extends Observable> f = onObservableAssembly;
if (f != null) {
return apply(f, source);
}
return source;
}
下面我们看一看在ObservableJust
中到底进行了什么操作。
/**
* Represents a constant scalar value.
* @param <T> the value type
*/
public final class ObservableJust<T> extends Observable<T> implements ScalarCallable<T> {
private final T value;
public ObservableJust(final T value) {
this.value = value;
}
@Override
protected void subscribeActual(Observer<? super T> s) {
ObservableScalarXMap.ScalarDisposable<T> sd = new ObservableScalarXMap.ScalarDisposable<T>(s, value);
s.onSubscribe(sd);
sd.run();
}
@Override
public T call() {
return value;
}
}
构造方法中对传入的数据进行了从新赋值,这就使得我们控制了对象的不可变性,我们进行的 Intermediate中间操作只是对 ObservableJust
类中的变量value进行操作,并不会影响传入的参数值。从下图 可以看出,ObservableJust中subscribeActual(Observer<? super T> s)
方法是基类 Observable
中的抽象方法实现。Observable在重写了ObservableSource接口中的方法subscribe(Observer<? super T> observer)时调用subscribeActual(observer);在ObservableJust中我们具体实现了subscribeActual(Observer<? super T> s)方法。s.onSubscribe(sd);
给了观察者一个中断器 ScalarDisposable
sd.run();
方法将调用Observer接口中的onNext方法回调数据。
@Override
public void run() {
if (get() == START && compareAndSet(START, ON_NEXT)) {
observer.onNext(value);
if (get() == ON_NEXT) {
lazySet(ON_COMPLETE);
observer.onComplete();
}
}
}
我们再来梳理一次Just()操作符的整个代码内部调用过程,Just(T item)会将数据赋值给ObservableJust类的value变量,严格控制了源数据的不可变性,之后调用Observable中的 subscribe(Observer<? super T> observer)方法将被观察者和观察者建立订阅关系(RxJavaPlugins.onSubscribe(this, observer) ),之后通过ObservableJust中实现父类的抽象方法,void subscribeActual(Observer<? super T> observer)来一次性将数据发送。
-
Just()多个参数
当Just()方法有多个参数时,其内部和单个参数是有一定区别的。从以下源码就可以看出,当Just有多个参数时,它实际上是利用了fromArray
操作符。
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Observable<T> just(T item1, T item2) {
ObjectHelper.requireNonNull(item1, "The first item is null");
ObjectHelper.requireNonNull(item2, "The second item is null");
return fromArray(item1, item2);
}
那我们就来研究下fromArray
操作符内部具体是怎么实现的。
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Observable<T> fromArray(T... items) {
ObjectHelper.requireNonNull(items, "items is null");
if (items.length == 0) {
return empty();
} else
if (items.length == 1) {
return just(items[0]);
}
return RxJavaPlugins.onAssembly(new ObservableFromArray<T>(items));
}
可以看出,当只有一个参数时,实际上还是运用的Just(T item)操作符。当多个参数时会将数据丢给 ObservableFromArray
类。
final T[] array;
public ObservableFromArray(T[] array) {
this.array = array;
}
@Override
public void subscribeActual(Observer<? super T> s) {
FromArrayDisposable<T> d = new FromArrayDisposable<T>(s, array);
s.onSubscribe(d);
if (d.fusionMode) {
return;
}
d.run();
}
可以看到任然是重新赋值,保证源数据的不可变性。之后通过d.run();
方法发送数据。
void run() {
T[] a = array;
int n = a.length;
for (int i = 0; i < n && !isDisposed(); i++) {
T value = a[i];
if (value == null) {
actual.onError(new NullPointerException("The " + i + "th element is null"));
return;
}
actual.onNext(value);
}
if (!isDisposed()) {
actual.onComplete();
}
}
可以看到这里使用了for循环遍历,将元素一个个发送。通过上面的分析不难发现之前提到过的,当observable被订阅后才会开始发送数据。因为只有subscribe后才会执行Observable的各个子类中的方法subscribeActual(Observer<? super T> s)从而执行各子类的run方法进行数据的发送。并且可以发现,和create方式并不相同的是这里并不需要ObservableEmitter
数据发射器的协助。这里的发送数据操作实际上就是回调。
3.以from开头的操作符
-
fromArray(T... items)
可以将一组数据转为可观测流,并且逐个发送参数中的每个元素。在上面的Just
学习中,我们已经大概了解了fromArray
这个操作符。
fromArray与just的主要区别主要取决于他们的参数个数。
- 单个参数 相当于都是Just,将参数的映射value一次发送。
- 多个参数 相当于fromArray 将参数的映射value遍历后发送。
-
fromIterable(Iterable<? extends T> source)
可以看到它只接收继承Iterable接口的数据,而在Java中集合类基本都继承了该接口。所以这里只接收集合参数。可以看到大致过程和前面学习的两个操作符差不多,只是内部遍历用上了集合特有的迭代器遍历。它会遍历集合,并将每个元素发送。
void run() {
boolean hasNext;
do {
if (isDisposed()) {
return;
}
T v;
try {
v = ObjectHelper.requireNonNull(it.next(), "The iterator returned a null value");
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
actual.onError(e);
return;
}
actual.onNext(v);
if (isDisposed()) {
return;
}
try {
hasNext = it.hasNext();
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
actual.onError(e);
return;
}
} while (hasNext);
if (!isDisposed()) {
actual.onComplete();
}
}
-
fromCallable(Callable<? extends T> supplier)
它将会发送Callable返回的参数。
-
fromFuture(Future<? extends T> future)
它将会发送Future中get函数的返回值。其中timeout参数是调用get()方法之前最多等待时间。
-
fromPublisher(Publisher<? extends T> publisher)
通过官方介绍,可以看出该操作符并不推荐,而是希望尽可能用create代替。所以这里不过多学习。
If possible, use
create(ObservableOnSubscribe)
to create a source-likeObservable
instead.
4.range()
它会发送指定区间的int数,每次发送比前一次数据大1,相当于for循环。
5.interval()
从源码可以看出该操作符的作用是每隔一段时间发送一个从0开始的Long型数据,可以说相当于一个定时器。在java中相当于Timer
和TimerTask
的应用。
这里每次发送的是long型的count,发送一次后count+1。
6.timer()
延迟指定时间后发送数据0。其实也就是延时操作,相当于handler的postDelay。
7.empty()
不会发送任何数据,并且立即调用onComplete
终止操作。这在前面很多操作符里都有调用。比如当数组为空就会首先调用empty终止操作。
8.map()
首先看看map()
在ReactiveX中的解释。
transform the items emitted by an Observable by applying a function to each item
应用一个函数对每个发出的元素进行转换。
所以对每个发出的元素应用指定函数,并发送函数的返回值。从下图也可以看出,它可以进行数据的转换,比如将int转为String。map()接收一个函数式接口Function
作为参数,而在前面介绍过它能接收一个参数,返回另一个参数。所以决定它有转换功能。
我们在结合之前的
just()
或者说fromArray()
来看看map
是怎么进行数据的转换的。
Observable.just(1,2,3,4).map(new Function<Integer, String>() {
@Override
public String apply(Integer integer) throws Exception {
return "map"+integer;
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
log(s);
}
});
}
Observable.just(1,2,3,4)
我们已经清楚,当被观察者订阅后,就会遍历依次发送每个元素。发送出去后被 map
接收。那么map
是怎么接收的呢?还是从源码入手。
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
ObjectHelper.requireNonNull(mapper, "mapper is null");
return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
}
它最终返回的是ObservableMap
对象,注意看它的参数。它接收了当前的Observable对象,和Function函数。我们在来看ObservableMap内部。从下面看出ObservableMap的继承关系。
ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U>
abstract class AbstractObservableWithUpstream<T, U> extends Observable<U>
在ObservableMap中,注意构造方法的super(source);
在父类AbstractObservableWithUpstream
中通过它的构造方法将会获得Observable.just(1,2,3,4)
创建的上游可观测源,当订阅后会调用subscribeActual
。新的可观测源的映射source
会和MapObserver
建立新的订阅关系。
public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
super(source);
this.function = function;
}
@Override
public void subscribeActual(Observer<? super U> t) {
source.subscribe(new MapObserver<T, U>(t, function));
}
所以这里just
发送的数据,会被 MapObserver
再次订阅而且只一次,这样数据就完美的流向了map
操作符。在MapObserver
中实现了基类Observer
的onNext
方法。这时候just通过遍历发送的数据就会一次被下面的onNext
接收到。当接收到数据,就会执行mapper.apply(t)
将数据转化,最后发送转化后的数据v
。
通过上面的分析就可以印证之前的学习,每一次的中间操作都会有一个新的数据源映射,并且其中很多构造方法完美的解决了对象的不可变性。当订阅后,所有的操作都是一次性完成,减少了时间复杂度。
@Override
public void onNext(T t) {
if (done) {
return;
}
if (sourceMode != NONE) {
actual.onNext(null);
return;
}
U v;
try {
v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
} catch (Throwable ex) {
fail(ex);
return;
}
actual.onNext(v);
}
9.flatMap()
transform the items emitted by an Observable into Observables, then flatten the emissions from those into a single Observable
可以将可观测数据源发送的数据转化成多个可观测源,最后把他们在融合大一个可观测数据源中发送。但是不保证转化后数据的顺序。
Observable.just(1,2,3,4).flatMap(new Function<Integer, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(Integer integer) throws Exception {
return Observable.just("flatmap"+integer);
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
log(s);
}
});
结合之前的just和map的学习,从代码很容易看出just依次发送元素,flatMap利用Function接口将每个元素转化为新的数据源,然后合并到一个可观测源中发送。那么是怎么合并发送的呢?这里看下源码。flatMap最终的可观测源是ObservableFlatMap
在subscribeActual
方法中进行了数据源的映射重新订阅source.subscribe(new MergeObserver<T, U>(t, mapper, delayErrors, maxConcurrency, bufferSize));
重新订阅后在onNext
中会将发送的数据转化成新的数据源。
@Override
public void onNext(T t) {
// safeguard against misbehaving sources
if (done) {
return;
}
ObservableSource<? extends U> p;
try {
//将发送的数据转化为新的可观测源
p = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper returned a null ObservableSource");
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
s.dispose();
onError(e);
return;
}
if (maxConcurrency != Integer.MAX_VALUE) {
synchronized (this) {
if (wip == maxConcurrency) {
sources.offer(p);
return;
}
wip++;
}
}
subscribeInner(p);
}
最后执行subscribeInner(p);
一般我们的可观测源不会是Callable类型,所以最终到了else里面,在这里面就可以看出每次发送来的数据转化为新的数据源 p
后,每次会创建一个InnerObserver
(注意它的构造方法参数,是将MergeObserver
对象传入的)。通过addInner
方法将多个InnerObserver
添加到AtomicReference<InnerObserver<?, ?>[]> observers;
中。每次转化的数据源p都有一个对应InnerObserver。我们这里的p实际就是Observable.just("flatmap"+integer)
所产生的可观测源,对它调用p.subscribe(inner);
就会把当前可观测源中的数据让对应的InnerObserver的onNext接收。这时候就会通过MergeObserver的tryEmit
方法给下游下发数据。
void subscribeInner(ObservableSource<? extends U> p) {
for (;;) {
if (p instanceof Callable) {
if (tryEmitScalar(((Callable<? extends U>)p)) && maxConcurrency != Integer.MAX_VALUE) {
boolean empty = false;
synchronized (this) {
p = sources.poll();
if (p == null) {
wip--;
empty = true;
}
}
if (empty) {
drain();
break;
}
} else {
break;
}
} else {
InnerObserver<T, U> inner = new InnerObserver<T, U>(this, uniqueId++);
//每次转化的数据源p都有一个对应InnerObserver
if (addInner(inner)) {
p.subscribe(inner);
}
break;
}
}
}
boolean addInner(InnerObserver<T, U> inner) {
for (;;) {
InnerObserver<?, ?>[] a = observers.get();
if (a == CANCELLED) {
inner.dispose();
return false;
}
int n = a.length;
InnerObserver<?, ?>[] b = new InnerObserver[n + 1];
System.arraycopy(a, 0, b, 0, n);
b[n] = inner;
if (observers.compareAndSet(a, b)) {
return true;
}
}
}
最后我们在看InnerObserver的onNext方法和tryEmit方法,最终通过MergeObserver
单一可观测源调用tryEmit方法,通过最终观察者的回调来发送数据。MergeObserver继承了AtomicInteger,所以这里的tryEmit方法就利用了AtomicInteger的同步机制。所以同时只会有一个value被最终观察者actual发送,由于AtomicInteger CAS锁只能保证操作的原子性,并不保证锁的获取顺序,是抢占式的,所以最终数据的发射顺序并不是固定的。
if (fusionMode == QueueDisposable.NONE) {
parent.tryEmit(t, this);
} else {
parent.drain();
}
void tryEmit(U value, InnerObserver<T, U> inner) {
if (get() == 0 && compareAndSet(0, 1)) {
actual.onNext(value);
if (decrementAndGet() == 0) {
return;
}
} else {
SimpleQueue<U> q = inner.queue;
if (q == null) {
q = new SpscLinkedArrayQueue<U>(bufferSize);
inner.queue = q;
}
q.offer(value);
if (getAndIncrement() != 0) {
return;
}
}
drainLoop();
}
大概的过程就是,通过MergeObserver
来将发送的每个元素转化为可观测源,每个新的可观察源p中的数据都会通过p.subscribe(inner);
下发到对应的 InnerObserver
。然后通过MergeObserver中的方法下发数据到最终订阅者。
网友评论