简言
mergeArray 会合并所有 CompletableSource:只有当所有源都 onComplete 之后,才会调用 CompletableObserver 的 onComplete() 方法;其间只要有任何一个源触发 onError,就会立即取消其余订阅,并把该错误回调给 CompletableObserver 的 onError()。
例子
// First source: signals completion immediately.
CompletableSource succeeding = Completable.create(emitter -> emitter.onComplete());
// Second source: signals an error (the Exception carries no message, so getMessage() is null).
CompletableSource failing = Completable.create(emitter -> emitter.onError(new Exception()));

// Observer that simply prints every callback it receives.
CompletableObserver printer = new CompletableObserver() {
    @Override
    public void onSubscribe(Disposable d) {
        System.out.println(d.isDisposed());
    }

    @Override
    public void onComplete() {
        System.out.println("onComplete");
    }

    @Override
    public void onError(Throwable e) {
        String message = e.getMessage();
        System.out.println("onError " + message);
    }
};

// Merge both sources and subscribe the printing observer to the result.
Completable.mergeArray(succeeding, failing).subscribe(printer);
源码分析
/**
 * Builds a Completable from the given sources by wrapping them in a
 * {@code CompletableMergeArray} and passing that through
 * {@code RxJavaPlugins.onAssembly}.
 *
 * NOTE(review): the "..." below stands for code elided from this excerpt
 * (presumably argument validation / special-casing) - confirm against the
 * library source.
 *
 * @param sources the CompletableSource instances to merge
 * @return the result of {@code RxJavaPlugins.onAssembly} for the merged operator
 */
public static Completable mergeArray(CompletableSource... sources) {
...
return RxJavaPlugins.onAssembly(new CompletableMergeArray(sources));
}
/**
 * Completable that subscribes one shared inner observer to every source in
 * {@code sources} and relays the combined outcome to the downstream observer.
 */
public final class CompletableMergeArray extends Completable {
// The sources to merge, in subscription order.
final CompletableSource[] sources;
/**
 * @param sources the sources to subscribe to; the array is stored as-is (no copy)
 */
public CompletableMergeArray(CompletableSource[] sources) {
this.sources = sources;
}
/**
 * Subscribes {@code observer} to the merged sources.
 *
 * @param observer the downstream observer that receives onSubscribe and the
 *                 terminal signal relayed by the shared inner observer
 */
@Override
public void subscribeActual(final CompletableObserver observer) {
// Container the shared observer adds each source's Disposable to.
final CompositeDisposable set = new CompositeDisposable();
// Guard passed to the shared observer so only one terminal signal reaches downstream.
final AtomicBoolean once = new AtomicBoolean();
// The downstream observer is handed to the shared inner observer.
// Whether the downstream callbacks run depends on how shared invokes them.
// The counter is sources.length + 1: one per source plus one for the final
// shared.onComplete() call at the end of this method.
InnerCompletableObserver shared = new InnerCompletableObserver(observer, once, set, sources.length + 1);
observer.onSubscribe(set);
// Iterate over the sources.
for (CompletableSource c : sources) {
// Stop subscribing once the composite has been disposed.
if (set.isDisposed()) {
return;
}
// A null source is reported as an error through the shared observer.
if (c == null) {
set.dispose();
NullPointerException npe = new NullPointerException("A completable source is null");
shared.onError(npe);
return;
}
c.subscribe(shared);
}
// Finally call onComplete once for this method itself, consuming the extra
// "+ 1" in the counter; downstream completes only when the counter hits zero.
shared.onComplete();
}
...
}
代码中的 c.subscribe(shared) 为每一个 CompletableSource 订阅同一个 shared 观察者,因此所有源的回调都会汇集到下面的 InnerCompletableObserver 中
/**
 * Observer shared by all merged sources. The inherited AtomicInteger value is
 * the number of completions still outstanding; {@code once} ensures that at
 * most one terminal signal is forwarded to {@code downstream}.
 */
static final class InnerCompletableObserver extends AtomicInteger implements CompletableObserver {
    private static final long serialVersionUID = -8360547806504310570L;

    final CompletableObserver downstream;
    final AtomicBoolean once;
    final CompositeDisposable set;

    /**
     * @param actual the downstream observer to notify
     * @param once   flag guarding the single terminal signal
     * @param set    composite collecting the Disposable of every source
     * @param n      initial value of the outstanding-completion counter
     */
    InnerCompletableObserver(CompletableObserver actual, AtomicBoolean once, CompositeDisposable set, int n) {
        this.downstream = actual;
        this.once = once;
        this.set = set;
        this.lazySet(n);
    }

    /** Registers the source's Disposable with the shared composite. */
    @Override
    public void onSubscribe(Disposable d) {
        set.add(d);
    }

    /**
     * Disposes the composite, then forwards the error downstream if no terminal
     * signal was sent yet; otherwise routes it to RxJavaPlugins.onError.
     */
    @Override
    public void onError(Throwable e) {
        set.dispose();
        boolean firstTerminal = once.compareAndSet(false, true);
        if (!firstTerminal) {
            RxJavaPlugins.onError(e);
            return;
        }
        downstream.onError(e);
    }

    /**
     * Counts down one completion; downstream is completed only when the
     * counter reaches zero and no terminal signal was sent before.
     */
    @Override
    public void onComplete() {
        int remaining = decrementAndGet();
        if (remaining != 0) {
            return;
        }
        // Every completion has arrived; complete downstream at most once.
        if (once.compareAndSet(false, true)) {
            downstream.onComplete();
        }
    }
}
网友评论