简言
参数传入多个CompletableSource,哪个CompletableObserver中的OnSubscribe执行的快就执行就执行哪个,而其他CompletableObserver都被忽略。
多个异步操作,你也不知道哪个会先执行完毕,你只想在某一个操作执行完毕后,执行一次某个方法,那么这个操作符就刚好适用。
例子
CompletableSource source1 = Completable.create(new CompletableOnSubscribe() {
@Override
public void subscribe(CompletableEmitter emitter) throws Exception {
emitter.onComplete();
}
});
CompletableSource source2 = Completable.create(new CompletableOnSubscribe() {
@Override
public void subscribe(CompletableEmitter emitter) throws Exception {
emitter.onComplete();
}
});
Completable.ambArray(source1,source2).subscribe(new CompletableObserver() {
@Override
public void onSubscribe(Disposable d) {
System.out.println(d.isDisposed());
}
@Override
public void onComplete() {
System.out.println("onComplete");
}
@Override
public void onError(Throwable e) {
System.out.println(e.getMessage());
}
});
源码分析
public static Completable ambArray(final CompletableSource... sources) {
...
return RxJavaPlugins.onAssembly(new CompletableAmb(sources, null));
}
构建的是CompletableAmb实例,
public final class CompletableAmb extends Completable {
//这里的sources的来源就是上面的source1,source2组成的数组
private final CompletableSource[] sources;
// 这里可以使用Iterable传递
private final Iterable<? extends CompletableSource> sourcesIterable;
public CompletableAmb(CompletableSource[] sources, Iterable<? extends CompletableSource> sourcesIterable) {
this.sources = sources;
this.sourcesIterable = sourcesIterable;
}
@Override
public void subscribeActual(final CompletableObserver observer) {
// 开始构建CompletableSource
CompletableSource[] sources = this.sources;
// 开始默认数量0
int count = 0;
if (sources == null) {
// 默认数量8
sources = new CompletableSource[8];
try {
//遍历Iterable,将数据添加sources 里
for (CompletableSource element : sourcesIterable) {
if (element == null) {
EmptyDisposable.error(new NullPointerException("One of the sources is null"), observer);
return;
}
if (count == sources.length) {
CompletableSource[] b = new CompletableSource[count + (count >> 2)];
System.arraycopy(sources, 0, b, 0, count);
sources = b;
}
sources[count++] = element;
}
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
EmptyDisposable.error(e, observer);
return;
}
} else {
count = sources.length;
}
//这里的set控制sources
final CompositeDisposable set = new CompositeDisposable();
observer.onSubscribe(set);
final AtomicBoolean once = new AtomicBoolean();
CompletableObserver inner = new Amb(once, set, observer);
for (int i = 0; i < count; i++) {
CompletableSource c = sources[i];
if (set.isDisposed()) {
return;
}
if (c == null) {
NullPointerException npe = new NullPointerException("One of the sources is null");
if (once.compareAndSet(false, true)) {
//修改成功
set.dispose();
observer.onError(npe);
} else {
RxJavaPlugins.onError(npe);
}
return;
}
// 开始执行,就看谁先跑到最后
c.subscribe(inner);
}
if (count == 0) {
observer.onComplete();
}
}
...
}
总结上面方法,将ambArray里面的参数收集到sources数组,遍历数组,执行每一个
subscribe()方法
image.png
谁先执行到onComplete或者onError,就会忽略后面的
static final class Amb implements CompletableObserver {
private final AtomicBoolean once;
private final CompositeDisposable set;
private final CompletableObserver downstream;
Amb(AtomicBoolean once, CompositeDisposable set, CompletableObserver observer) {
this.once = once;
this.set = set;
this.downstream = observer;
}
@Override
public void onComplete() {
if (once.compareAndSet(false, true)) {
//只会执行一次,就会忽略后面的
set.dispose();
downstream.onComplete();
}
}
@Override
public void onError(Throwable e) {
if (once.compareAndSet(false, true)) {
set.dispose();
downstream.onError(e);
} else {
RxJavaPlugins.onError(e);
}
}
@Override
public void onSubscribe(Disposable d) {
set.add(d);
}
}
网友评论