美文网首页RxJava2学习记录Android
RxJava2操作符(FlatMap学习)

RxJava2操作符(FlatMap学习)

作者: WonderSky_HY | 来源:发表于2019-01-15 15:48 被阅读28次

    FlatMap是RxJava2变换操作符中比较重要的一个,本文我们来学习一下它的内部变换过程。
    使用FlatMap变换方法如下:

    private void init() {
        Observer<String> observer = new Observer<String>() {
            //.....代码省略
        };
    
        Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) {
                Log.i(TAG, "subscribe--运行线程:" + Thread.currentThread().getName());
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onNext(3);
                emitter.onComplete();
            }
        }).subscribeOn(Schedulers.io())
                .subscribeOn(Schedulers.computation())
                .observeOn(AndroidSchedulers.mainThread())
                //FlatMap变换
                .flatMap(new Function<Integer, ObservableSource<String>>() {
                    @Override
                    public ObservableSource<String> apply(Integer integer) {
                        //将int类型参数转换为string类型参数,然后用just操作符将其重新发射出去
                        return Observable.just(String.valueOf(integer));
                    }
                })
                .subscribe(observer);
    }
    

    点进这个flatMap方法看下:

    public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper) {
        //调用两个参数的FlatMap
        return flatMap(mapper, false);
    }
    
    public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper, boolean delayErrors) {
        //调用三个参数的FlatMap
        return flatMap(mapper, delayErrors, Integer.MAX_VALUE);
    }
    
    public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper, boolean delayErrors, int maxConcurrency) {
        //调用四个参数的FlatMap
        return flatMap(mapper, delayErrors, maxConcurrency, bufferSize());
    }
    
    public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper,
                boolean delayErrors, int maxConcurrency, int bufferSize) {
        ObjectHelper.requireNonNull(mapper, "mapper is null");
        ObjectHelper.verifyPositive(maxConcurrency, "maxConcurrency");
        ObjectHelper.verifyPositive(bufferSize, "bufferSize");
        //如果上游的Observable类型是ScalarCallable类型的(比如上游的observable是通过Observable.just创建的等等,这种情况比较少见)
        if (this instanceof ScalarCallable) {
            @SuppressWarnings("unchecked")
            T v = ((ScalarCallable<T>)this).call();
            if (v == null) {
                return empty();
            }
            return ObservableScalarXMap.scalarXMap(v, mapper);
        }
        //上游类型不是ScalarCallable类型,返回ObservableFlatMap(一般情况下都是返回这个)
        return RxJavaPlugins.onAssembly(new ObservableFlatMap<T, R>(this, mapper, delayErrors, maxConcurrency, bufferSize));
    }
    

    FlatMap操作符通过一系列调用,最终生成了一个ObservableFlatMap对象,ObservableFlatMap类的狗造方法接收五个参数,简要介绍下:

    • ObservableSource<T> source:保存上游的observable对象。
    • Function<? super T, ? extends ObservableSource<? extends U>> mapper:调用flatMap操作符时传入的Function接口实现类对象。
    • boolean delayErrors:当订阅出现异常时,是否立即发送错误(备注:如果delayErrors为true,则第一个出现异常的序列将直接终止整个序列;如果delayErrors为false,则该异常将被推迟,直到整个任务序列被异常终止)。
    • int maxConcurrency:可以同时订阅的ObservableSource的最大数量(由于FlatMap是一对多变换,因此可能需要多个临时的Observable来辅助变换,最后再将这多个临时的Observable合并为一个将数据发射出去。这里的最大数量就是这多个临时的Observable数量)。
    • int bufferSize:数据缓冲区的缓存大小(默认为128)。

    看下ObservableFlatMap这个类:

    public final class ObservableFlatMap<T, U> extends AbstractObservableWithUpstream<T, U> {
        final Function<? super T, ? extends ObservableSource<? extends U>> mapper;
        final boolean delayErrors;
        final int maxConcurrency;
        final int bufferSize;
    
        public ObservableFlatMap(ObservableSource<T> source,
                Function<? super T, ? extends ObservableSource<? extends U>> mapper,
                boolean delayErrors, int maxConcurrency, int bufferSize) {
            super(source);
            this.mapper = mapper;
            this.delayErrors = delayErrors;
            this.maxConcurrency = maxConcurrency;
            this.bufferSize = bufferSize;
        }
    
        @Override
        public void subscribeActual(Observer<? super U> t) {
            //如果上游的Observable是ScalarCallable(ScalarCallable接口继承了Callable接口)类型的,
            //则委托ObservableScalarXMap执行数据下发,程序返回
            //(备注:这里几乎不会执行,在调用FlatMap操作符创建新的Observable对象时就已经经过该类型的判断,参见上面的代码,
            //因此若最终返回ObservableFlatMap对象,上游observable对象必定不是ScalarCallable类型)
            if (ObservableScalarXMap.tryScalarXMapSubscribe(source, t, mapper)) {
                return;
            }
            //如果上游Observable对象不是ScalarCallable类型,通过MergeObserver来实现具体的数据变换以及下发
            source.subscribe(new MergeObserver<T, U>(t, mapper, delayErrors, maxConcurrency, bufferSize));
        }
        //......代码省略
    }
    

    在subscribeActual方法内通过MergeObserver来包装下游的observer,并将其他参数传递进去,辅助变换。在RxJava中这些包装类的设计思路基本都是类似的,因此就不做过多描述了,我们看下MergeObserver内部的onNext方法:

    public void onNext(T t) {
        // safeguard against misbehaving sources
        if (done) {
            return;
        }
        ObservableSource<? extends U> p;
        try {
            //保存调用mapper.apply(t)方法生成的Observable,接收上游数据,用于接下来的数据变换
            p = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper returned a null ObservableSource");
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            s.dispose();
            onError(e);
            return;
        }
    
        if (maxConcurrency != Integer.MAX_VALUE) {
            synchronized (this) {
                if (wip == maxConcurrency) {
                    sources.offer(p);
                    return;
                }
                wip++;
            }
        }
        //调用方法
        subscribeInner(p);
    }
    

    由于RxJava默认传入的maxConcurrency是Integer.MAX_VALUE(这也是我们通常使用的方式),因此直接调用 subscribeInner(p),这个p就是保存我们调用FlatMap操作符时实现的Function接口生成的Observable(p在初始化是调用了mapper.apply(t)方法,t为上游Observable发射的数据),用来进行数据变换。看下这个方法:

    void subscribeInner(ObservableSource<? extends U> p) {
        for (;;) {
            if (p instanceof Callable) {
                //1、如果传入的ObservableSource是Callable类型的(比如上面的示例代码生成的ObservableJust对象,但这属于特例)程序走这里,
                //(ScalarCallable接口继承了Callable接口),
                //这里tryEmitScalar执行的过程和下面的InnerObserver执行过程极为类似,这里我们就只介绍InnerObserver的执行过程
                tryEmitScalar(((Callable<? extends U>)p));
    
                if (maxConcurrency != Integer.MAX_VALUE) {
                    synchronized (this) {
                        p = sources.poll();
                        if (p == null) {
                            wip--;
                            break;
                        }
                    }
                } else {
                    break;
                }
            } else {
                //2、Function对象内部返回的observable不是Callable类型的,则为每个observable创建一个InnerObserver,
                //本文中我们只关心这里,上面的数据下发过程以此类似,因此我们只要分析一个方面就行了
                InnerObserver<T, U> inner = new InnerObserver<T, U>(this, uniqueId++);
                //MergeObserver内部有一个保存InnerObserver的数组observers,
                //因此这个addInner方法就是将每次新建的InnerObserver保存到这个数组中
                if (addInner(inner)) {
                    //对每次创建的InnerObserver执行订阅
                    p.subscribe(inner);
                }
                break;
            }
        }
    }
    

    本文我们只分析使用InnerObserver这种方式,InnerObserver又包装了当前的MergeObserver,并将新创建的InnerObserver保存到InnerObserver数组中。看下addInner方法:

    boolean addInner(InnerObserver<T, U> inner) {
        //这里通过死循环来保存新建的InnerObserver,确保保存成功
        for (;;) {
            //从observers获取InnerObserver数组,这个observers是一个AtomicReference类型,
            //确保在多线程环境下只有一个InnerObserver数组对象
            InnerObserver<?, ?>[] a = observers.get();
            //保存失败的唯一条件是外部取消订阅
            if (a == CANCELLED) {
                inner.dispose();
                return false;
            }
            int n = a.length;
            //保存新建的InnerObserver的目标数组
            InnerObserver<?, ?>[] b = new InnerObserver[n + 1];
            //源数组到目标数组的迁移
            System.arraycopy(a, 0, b, 0, n);
            //保存新建的InnerObserver
            b[n] = inner;
            //将目标数组设置回observers中,完成保存InnerObserver的数组的更新
            if (observers.compareAndSet(a, b)) {
                return true;
            }
        }
    }
    

    新建的InnerObserver保存成功后,调用p.subscribe(inner)开始下发数据(这里的p就是Function返回的Observable),依次调用InnerObserver的onSubscribe,onNext,onComplete/onError方法,我们看下这几个方法:

    //建立订阅关系,获取disposable订阅状态管理对象
    public void onSubscribe(Disposable s) {
        if (DisposableHelper.setOnce(this, s)) {
            //如果s是QueueDisposable类型
            if (s instanceof QueueDisposable) {
                @SuppressWarnings("unchecked")
                QueueDisposable<U> qd = (QueueDisposable<U>) s;
                //获取合并标记
                int m = qd.requestFusion(QueueDisposable.ANY | QueueDisposable.BOUNDARY);
                //这里根据订阅类型是同步的还是异步的执行相应的操作
                if (m == QueueDisposable.SYNC) {
                    fusionMode = m;
                    queue = qd;
                    done = true;
                    //同步情况下,直接发射数据
                    parent.drain();
                    return;
                }
                if (m == QueueDisposable.ASYNC) {
                    fusionMode = m;
                    queue = qd;
                }
            }
        }
    }
    

    接着看onNext方法:

    public void onNext(U t) {
        //由于在onSubscribe中条件(s instanceof QueueDisposable)为false,因此fusionMode值还是0,走第一个条件
        if (fusionMode == QueueDisposable.NONE) {
            parent.tryEmit(t, this);
        } else {
            parent.drain();
        }
    }
    

    看下tryEmit(t, this)方法:

    void tryEmit(U value, InnerObserver<T, U> inner) {
        //1、判断VALUE==0是否成立,如果成立则将其设置为1,并执行条件内的方法
        if (get() == 0 && compareAndSet(0, 1)) {
            //2、调用下游observer的onNext方法,下发数据
            actual.onNext(value);
            //3、每次下发数据后将VALUE减一,并判断减一后的VALUE是否为0,如果为0,则表示数据都已下发完毕,方法结束
            if (decrementAndGet() == 0) {
                return;
            }
        } else {
            SimpleQueue<U> q = inner.queue;
            if (q == null) {
                q = new SpscLinkedArrayQueue<U>(bufferSize);
                inner.queue = q;
            }
            //4、将接收到的上游数据缓存到队列中
            q.offer(value);
            //5、每次将一个数据缓存到缓存队列中后,将VALUE加一,
            //然后判断VALUE加一之前的值,若不等于0,直接返回,结束方法;若等于0,执行步骤6
            if (getAndIncrement() != 0) {
                return;
            }
        }
        //6、循环获取缓存队列中的数据
        drainLoop();
    }
    

    MergeObserver继承了AtomicInteger,主要是保证数据的下发是一个一个进行的。
    简要介绍下tryEmit方法的运行流程:

    • tryEmit方法第一次调用时,get() == 0成立,调用compareAndSet(0, 1)将VALUE设置为1,此时执行条件内部语句,开始下发数据。数据下发执行完后调用decrementAndGet()==0,这里是先将VALUE减一,VALUE值变为0,因此VALUE==0成立,方法结束。
    • 如果在下发数据执行期间,上游有新的数据传递过来,此时get() == 0不成立,进入步骤4,将上游发送过来的数据保存到缓存队列中。
    • 执行完步骤4后,调用getAndIncrement()将VALUE值加一,并判断VALUE加一之前的值,若不等于0,结束方法;若等于0,执行步骤6。
    • 步骤6是从队列中循环取出数据并下发给下游observer。其执行条件是步骤3或步骤5的条件判断不成立,什么时候不成立呢?
      假设有两个线程在执行,分别称为线程A和线程B,假设线程A先抢到了CPU权限,步骤1条件成立,此时VALUE值为1,但是在执行步骤3之前,线程B抢占了CPU执行权限,线程A处于休眠状态,由于此时VALUE值为1,get() == 0不成立,此时线程B就会将数据缓存到队列中。
      一种情况是:线程B继续执行,并在步骤5处将VALUE加一(VALUE值变为2),由于VALUE原先的值为1,因此方法结束,线程B让出CPU权限。线程A获取CPU权限,原先的步骤3继续执行(VALUE的值变为1),条件不成立,执行步骤6,从队列中循环取出数据并下发。
      另一种情况是:线程B在执行步骤5之前,失去了CPU的权限。原线程A抢到了CPU权限,执行步骤3(VALUE值变为0),条件3判断成立,线程A中这个方法结束,线程A让出CPU权限。线程B获取CPU权限,继续执行步骤5(VALUE值变为1),但getAndIncrement()方法获取的是VALUE原来的值,原来的值为0,因此条件不成立,执行步骤6,从队列中循环取出数据并下发。

    tryEmit方法介绍完了,再来看下drainLoop()这个方法:

    void drainLoop() {
        final Observer<? super U> child = this.actual;
        int missed = 1;
        for (;;) {
            //检查订阅是否被终止
            if (checkTerminate()) {
                return;
            }
            //获取MergeObserver内的缓存队列
            SimplePlainQueue<U> svq = queue;
            //通常情况下,MergeObserver内的缓存队列都是空的
            if (svq != null) {
                //如果缓存队列里面有数据,开始循环
                for (;;) {
                    U o;
                    for (;;) {
                        //再次检查订阅是否被终止
                        if (checkTerminate()) {
                            return;
                        }
                        //从缓存队列中取一个数据
                        o = svq.poll();
    
                        if (o == null) {
                            break;
                        }
                        //调用下游observer的onNext方法,下发数据
                        child.onNext(o);
                    }
                    if (o == null) {
                        break;
                    }
                }
            }
    
            boolean d = done;
            svq = queue;
            InnerObserver<?, ?>[] inner = observers.get();
            int n = inner.length;
            //调用下游observer的onComplete或onError方法
            if (d && (svq == null || svq.isEmpty()) && n == 0) {
                Throwable ex = errors.terminate();
                if (ex != ExceptionHelper.TERMINATED) {
                    if (ex == null) {
                        child.onComplete();
                    } else {
                        child.onError(ex);
                    }
                }
                return;
            }
    
            //处理MergeObserver内的InnerObserver数组
            boolean innerCompleted = false;
            if (n != 0) {
                long startId = lastId;
                int index = lastIndex;
    
                if (n <= index || inner[index].id != startId) {
                    if (n <= index) {
                        index = 0;
                    }
                    int j = index;
                    for (int i = 0; i < n; i++) {
                        if (inner[j].id == startId) {
                            break;
                        }
                        j++;
                        if (j == n) {
                            j = 0;
                        }
                    }
                    index = j;
                    lastIndex = j;
                    lastId = inner[j].id;
                }
    
                int j = index;
                sourceLoop:
                //循环处理InnerObserver数组内的每个InnerObserver对象
                for (int i = 0; i < n; i++) {
                    if (checkTerminate()) {
                        return;
                    }
                    @SuppressWarnings("unchecked")
                    InnerObserver<T, U> is = (InnerObserver<T, U>)inner[j];
                    //处理InnerObserver内的缓存队列,如果有缓存数据,则将其发射出去
                    for (;;) {
                        if (checkTerminate()) {
                            return;
                        }
                        SimpleQueue<U> q = is.queue;
                        if (q == null) {
                            break;
                        }
                        U o;
                        for (;;) {
                            try {
                                o = q.poll();
                            } catch (Throwable ex) {
                                Exceptions.throwIfFatal(ex);
                                is.dispose();
                                errors.addThrowable(ex);
                                if (checkTerminate()) {
                                    return;
                                }
                                removeInner(is);
                                innerCompleted = true;
                                i++;
                                continue sourceLoop;
                            }
                            if (o == null) {
                                break;
                            }
                            //发射InnerObserver内缓存队列缓存的数据
                            child.onNext(o);
    
                            if (checkTerminate()) {
                                return;
                            }
                        }
                        if (o == null) {
                            break;
                        }
                    }
                    boolean innerDone = is.done;
                    SimpleQueue<U> innerQueue = is.queue;
                    //如果InnerObserver处理完毕,并且其内部缓存队列的数据都已发射出去
                    if (innerDone && (innerQueue == null || innerQueue.isEmpty())) {
                        //将InnerObserver从数组中移除
                        removeInner(is);
                        if (checkTerminate()) {
                            return;
                        }
                        //设置InnerObserver状态为complete
                        innerCompleted = true;
                    }
    
                    j++;
                    if (j == n) {
                        j = 0;
                    }
                }
                lastIndex = j;
                lastId = inner[j].id;
            }
            //当前InnerObserver已经处理完毕,继续循环处理数组中下一个InnerObserver
            if (innerCompleted) {
                //传入的默认maxConcurrency值是Integer.MAX_VALUE,因此if条件内的代码不会执行
                if (maxConcurrency != Integer.MAX_VALUE) {
                    ObservableSource<? extends U> p;
                    synchronized (this) {
                        p = sources.poll();
                        if (p == null) {
                            wip--;
                            continue;
                        }
                    }
                    subscribeInner(p);
                }
                //结束当前循环,开始下一循环处理下一个InnerObserver
                continue;
            }
            //每次数据下发完毕,将VALUE值减一
            missed = addAndGet(-missed);
            //如果VALUE值变为0,表示缓存队列的数据已全部下发完毕,退出循环,方法结束
            if (missed == 0) {
                break;
            }
        }
    }
    

    至此整个FlatMap操作符的流程就分析完了,总结下:

    • 根据上游Observable对象的类型是不是ScalarCallable类型,FlatMap决定返回相应的新的Observable对象,一般情况下返回的都是ObservableFlatMap对象(ObservableScalarXMap对象处理数据分发的方式与ObservableFlatMap类似,这里我们只分析ObservableFlatMap)。
    • 下游订阅时,触发ObservableFlatMap的subscribeActual方法,触发上游subscribe --> subscribeActual --> 调用MergeObserver的onSubscribe建立订阅关系,上游调用onNext下发数据 --> 调用MergeObserver的onNext,数据下发完毕或者出错调用MergeObserver的onComplete或onError。
    • 在MergeObserver的onNext方法中,获取FlatMap操作符接收的Function对象返回的数据变换Observable(暂时命名为observableA),并为每个返回的observableA创建一个对应的InnerObserver对象(暂时命名为innerObserverA),然后直行订阅observableA.subscribe(innerObserverA),最终将上游的数据经过变换后重新发射出去。
    • InnerObserver内部有一个缓存队列,用于缓存变换后的数据,其onNext方法内部最终还是调用的MergeObserver的tryEmit方法,将变换后的数据重新发射到下游observer。MergeObserver实现了AtomicInteger类,采用CAS操作保证了数据下发操作的原子性(即每次只有一个数据下发,在当前数据下发过程中,如果上游有新的数据到来,则将新的数据保存到InnerObserver的缓存队列中。等当前数据下发完毕后,再从InnerObserver的缓存队列中取出数据并将其下发给下游observer)。

    相关文章

      网友评论

        本文标题:RxJava2操作符(FlatMap学习)

        本文链接:https://www.haomeiwen.com/subject/sbuxdqtx.html