RxJava2 Completable, Part 1: mergeArray

Author: CODERLIHAO | Published 2019-07-10 13:38

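    Overview

    mergeArray subscribes to every source it is given. The CompletableObserver's onComplete() method is called only after all sources have signalled onComplete. If any source signals onError, the remaining sources are disposed and the error is forwarded downstream right away.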

    Example

    // source1 completes immediately
    CompletableSource source1 = Completable.create(new CompletableOnSubscribe() {
        @Override
        public void subscribe(CompletableEmitter emitter) throws Exception {
            emitter.onComplete();
        }
    });

    // source2 fails immediately
    CompletableSource source2 = Completable.create(new CompletableOnSubscribe() {
        @Override
        public void subscribe(CompletableEmitter emitter) throws Exception {
            emitter.onError(new Exception());
        }
    });

    Completable.mergeArray(source1, source2).subscribe(new CompletableObserver() {
        @Override
        public void onSubscribe(Disposable d) {
            // prints "false": nothing has been disposed at this point
            System.out.println(d.isDisposed());
        }

        @Override
        public void onComplete() {
            // not reached in this example, because source2 signals an error
            System.out.println("onComplete");
        }

        @Override
        public void onError(Throwable e) {
            // reached because source2 fails; getMessage() is null for new Exception()
            System.out.println("onError " + e.getMessage());
        }
    });
    

    Source code analysis

        public static Completable mergeArray(CompletableSource... sources) {
            ...
            return RxJavaPlugins.onAssembly(new CompletableMergeArray(sources));
        }
    
    
    public final class CompletableMergeArray extends Completable {
        final CompletableSource[] sources;
    
        public CompletableMergeArray(CompletableSource[] sources) {
            this.sources = sources;
        }
    
        @Override
        public void subscribeActual(final CompletableObserver observer) {
            final CompositeDisposable set = new CompositeDisposable();
            final AtomicBoolean once = new AtomicBoolean();
    
           
            // observer is handed to the shared object, so whether and when
            // observer's callbacks run depends on how shared invokes them
    
            InnerCompletableObserver shared = new InnerCompletableObserver(observer, once, set, sources.length + 1);
            observer.onSubscribe(set);
    
            // iterate over sources and subscribe the shared observer to each one
            for (CompletableSource c : sources) {
                if (set.isDisposed()) {
                    return;
                }
    
                if (c == null) {
                    set.dispose();
                    NullPointerException npe = new NullPointerException("A completable source is null");
                    shared.onError(npe);
                    return;
                }
    
                c.subscribe(shared);
            }
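            // finally call onComplete once more; this consumes the extra +1
            // in the counter that was initialised to sources.length + 1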
            shared.onComplete();
        }
    
    ...
    }
    

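    The call c.subscribe(shared) in the code above subscribes to each CompletableSource in turn. The same shared instance is used for every subscription, so all sources report to one observer.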
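
    Because every source reports to the same shared instance, completion can be tracked with a single counter. The following is a minimal, library-free sketch of that counting, not RxJava's own code: the class MergeCounterSketch and its methods are hypothetical names, a Runnable stands in for downstream.onComplete(), and all sources are assumed to complete synchronously.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the counting done by the shared observer.
class MergeCounterSketch {
    private final AtomicInteger remaining;
    private final AtomicBoolean once = new AtomicBoolean();
    private final Runnable downstreamOnComplete;

    MergeCounterSketch(int n, Runnable downstreamOnComplete) {
        this.remaining = new AtomicInteger(n);
        this.downstreamOnComplete = downstreamOnComplete;
    }

    // Mirrors the shape of the shared onComplete: only the call that
    // brings the counter to zero notifies downstream, and only once.
    void onComplete() {
        if (remaining.decrementAndGet() == 0 && once.compareAndSet(false, true)) {
            downstreamOnComplete.run();
        }
    }

    // Simulates sourceCount sources that all complete synchronously and
    // returns how many times the downstream callback ran.
    static int simulate(int sourceCount) {
        AtomicInteger downstreamCalls = new AtomicInteger();
        MergeCounterSketch shared =
                new MergeCounterSketch(sourceCount + 1, downstreamCalls::incrementAndGet);
        for (int i = 0; i < sourceCount; i++) {
            shared.onComplete(); // one completion per source
        }
        shared.onComplete(); // trailing call made after the loop
        return downstreamCalls.get();
    }

    public static void main(String[] args) {
        System.out.println(simulate(0)); // empty array: the trailing call alone reaches zero
        System.out.println(simulate(3)); // three sources plus the trailing call
    }
}
```

    In this sketch the trailing call is what lets an empty array still produce exactly one completion, since the counter starts at 1 rather than 0.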

    static final class InnerCompletableObserver extends AtomicInteger implements CompletableObserver {
            private static final long serialVersionUID = -8360547806504310570L;
    
            final CompletableObserver downstream;
    
            final AtomicBoolean once;
    
            final CompositeDisposable set;
    
            InnerCompletableObserver(CompletableObserver actual, AtomicBoolean once, CompositeDisposable set, int n) {
                this.downstream = actual;
                this.once = once;
                this.set = set;
                this.lazySet(n);
            }
    
            @Override
            public void onSubscribe(Disposable d) {
                set.add(d);
            }
    
            @Override
            public void onError(Throwable e) {
                set.dispose();
                if (once.compareAndSet(false, true)) {
                    downstream.onError(e);
                } else {
                    RxJavaPlugins.onError(e);
                }
            }
    
            @Override
            public void onComplete() {
                if (decrementAndGet() == 0) {
                    // runs only after every source has called onComplete
                    if (once.compareAndSet(false, true)) {
                        downstream.onComplete();
                    }
                }
            }
        }
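
    The error path in the listing above relies on the shared once flag: the first error is forwarded downstream, and any later error goes to RxJavaPlugins.onError instead. A minimal, library-free sketch of that guard follows. It is an illustration under assumptions, not the library's code: OnceGuardSketch is a hypothetical name, the two lists stand in for downstream.onError and RxJavaPlugins.onError, and the set.dispose() call is left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for the once guard used by the shared observer.
class OnceGuardSketch {
    private final AtomicBoolean once = new AtomicBoolean();

    // Stand-in for errors passed to downstream.onError
    final List<String> delivered = new ArrayList<>();
    // Stand-in for errors passed to the global error hook
    final List<String> rerouted = new ArrayList<>();

    void onError(Throwable e) {
        if (once.compareAndSet(false, true)) {
            delivered.add(e.getMessage()); // first error wins
        } else {
            rerouted.add(e.getMessage()); // later errors are not sent downstream
        }
    }

    public static void main(String[] args) {
        OnceGuardSketch guard = new OnceGuardSketch();
        guard.onError(new RuntimeException("first"));
        guard.onError(new RuntimeException("second"));
        System.out.println("delivered: " + guard.delivered);
        System.out.println("rerouted: " + guard.rerouted);
    }
}
```

    The same flag also guards onComplete in the original listing, so a completion that arrives after an error is never forwarded.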
    

    Article link: https://www.haomeiwen.com/subject/bjthkctx.html