所有代码的演示都在RxJava2.2.4版本上进行的
当你传递多个Observable给amb操作符时,该操作符只发射其中一个Observable的数据和通知:首先发送通知给amb操作符的的那个Observable,不管发射的是一项数据还是一个onError或onCompleted通知,amb将忽略和丢弃其它所有Observables的发射物
amb.png
1.example
Observable<Integer> observable1 = Observable.timer(4, TimeUnit.SECONDS)
.flatMap(__ -> Observable.just(1, 2, 3, 4, 5));
Observable<Integer> observable2 = Observable.timer(3, TimeUnit.SECONDS)
.flatMap(__ -> Observable.just(6, 7, 8, 9, 10));
Observable<Integer> observable3 = Observable.timer(2, TimeUnit.SECONDS)
.flatMap(__ -> Observable.just(11, 12, 13, 14, 15));
Observable.ambArray(observable1, observable2, observable3)
.subscribe(next -> Logger.getGlobal().info("OnNext: next: " + next));
2.源码分析
先看看subscribe方法做了什么
public final Disposable subscribe(Consumer<? super T> onNext) {
return subscribe(onNext, Functions.ON_ERROR_MISSING, Functions.EMPTY_ACTION, Functions.emptyConsumer());
}
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
Action onComplete, Consumer<? super Disposable> onSubscribe) {
ObjectHelper.requireNonNull(onNext, "onNext is null");
ObjectHelper.requireNonNull(onError, "onError is null");
ObjectHelper.requireNonNull(onComplete, "onComplete is null");
ObjectHelper.requireNonNull(onSubscribe, "onSubscribe is null");
LambdaObserver<T> ls = new LambdaObserver<T>(onNext, onError, onComplete, onSubscribe);
subscribe(ls);
return ls;
}
很简单,构造一个LambdaObserver,注意代码里的 subscribe(ls);
public final void subscribe(Observer<? super T> observer) {
...省略部分代码
observer = RxJavaPlugins.onSubscribe(this, observer);
subscribeActual(observer);
...省略部分代码
}
subscribeActual是一个抽象方法,实现由子类负责
再来看看ambArray的执行
public static <T> Observable<T> ambArray(ObservableSource<? extends T>... sources) {
ObjectHelper.requireNonNull(sources, "sources is null");
int len = sources.length;
if (len == 0) {
return empty();
}
if (len == 1) {
return (Observable<T>)wrap(sources[0]);
}
return RxJavaPlugins.onAssembly(new ObservableAmb<T>(sources, null));
}
重点返回一个ObservableAmb,也就是subscribe是由该类的实例执行的。上面说执行subscribe就会执行subscribeActual方法,ObservableAmb类也会重写了该方法
public void subscribeActual(Observer<? super T> observer) {
ObservableSource<? extends T>[] sources = this.sources;
int count = 0;
if (sources == null) {
sources = new Observable[8];
try {
for (ObservableSource<? extends T> p : sourcesIterable) {
if (p == null) {
EmptyDisposable.error(new NullPointerException("One of the sources is null"), observer);
return;
}
if (count == sources.length) {
ObservableSource<? extends T>[] b = new ObservableSource[count + (count >> 2)];
System.arraycopy(sources, 0, b, 0, count);
sources = b;
}
sources[count++] = p;
}
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
EmptyDisposable.error(e, observer);
return;
}
} else {
count = sources.length;
}
if (count == 0) {
EmptyDisposable.complete(observer);
return;
} else
if (count == 1) {
sources[0].subscribe(observer);
return;
}
AmbCoordinator<T> ac = new AmbCoordinator<T>(observer, count);
ac.subscribe(sources);
}
方法里面的sources是谁?就是一开始的observable1 ,observable2,observable3
方法里面的observer是谁?就是subscribe方法产生的LambdaObserver
看看AmbCoordinator类中subscribe方法
public void subscribe(ObservableSource<? extends T>[] sources) {
AmbInnerObserver<T>[] as = observers;
int len = as.length;
for (int i = 0; i < len; i++) {
as[i] = new AmbInnerObserver<T>(this, i + 1, downstream);
}
winner.lazySet(0); // release the contents of 'as'
downstream.onSubscribe(this);
for (int i = 0; i < len; i++) {
if (winner.get() != 0) {
return;
}
sources[i].subscribe(as[i]);
}
}
downstream是通过AmbCoordinator构造方法传递进来的,也就是ObservableAmb实例
构造了一个AmbInnerObserver数组,在AmbInnerObserver内部,数组的大小与
AmbCoordinator<T> ac = new AmbCoordinator<T>(observer, count);
中的count是一致的,在AmbInnerObserver内部 ,parent就是AmbCoordinator实例
在AmbInnerObserver内部还关联着AmbInnerObserver的序号。
所有AmbInnerObserver数组中的对象共享downstream
在AmbCoordinator类中,还有一个win方法
public boolean win(int index) {
int w = winner.get();
if (w == 0) {
//如果原子变量中的值是0,就会设置原子变量为index,也就是上面序号
if (winner.compareAndSet(0, index)) {
//胜利者拿到后,其他的observer都将失败,不会调用next,onError,onComplete
AmbInnerObserver<T>[] a = observers;
int n = a.length;
for (int i = 0; i < n; i++) {
if (i + 1 != index) {
a[i].dispose();
}
}
return true;
}
return false;
}
//已经有胜利者,直接拿出胜利者
return w == index;
}
可以看出amb操作符,谁先调用onNext谁就是胜利者,失败者死的悄无声息!!!
网友评论