RxJava2 Completable, Part 1: mergeArray

Author: CODERLIHAO | Published: 2019-07-10 13:38

Overview

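mergeArray subscribes to every source and merges their terminal events. The downstream CompletableObserver's onComplete() is called only after all sources have signalled onComplete. If any source signals onError, the error is forwarded downstream at once, the remaining subscriptions are disposed, and onComplete() is never called.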

Example

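import io.reactivex.Completable;
import io.reactivex.CompletableEmitter;
import io.reactivex.CompletableObserver;
import io.reactivex.CompletableOnSubscribe;
import io.reactivex.CompletableSource;
import io.reactivex.disposables.Disposable;

public class MergeArrayExample {
    public static void main(String[] args) {
        // source1 completes immediately
        CompletableSource source1 = Completable.create(new CompletableOnSubscribe() {
            @Override
            public void subscribe(CompletableEmitter emitter) throws Exception {
                emitter.onComplete();
            }
        });

        // source2 fails immediately
        CompletableSource source2 = Completable.create(new CompletableOnSubscribe() {
            @Override
            public void subscribe(CompletableEmitter emitter) throws Exception {
                emitter.onError(new Exception());
            }
        });

        Completable.mergeArray(source1, source2).subscribe(new CompletableObserver() {
            @Override
            public void onSubscribe(Disposable d) {
                // Prints false: nothing has been disposed yet
                System.out.println(d.isDisposed());
            }

            @Override
            public void onComplete() {
                // Not reached here, because source2 signals an error
                System.out.println("onComplete");
            }

            @Override
            public void onError(Throwable e) {
                // Prints "onError null": new Exception() carries no message
                System.out.println("onError " + e.getMessage());
            }
        });
    }
}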

Source code analysis

    public static Completable mergeArray(CompletableSource... sources) {
        ...
        return RxJavaPlugins.onAssembly(new CompletableMergeArray(sources));
    }

public final class CompletableMergeArray extends Completable {
    final CompletableSource[] sources;

    public CompletableMergeArray(CompletableSource[] sources) {
        this.sources = sources;
    }

    @Override
    public void subscribeActual(final CompletableObserver observer) {
        final CompositeDisposable set = new CompositeDisposable();
        final AtomicBoolean once = new AtomicBoolean();

       
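        // The downstream observer is handed to the shared inner observer, so whether
        // and when its methods are called depends on how shared invokes them.
        // The counter starts at sources.length + 1: one per source, plus one for the
        // trailing shared.onComplete() call after the loop.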

        InnerCompletableObserver shared = new InnerCompletableObserver(observer, once, set, sources.length + 1);
        observer.onSubscribe(set);

        // Subscribe the shared observer to each source in turn
        for (CompletableSource c : sources) {
            if (set.isDisposed()) {
                return;
            }

            if (c == null) {
                set.dispose();
                NullPointerException npe = new NullPointerException("A completable source is null");
                shared.onError(npe);
                return;
            }

            c.subscribe(shared);
        }
        // Trailing call: consumes the extra +1 in the counter
        shared.onComplete();
    }

...
}

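The c.subscribe(shared) call in the loop subscribes to each CompletableSource in turn. The same shared observer instance is passed to all of them, so every source reports its terminal event to one place.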
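The counting scheme behind the shared observer can be modelled without RxJava. The sketch below is a stdlib-only illustration, not library code: the class MergeCounterSketch, its methods, and the log strings are all hypothetical names chosen for this example. It assumes only what the source above shows, namely a counter that starts at sources.length + 1, is decremented once per completed source and once by the trailing call, and fires the downstream completion when it reaches zero.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stdlib-only model of the counter; this is not RxJava code.
final class MergeCounterSketch {

    // Simulates merging `sourceCount` sources. When completeDuringSubscribe is
    // true each source completes inside the loop, otherwise all of them
    // complete only after the loop and the trailing call have run.
    static List<String> run(int sourceCount, boolean completeDuringSubscribe) {
        List<String> log = new ArrayList<>();
        // One slot per source plus one for the trailing call.
        AtomicInteger remaining = new AtomicInteger(sourceCount + 1);
        List<Runnable> pending = new ArrayList<>();

        for (int i = 0; i < sourceCount; i++) {
            final int index = i;
            Runnable sourceCompletes = () -> {
                log.add("source " + index + " done");
                countDown(remaining, log);
            };
            if (completeDuringSubscribe) {
                sourceCompletes.run();
            } else {
                pending.add(sourceCompletes);
            }
        }

        // Models the shared.onComplete() call placed after the loop.
        log.add("trailing onComplete");
        countDown(remaining, log);

        // Sources that did not complete during the loop finish now.
        for (Runnable r : pending) {
            r.run();
        }
        return log;
    }

    // Models the decrement-and-check step of the inner observer's onComplete.
    private static void countDown(AtomicInteger remaining, List<String> log) {
        if (remaining.decrementAndGet() == 0) {
            log.add("downstream.onComplete");
        }
    }

    public static void main(String[] args) {
        System.out.println(run(2, true));
        System.out.println(run(2, false));
    }
}
```

In both orderings the downstream completion is logged exactly once and always last: whichever of the n + 1 decrements happens last is the one that triggers it.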

static final class InnerCompletableObserver extends AtomicInteger implements CompletableObserver {
        private static final long serialVersionUID = -8360547806504310570L;

        final CompletableObserver downstream;

        final AtomicBoolean once;

        final CompositeDisposable set;

        InnerCompletableObserver(CompletableObserver actual, AtomicBoolean once, CompositeDisposable set, int n) {
            this.downstream = actual;
            this.once = once;
            this.set = set;
            this.lazySet(n);
        }

        @Override
        public void onSubscribe(Disposable d) {
            set.add(d);
        }

        @Override
        public void onError(Throwable e) {
            set.dispose();
            if (once.compareAndSet(false, true)) {
                downstream.onError(e);
            } else {
                RxJavaPlugins.onError(e);
            }
        }

        @Override
        public void onComplete() {
            if (decrementAndGet() == 0) {
                // Reached only after every source, plus the trailing call in
                // subscribeActual, has signalled onComplete
                if (once.compareAndSet(false, true)) {
                    downstream.onComplete();
                }
            }
        }
    }
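The error path relies on the shared AtomicBoolean once so that the downstream sees at most one terminal event. The following is a minimal stdlib-only sketch of that guard, not RxJava code: OnceGuardSketch and its fields are hypothetical, and the undeliverable list merely stands in for the RxJavaPlugins.onError call in the source above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stdlib-only model of the terminal-event guard; not RxJava code.
final class OnceGuardSketch {
    final AtomicBoolean once = new AtomicBoolean();
    final AtomicInteger remaining;
    // What the downstream observer would receive.
    final List<String> downstream = new ArrayList<>();
    // Stand-in for errors routed to RxJavaPlugins.onError.
    final List<String> undeliverable = new ArrayList<>();

    OnceGuardSketch(int n) {
        this.remaining = new AtomicInteger(n);
    }

    void onError(String message) {
        // Only the first terminal event wins the compare-and-set.
        if (once.compareAndSet(false, true)) {
            downstream.add("onError " + message);
        } else {
            undeliverable.add(message);
        }
    }

    void onComplete() {
        // Completion is forwarded only when the counter hits zero
        // and no error has already claimed the flag.
        if (remaining.decrementAndGet() == 0) {
            if (once.compareAndSet(false, true)) {
                downstream.add("onComplete");
            }
        }
    }

    public static void main(String[] args) {
        OnceGuardSketch shared = new OnceGuardSketch(1);
        shared.onError("first");
        shared.onError("second");
        shared.onComplete();
        System.out.println(shared.downstream);    // [onError first]
        System.out.println(shared.undeliverable); // [second]
    }
}
```

In this model the first error reaches the downstream, a second error is diverted instead of being delivered twice, and a completion that arrives afterwards is silently dropped even though the counter reaches zero.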
