美文网首页
Rxjava Amb

Rxjava Amb

作者: CODERLIHAO | 来源:发表于2018-12-17 14:31 被阅读0次

    所有代码的演示都在RxJava2.2.4版本上进行的

    当你传递多个Observable给amb操作符时,该操作符只发射其中一个Observable的数据和通知:首先发送通知给amb操作符的的那个Observable,不管发射的是一项数据还是一个onError或onCompleted通知,amb将忽略和丢弃其它所有Observables的发射物


    amb.png

    1.example

       Observable<Integer> observable1 = Observable.timer(4, TimeUnit.SECONDS)
                    .flatMap(__ -> Observable.just(1, 2, 3, 4, 5));
    
            Observable<Integer> observable2 = Observable.timer(3, TimeUnit.SECONDS)
                    .flatMap(__ -> Observable.just(6, 7, 8, 9, 10));
    
            Observable<Integer> observable3 = Observable.timer(2, TimeUnit.SECONDS)
                    .flatMap(__ -> Observable.just(11, 12, 13, 14, 15));
    
            Observable.ambArray(observable1, observable2, observable3)
                    .subscribe(next -> Logger.getGlobal().info("OnNext: next: " + next));
    

    2.源码分析
    先看看subscribe方法做了什么

    public final Disposable subscribe(Consumer<? super T> onNext) {
            return subscribe(onNext, Functions.ON_ERROR_MISSING, Functions.EMPTY_ACTION, Functions.emptyConsumer());
        }
    
       public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
                Action onComplete, Consumer<? super Disposable> onSubscribe) {
            ObjectHelper.requireNonNull(onNext, "onNext is null");
            ObjectHelper.requireNonNull(onError, "onError is null");
            ObjectHelper.requireNonNull(onComplete, "onComplete is null");
            ObjectHelper.requireNonNull(onSubscribe, "onSubscribe is null");
    
            LambdaObserver<T> ls = new LambdaObserver<T>(onNext, onError, onComplete, onSubscribe);
    
            subscribe(ls);
    
            return ls;
        }
    

    很简单,构造一个LambdaObserver,注意代码里的 subscribe(ls);

       public final void subscribe(Observer<? super T> observer) {
                ...省略部分代码
                observer = RxJavaPlugins.onSubscribe(this, observer);
                subscribeActual(observer);
                 ...省略部分代码
        }
    

    subscribeActual是一个抽象方法,实现由子类负责
    再来看看ambArray的执行

       public static <T> Observable<T> ambArray(ObservableSource<? extends T>... sources) {
            ObjectHelper.requireNonNull(sources, "sources is null");
            int len = sources.length;
            if (len == 0) {
                return empty();
            }
            if (len == 1) {
                return (Observable<T>)wrap(sources[0]);
            }
            return RxJavaPlugins.onAssembly(new ObservableAmb<T>(sources, null));
        }
    

    重点返回一个ObservableAmb,也就是subscribe是由该类的实例执行的。上面说执行subscribe就会执行subscribeActual方法,ObservableAmb类也会重写了该方法

     public void subscribeActual(Observer<? super T> observer) {
            ObservableSource<? extends T>[] sources = this.sources;
            int count = 0;
            if (sources == null) {
                sources = new Observable[8];
                try {
                    for (ObservableSource<? extends T> p : sourcesIterable) {
                        if (p == null) {
                            EmptyDisposable.error(new NullPointerException("One of the sources is null"), observer);
                            return;
                        }
                        if (count == sources.length) {
                            ObservableSource<? extends T>[] b = new ObservableSource[count + (count >> 2)];
                            System.arraycopy(sources, 0, b, 0, count);
                            sources = b;
                        }
                        sources[count++] = p;
                    }
                } catch (Throwable e) {
                    Exceptions.throwIfFatal(e);
                    EmptyDisposable.error(e, observer);
                    return;
                }
            } else {
                count = sources.length;
            }
    
            if (count == 0) {
                EmptyDisposable.complete(observer);
                return;
            } else
            if (count == 1) {
                sources[0].subscribe(observer);
                return;
            }
    
            AmbCoordinator<T> ac = new AmbCoordinator<T>(observer, count);
            ac.subscribe(sources);
        }
    

    方法里面的sources是谁?就是一开始的observable1 ,observable2,observable3
    方法里面的observer是谁?就是subscribe方法产生的LambdaObserver

    看看AmbCoordinator类中subscribe方法

    public void subscribe(ObservableSource<? extends T>[] sources) {
               AmbInnerObserver<T>[] as = observers;
               int len = as.length;
               for (int i = 0; i < len; i++) {
                   as[i] = new AmbInnerObserver<T>(this, i + 1, downstream);
               }
               winner.lazySet(0); // release the contents of 'as'
               downstream.onSubscribe(this);
    
               for (int i = 0; i < len; i++) {
                   if (winner.get() != 0) {
                       return;
                   }
    
                   sources[i].subscribe(as[i]);
               }
           }
    

    downstream是通过AmbCoordinator构造方法传递进来的,也就是ObservableAmb实例
    构造了一个AmbInnerObserver数组,在AmbInnerObserver内部,数组的大小与
    AmbCoordinator<T> ac = new AmbCoordinator<T>(observer, count);
    中的count是一致的,在AmbInnerObserver内部 ,parent就是AmbCoordinator实例
    在AmbInnerObserver内部还关联着AmbInnerObserver的序号。
    所有AmbInnerObserver数组中的对象共享downstream
    在AmbCoordinator类中,还有一个win方法

          public boolean win(int index) {
                int w = winner.get();
                if (w == 0) {
                   //如果原子变量中的值是0,就会设置原子变量为index,也就是上面序号
                    if (winner.compareAndSet(0, index)) {
                        //胜利者拿到后,其他的observer都将失败,不会调用next,onError,onComplete
                        AmbInnerObserver<T>[] a = observers;
                        int n = a.length;
                        for (int i = 0; i < n; i++) {
                            if (i + 1 != index) {
                                a[i].dispose();
                            }
                        }
                        return true;
                    }
                    return false;
                }
                //已经有胜利者,直接拿出胜利者
                return w == index;
            }
    

    可以看出amb操作符,谁先调用onNext谁就是胜利者,失败者死的悄无声息!!!

    相关文章

      网友评论

          本文标题:Rxjava Amb

          本文链接:https://www.haomeiwen.com/subject/hlgqkqtx.html