美文网首页
rxjava中merge的理解

rxjava中merge的理解

作者: ironman_ | 来源:发表于2018-02-26 23:59 被阅读0次

merge的官方解释:
http://reactivex.io/documentation/operators/merge.html

image.png
通过合并多个Observable发出的结果将多个Observable合并成一个Observable的操作。
有一段描述:

Merge may interleave the items emitted by the merged Observables (a similar operator, Concat, does not interleave items, but emits all of each source Observable’s items in turn before beginning to emit items from the next source Observable).

也就是说merge操作可能交替出现多个observable发出的内容,而concat操作是会将一个observable发出的内容发完才会发下一个observable发出的内容。

写一个列子验证一下:

    private void doSomeWork() {
        final String[] aStrings = {"A1", "A2", "A3", "A4"};
        final String[] bStrings = {"B1", "B2", "B3"};

        final Observable<String> aObservable = Observable.fromArray(aStrings);
        final Observable<String> bObservable = Observable.fromArray(bStrings);

        Observable.merge(bObservable,aObservable)
            .subscribeOn(AndroidSchedulers.mainThread())
            .subscribe(genSubscriber());
    }


    private Observer<String> genSubscriber() {
        return new Observer<String>() {

            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, " onSubscribe : " + d.isDisposed());
            }

            @Override
            public void onNext(String value) {
                Log.d(TAG, " onNext : value : " + value);
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, " onError : " + e.getMessage());
            }

            @Override
            public void onComplete() {
                Log.d(TAG, " onComplete");
            }
        };
    }

但是发出发出的内容是先a的,然后再b的。

是不是observable里运行太快了?所以很快运行完了,于是我添加了一些sleep等待模拟耗时操作。

    private void doSomeWork() {
        Observable<String> aObservable = genObservable("observable one:");
        Observable<String> bObservable = genObservable("observable two:");

        Observable.merge(bObservable,aObservable)
            .subscribeOn(AndroidSchedulers.mainThread())
            .subscribe(genSubscriber());
    }

    private Observable<String> genObservable(final String name) {
        return Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> observableEmitter) throws Exception {
                for (int i=0;i<5;i++) {
                    //在这里等待
                    Thread.sleep(1000);
                    observableEmitter.onNext(name+i);
                }
                observableEmitter.onComplete();
            }
        });
    }

但是运行结果依然是一个observable的内容发完,然后再发另一个observable的内容。

这是什么原因呢?尝试着将Observable里的执行内容改成异步的。

    private void doSomeWork() {

        final Observable<String> aObservable = genTaskObservable("tas one");
        final Observable<String> bObservable = genTaskObservable("tas two");

        Observable.merge(bObservable,aObservable)
            .subscribeOn(AndroidSchedulers.mainThread())
            .subscribe(genSubscriber());
    }

//运行异步任务的observable
    private Observable<String> genTaskObservable(final String name) {
        return Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> observableEmitter) throws Exception {
                AsyncTask.THREAD_POOL_EXECUTOR.execute(getRunnable(observableEmitter));
            }

            private Runnable getRunnable(final ObservableEmitter<String> observableEmitter) {
                return new Runnable() {
                    @Override
                    public void run() {
                        for (int i=0;i<10;i++) {
                            observableEmitter.onNext(name + i);
                            try {
                                Thread.sleep(1000);
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                        }
                        observableEmitter.onComplete();
                    }
                };
            }
        });

    }

果然这下就跟描述的是一样的,发出的内容是穿插的。

之前我都是到这里就结束了,如果不尝试着从源码里找答案,跟原来就没什么分别了。

所有还有一个问题:为什么是这样呢?源码里是怎么写的呢?我只尝试了这种情况,我没有尝试的情况其实我是不知道的。

接下来看源码

    public static <T> Observable<T> merge(ObservableSource<? extends T> source1, ObservableSource<? extends T> source2) {
        ObjectHelper.requireNonNull(source1, "source1 is null");
        ObjectHelper.requireNonNull(source2, "source2 is null");
        return fromArray(source1, source2).flatMap((Function)Functions.identity(), false, 2);
    }

发现merge的内部实现其实是通过flatMap做的,先将两个ObservableSource合并成一个Observable,然后再通过flatMap将这个ObservableSource里的内容发出

看flatMap的官方解释:


image.png

将一个Observable发出的内容转换成多个Observable,然后再讲多个Observable发出的内容合并成一个observable。

所以merge是如何操作的,得看flatMap是如何操作的,关于faltMap的理解可以参考这篇文章

