美文网首页
Rxjava Amb

Rxjava Amb

作者: CODERLIHAO | 来源:发表于2018-12-17 14:31 被阅读0次

所有代码的演示都在RxJava2.2.4版本上进行的

当你传递多个Observable给amb操作符时,该操作符只发射其中一个Observable的数据和通知:首先发送通知给amb操作符的的那个Observable,不管发射的是一项数据还是一个onError或onCompleted通知,amb将忽略和丢弃其它所有Observables的发射物


amb.png

1.example

   Observable<Integer> observable1 = Observable.timer(4, TimeUnit.SECONDS)
                .flatMap(__ -> Observable.just(1, 2, 3, 4, 5));

        Observable<Integer> observable2 = Observable.timer(3, TimeUnit.SECONDS)
                .flatMap(__ -> Observable.just(6, 7, 8, 9, 10));

        Observable<Integer> observable3 = Observable.timer(2, TimeUnit.SECONDS)
                .flatMap(__ -> Observable.just(11, 12, 13, 14, 15));

        Observable.ambArray(observable1, observable2, observable3)
                .subscribe(next -> Logger.getGlobal().info("OnNext: next: " + next));

2.源码分析
先看看subscribe方法做了什么

public final Disposable subscribe(Consumer<? super T> onNext) {
        return subscribe(onNext, Functions.ON_ERROR_MISSING, Functions.EMPTY_ACTION, Functions.emptyConsumer());
    }
   public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
            Action onComplete, Consumer<? super Disposable> onSubscribe) {
        ObjectHelper.requireNonNull(onNext, "onNext is null");
        ObjectHelper.requireNonNull(onError, "onError is null");
        ObjectHelper.requireNonNull(onComplete, "onComplete is null");
        ObjectHelper.requireNonNull(onSubscribe, "onSubscribe is null");

        LambdaObserver<T> ls = new LambdaObserver<T>(onNext, onError, onComplete, onSubscribe);

        subscribe(ls);

        return ls;
    }

很简单,构造一个LambdaObserver,注意代码里的 subscribe(ls);

   public final void subscribe(Observer<? super T> observer) {
            ...省略部分代码
            observer = RxJavaPlugins.onSubscribe(this, observer);
            subscribeActual(observer);
             ...省略部分代码
    }

subscribeActual是一个抽象方法,实现由子类负责
再来看看ambArray的执行

   public static <T> Observable<T> ambArray(ObservableSource<? extends T>... sources) {
        ObjectHelper.requireNonNull(sources, "sources is null");
        int len = sources.length;
        if (len == 0) {
            return empty();
        }
        if (len == 1) {
            return (Observable<T>)wrap(sources[0]);
        }
        return RxJavaPlugins.onAssembly(new ObservableAmb<T>(sources, null));
    }

重点返回一个ObservableAmb,也就是subscribe是由该类的实例执行的。上面说执行subscribe就会执行subscribeActual方法,ObservableAmb类也会重写了该方法

 public void subscribeActual(Observer<? super T> observer) {
        ObservableSource<? extends T>[] sources = this.sources;
        int count = 0;
        if (sources == null) {
            sources = new Observable[8];
            try {
                for (ObservableSource<? extends T> p : sourcesIterable) {
                    if (p == null) {
                        EmptyDisposable.error(new NullPointerException("One of the sources is null"), observer);
                        return;
                    }
                    if (count == sources.length) {
                        ObservableSource<? extends T>[] b = new ObservableSource[count + (count >> 2)];
                        System.arraycopy(sources, 0, b, 0, count);
                        sources = b;
                    }
                    sources[count++] = p;
                }
            } catch (Throwable e) {
                Exceptions.throwIfFatal(e);
                EmptyDisposable.error(e, observer);
                return;
            }
        } else {
            count = sources.length;
        }

        if (count == 0) {
            EmptyDisposable.complete(observer);
            return;
        } else
        if (count == 1) {
            sources[0].subscribe(observer);
            return;
        }

        AmbCoordinator<T> ac = new AmbCoordinator<T>(observer, count);
        ac.subscribe(sources);
    }

方法里面的sources是谁?就是一开始的observable1 ,observable2,observable3
方法里面的observer是谁?就是subscribe方法产生的LambdaObserver

看看AmbCoordinator类中subscribe方法

public void subscribe(ObservableSource<? extends T>[] sources) {
           AmbInnerObserver<T>[] as = observers;
           int len = as.length;
           for (int i = 0; i < len; i++) {
               as[i] = new AmbInnerObserver<T>(this, i + 1, downstream);
           }
           winner.lazySet(0); // release the contents of 'as'
           downstream.onSubscribe(this);

           for (int i = 0; i < len; i++) {
               if (winner.get() != 0) {
                   return;
               }

               sources[i].subscribe(as[i]);
           }
       }

downstream是通过AmbCoordinator构造方法传递进来的,也就是ObservableAmb实例
构造了一个AmbInnerObserver数组,在AmbInnerObserver内部,数组的大小与
AmbCoordinator<T> ac = new AmbCoordinator<T>(observer, count);
中的count是一致的,在AmbInnerObserver内部 ,parent就是AmbCoordinator实例
在AmbInnerObserver内部还关联着AmbInnerObserver的序号。
所有AmbInnerObserver数组中的对象共享downstream
在AmbCoordinator类中,还有一个win方法

      public boolean win(int index) {
            int w = winner.get();
            if (w == 0) {
               //如果原子变量中的值是0,就会设置原子变量为index,也就是上面序号
                if (winner.compareAndSet(0, index)) {
                    //胜利者拿到后,其他的observer都将失败,不会调用next,onError,onComplete
                    AmbInnerObserver<T>[] a = observers;
                    int n = a.length;
                    for (int i = 0; i < n; i++) {
                        if (i + 1 != index) {
                            a[i].dispose();
                        }
                    }
                    return true;
                }
                return false;
            }
            //已经有胜利者,直接拿出胜利者
            return w == index;
        }

可以看出amb操作符,谁先调用onNext谁就是胜利者,失败者死的悄无声息!!!

相关文章

网友评论

      本文标题:Rxjava Amb

      本文链接:https://www.haomeiwen.com/subject/hlgqkqtx.html