接着看flatMap的源码:

    public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper,
            boolean delayErrors, int maxConcurrency, int bufferSize) {
        ObjectHelper.requireNonNull(mapper, "mapper is null");
        ObjectHelper.verifyPositive(maxConcurrency, "maxConcurrency");
        ObjectHelper.verifyPositive(bufferSize, "bufferSize");
        if (this instanceof ScalarCallable) {
            @SuppressWarnings("unchecked")
            T v = ((ScalarCallable<T>)this).call();
            if (v == null) {
                return empty();
            }
            return ObservableScalarXMap.scalarXMap(v, mapper);
        }
        return RxJavaPlugins.onAssembly(new ObservableFlatMap<T, R>(this, mapper, delayErrors, maxConcurrency, bufferSize));
    }

这个observable不是ScalarCallable所以直接看ObservableFlatMap的源码(RxJavaPlugins是用来做一些hook的,可以先不用管它)

ObservableFlatMap也是一个Observable,直接看他的subscribeActual方法:

    @Override
    public void subscribeActual(Observer<? super U> t) {

        if (ObservableScalarXMap.tryScalarXMapSubscribe(source, t, mapper)) {
            return;
        }

        source.subscribe(new MergeObserver<T, U>(t, mapper, delayErrors, maxConcurrency, bufferSize));
    }

tryScalarXMapSubscribe这个方法是用来执行Callable的,刚刚传进来的ObservableSource并不是Callable,直接看下面一句,source.subscribe(new MergeObserver<T, U>(t, mapper, delayErrors, maxConcurrency, bufferSize));
直接subscribe了一个observer,把任务丢了出去,接着看MergeObserver这个里面的代码:

@Override
        public void onNext(T t) {
            // safeguard against misbehaving sources
            if (done) {
                return;
            }
            ObservableSource<? extends U> p;
            try {
                p = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper returned a null ObservableSource");
            } catch (Throwable e) {
                Exceptions.throwIfFatal(e);
                s.dispose();
                onError(e);
                return;
            }

            if (maxConcurrency != Integer.MAX_VALUE) {
                synchronized (this) {
                    if (wip == maxConcurrency) {
                        sources.offer(p);
                        return;
                    }
                    wip++;
                }
            }

            subscribeInner(p);
        }

因为是一个observer,直接看onNext方法,应用刚刚的mapper方法将原始ObservableSource发出内容转变成一个ObservableSource。merge方法传过来maxConcurrency等于2。会走到if (maxConcurrency != Integer.MAX_VALUE)这个方法块里,maxConcurrency是用来控制同时可以往下传递多少个Observable。比如这里merge穿进来的maxConcurrency是2,而且是合并两个ObservableSource,所以这两个ObservableSource发出的内容直接就像下传播了,这也就是我们寻找的merge的操作,有事件到达了就直接往下发送,并不做其他处理。

接着往下看subscribeInner(p)方法:

        void subscribeInner(ObservableSource<? extends U> p) {
            for (;;) {
                if (p instanceof Callable) {
                    tryEmitScalar(((Callable<? extends U>)p));

                    if (maxConcurrency != Integer.MAX_VALUE) {
                        synchronized (this) {
                            p = sources.poll();
                            if (p == null) {
                                wip--;
                                break;
                            }
                        }
                    } else {
                        break;
                    }
                } else {
                    InnerObserver<T, U> inner = new InnerObserver<T, U>(this, uniqueId++);
                    if (addInner(inner)) {
                        p.subscribe(inner);
                    }
                    break;
                }
            }
        }

        boolean addInner(InnerObserver<T, U> inner) {
            for (;;) {
                InnerObserver<?, ?>[] a = observers.get();
                if (a == CANCELLED) {
                    inner.dispose();
                    return false;
                }
                int n = a.length;
                InnerObserver<?, ?>[] b = new InnerObserver[n + 1];
                System.arraycopy(a, 0, b, 0, n);
                b[n] = inner;
                if (observers.compareAndSet(a, b)) {
                    return true;
                }
            }
        }

merge方法传过来的observableSource并不是一个Callable直接看else块里的方法, 创建了一个InnerObserver对象,然后把InnerObserver存到observers里。
然后让innerObserver监听p发出的事件。

innerObserver里做的事情还是很多的, 不过这次的探索可以只看onNext方法,他只是把消息发送出去。

大概的流程如下图:


image.png

总结一下merge的操作:
直接将两个observableSource合并成一个,然后使用flatMap操作将两个ObservableSource合并成一个。

相关文章

网友评论

      本文标题:rxjava中merge的理解

      本文链接:https://www.haomeiwen.com/subject/ndiuxftx.html