美文网首页
RxJava2.0操作符flatmap源码分析

RxJava2.0操作符flatmap源码分析

作者: zuoweitan | 来源:发表于2018-08-15 17:52 被阅读117次

    RxJava 2.0简介

    关于RxJava到底是什么,我们可以看看它的开发者是如何描述它的:
    RxJava is a Java VM implementation of Reactive Extensions: a library for composing asynchronous and event-based programs by using observable sequences.

    It extends the observer pattern to support sequences of data/events and adds operators that allow you to compose sequences together declaratively while abstracting away concerns about things like low-level threading, synchronization, thread-safety and concurrent data structures.

    它是通过使用可观察序列来组件异步和基于事件程序的库,它使用了观察这模式来支持事件与数据的序列化,并且提供了一些列的操作符,你可以用这些操作符来将各种序列显式的组合起来,并且它可以帮你把异步线程、线程同步、线程和并发的数据结构中抽象出来,让你远离那些实现细节。

    嗯,没错,这就是RxJava,一个啥都不需要你考虑,只需要把你想做的事务和要处理的数据组合成操作序列,剩下的就交给RxJava就好了。

    当然,不是说RxJava能做到的,其他库或者我们自己就不能做到,那为什么还要使用RxJava呢?(其实我不喜欢使用RxJava)这里我挑一些别人说的理由:

    1. 它是一种函数响应式编程,显然函数响应式编程的好处它都有(不知道函数响应式编程的自己查资料)

    2. 逻辑清晰,使用简洁(是不是这样呢?谁用谁知道)

    而我不喜欢的理由:

    我也不是不喜欢RxJava,只是觉得有时候只是为了一个异步任务就动不动RxJava,有点杀鸡用牛刀的感觉。这些轻量级别的任务,我还是喜欢用bolts-task(强推)。这里贴一个科普链接:https://blog.csdn.net/u010295885/article/details/52463039

    操作符flatmap的简单介绍与应用

    flatmap是RxJava众多操作符中的一个,flat字面含义就是"压平",当你需要把多个集合整理成一个集合,其实就是一个压平的操作,如下图,这个时候你就可以使用flatmap这个操作符了。


    image.png

    看一个简单的应用

    List<Student> students = new ArrayList<Student>();
    students.add(new Student("1", "zhangsan")
        .addCourse(new Course("1", "数学"))
        .addCourse(new Course("5", "计算机")));
    students.add(new Student("2", "lisi")
        .addCourse(new Course("2", "语文")));
    students.add(new Student("3", "wangwu")
        .addCourse(new Course("3", "英语")));
    Observable.fromIterable(students)
        .flatMap(new Function<Student, ObservableSource<Course>>() {
    
            @Override
            public ObservableSource<Course> apply(Student student) throws Exception {
                return Observable.fromIterable(student.courses);
            }
        })
    .subscribe(new Consumer<Course>() {
        @Override
        public void accept(Course course) throws Exception {
            System.out.println(course);
        }
    });
    

    应用非常简单,数据源是一些学生,而我最后打印的是每个学生的所选的课程,我们来试试使用map操作符来实现看看:

    Observable.fromIterable(students)
        .map(new Function<Student, List<Course>>() {
    
            @Override
            public List<Course> apply(Student student) throws Exception {
                return student.courses;
            }
        })
        .subscribe(new Consumer<List<Course>>() {
            @Override
            public void accept(List<Course> courses) throws Exception {
                for (Course cours : courses) {
                    System.out.println(cours);
                }
            }
        });
    

    比较看下来,相对于flatmap,map操作符只能给我们每个学生所选的所有课程,并不能告诉我们他所选的每一门课都是什么,这就需要我们自己去加一层遍历了。
    另外,从比较结果来看,flatmap好像是替我们对学生的课程列表做了一次遍历。嗯,没错,从结果上看它确实是做了,那它是怎么做到的呢?

    flatmap源码分析

    这里以上面那个简单应用作为一个入口,来看看flatmap如何帮我们把数据压平的。

    1. Observable::fromIterable
    public static <T> Observable<T> fromIterable(Iterable<? extends T> source) {
        ObjectHelper.requireNonNull(source, "source is null");
        return RxJavaPlugins.onAssembly(new ObservableFromIterable<T>(source));
    }
    

    创建一个ObservableFromIterable对象,构造参数为一个迭代器(在我们的应用就是students)。然后判断是否要进行Hook操作,最后返回一个Observable对象。

    2. RxJavaPlugins::onAssembly
    public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
        Function<? super Observable, ? extends Observable> f = onObservableAssembly;
        if (f != null) {
            return apply(f, source);
        }
        return source;
    }
    

    这里的f为空,即不需要Hook,所以第一步中会直接返回一个ObservableFromIterable对象。接下来我们调用了flatmap。

    3. ObservableFromIterable::flatmap
    public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper) {
        return flatMap(mapper, false);
    }
    

    flatmap被重载了很多次,对于只传了一个映射方法的flatmap方法,最后会调用下面的flatmap

    public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper,
            boolean delayErrors, int maxConcurrency, int bufferSize) {
        ObjectHelper.requireNonNull(mapper, "mapper is null");
        ObjectHelper.verifyPositive(maxConcurrency, "maxConcurrency");
        ObjectHelper.verifyPositive(bufferSize, "bufferSize");
        if (this instanceof ScalarCallable) {
            @SuppressWarnings("unchecked")
            T v = ((ScalarCallable<T>)this).call();
            if (v == null) {
                return empty();
            }
            return ObservableScalarXMap.scalarXMap(v, mapper);
        }
        return RxJavaPlugins.onAssembly(new ObservableFlatMap<T, R>(this, mapper, delayErrors, maxConcurrency, bufferSize));
    }
    

    这里需要关注它几个参数:

    1. mapper:就是student到ObservableSource<Course>的映射方法
    2. delayErrors:false
    3. maxConcurrency:Integer.MAX_VALUE
    4. bufferSize:bufferSize()
      由于不Hook,所以这里直接返回ObservableFlatMap对象,注意这里把我们在第一步中创
      ObservableFromIterable作为构造参数传给了ObservableFlatMap对象。
    4.ObservableFlatMap::this
    public ObservableFlatMap(ObservableSource<T> source,
            Function<? super T, ? extends ObservableSource<? extends U>> mapper,
            boolean delayErrors, int maxConcurrency, int bufferSize) {
        super(source);
        this.mapper = mapper;
        this.delayErrors = delayErrors;
        this.maxConcurrency = maxConcurrency;
        this.bufferSize = bufferSize;
    }
    

    流程很简单,这里要注意这个source最后会赋值给父类的一个名字为source成员。如下

    /** The source consumable Observable. */
    protected final ObservableSource<T> source;
    
    /**
     * Constructs the ObservableSource with the given consumable.
     * @param source the consumable Observable
     */
    AbstractObservableWithUpstream(ObservableSource<T> source) {
        this.source = source;
    }
    

    小结,flatmap只做了一件事情,就是把我们最开始的ObservableFromIterable包了一层,构造了一个ObservableFlatMap,然后我们对ObservableFlatMap进行了订阅操作

    5. ObservableFlatMap::subscribe

    回顾下我们的调用

    .subscribe(new Consumer<Course>() {
        @Override
        public void accept(Course course) throws Exception {
            System.out.println(course);
        }
    });
    

    调用的是参数类型为Consumer的subscribe方法

    public final Disposable subscribe(Consumer<? super T> onNext) {
        return subscribe(onNext, Functions.ON_ERROR_MISSING, Functions.EMPTY_ACTION, Functions.emptyConsumer());
    }
    public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
            Action onComplete, Consumer<? super Disposable> onSubscribe) {
        ObjectHelper.requireNonNull(onNext, "onNext is null");
        ObjectHelper.requireNonNull(onError, "onError is null");
        ObjectHelper.requireNonNull(onComplete, "onComplete is null");
        ObjectHelper.requireNonNull(onSubscribe, "onSubscribe is null");
    
        LambdaObserver<T> ls = new LambdaObserver<T>(onNext, onError, onComplete, onSubscribe);
    
        subscribe(ls);
    
        return ls;
    }
    

    调用了subscribe多参重载方法,注意这里的传参

    1. onNext:就是我们的Consumer
    2. onSubscribe:Functions.emptyConsumer(),它返回一个EmptyConsumer对象,accept不做任何处理
    static final class EmptyConsumer implements Consumer<Object> {
        @Override
        public void accept(Object v) { }
    
        @Override
        public String toString() {
            return "EmptyConsumer";
        }
    }
    

    多参的subscribe将我们的入参包装成了一个Observer对象,也就是LambdaObserver来代理了我们的Consumer,然后调用参数类型为Observer的subscribe

    public final void subscribe(Observer<? super T> observer) {
        ObjectHelper.requireNonNull(observer, "observer is null");
        try {
            observer = RxJavaPlugins.onSubscribe(this, observer);
    
            ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
    
            subscribeActual(observer);
        } catch (NullPointerException e) { // NOPMD
            throw e;
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            // can't call onError because no way to know if a Disposable has been set or not
            // can't call onSubscribe because the call might have set a Subscription already
            RxJavaPlugins.onError(e);
    
            NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
            npe.initCause(e);
            throw npe;
        }
    }
    

    这里RxJavaPlugins.onSubscribe依然不会对observer进行Hook,所以这里最后就是直接调用subscribeActual方法,它是一个抽象方法,每个子类都会有自己的实现,进行真正的订阅操作,这里我们最后调用的就是ObservableFlatMap的subscribeActual方法

    6. ObservableFlatMap::subscribeActual
    @Override
    public void subscribeActual(Observer<? super U> t) {
    
        if (ObservableScalarXMap.tryScalarXMapSubscribe(source, t, mapper)) {
            return;
        }
    
        source.subscribe(new MergeObserver<T, U>(t, mapper, delayErrors, maxConcurrency, bufferSize));
    }
    

    还记得这个source是谁吧?就是开始创建的那个ObservableFromIterable对象,这里才开始对我们原始的数据源进行订阅。而且从这里开始,每一次调用subscribe,都会先创建一个新的Observer对象来代理上一个Observer对象,从而形成一种上下游的Flow,这个为后面的onSubscribe做了准备。

    小结,subscribe方法与onSubscribe是一对操作,subscribe负责把订阅者一层层包起来,每包一层就增加了一层对原始数据的加工,而onSubscribe则负责把加工的数据一层层往里面传,最后传给我们最初的订阅者。可以结合下图理解


    image.png

    接下来,就看看原始订阅者的上游MergeObserver

    7. MergeObserver::this
    MergeObserver(Observer<? super U> actual, Function<? super T, ? extends ObservableSource<? extends U>> mapper,
            boolean delayErrors, int maxConcurrency, int bufferSize) {
        this.downstream = actual;
        this.mapper = mapper;
        this.delayErrors = delayErrors;
        this.maxConcurrency = maxConcurrency;
        this.bufferSize = bufferSize;
        if (maxConcurrency != Integer.MAX_VALUE) {
            sources = new ArrayDeque<ObservableSource<? extends U>>(maxConcurrency);
        }
        this.observers = new AtomicReference<InnerObserver<?, ?>[]>(EMPTY);
    }
    

    重点关注几个地方:

    1. downStream:下游,这里就是我们最原始的订阅者
    2. mapper:就是之前我们给flatmap的映射函数
    3. maxConcurrency:就是我们构建ObservableFlatMap所传的值,Integer.MAX_VALUE
    4. observers:InnerObserver类型数组的一个原子引用

    这里涉及到了InnerObserver,既然已经登场我们就先来简单了解下

    8. InnerObserver
    static final class InnerObserver<T, U> extends AtomicReference<Disposable>
    implements Observer<U> {
    
        private static final long serialVersionUID = -4606175640614850599L;
        final long id;
        final MergeObserver<T, U> parent;
    
        volatile boolean done;
        volatile SimpleQueue<U> queue;
    
        int fusionMode;
    
        InnerObserver(MergeObserver<T, U> parent, long id) {
            this.id = id;
            this.parent = parent;
        }
    

    这里暂时只需要知道,它是一个Observer对象,它有个parent指向MergeObserver,我们先回到第6步中source被MergeObserver订阅的地方。这里的source就是我们原始的观察源ObservableFromIterable,我们看看它的subscribe方法

    9.ObservableFromIterable::subscribeActual

    同样的,subscribe方法是它的基类方法,最后都会调用到子类的subscribeActual

    @Override
    public void subscribeActual(Observer<? super T> observer) {
        Iterator<? extends T> it;
        try {
            it = source.iterator();
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            EmptyDisposable.error(e, observer);
            return;
        }
        boolean hasNext;
        try {
            hasNext = it.hasNext();
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            EmptyDisposable.error(e, observer);
            return;
        }
        if (!hasNext) {
            EmptyDisposable.complete(observer);
            return;
        }
    
        FromIterableDisposable<T> d = new FromIterableDisposable<T>(observer, it);
        observer.onSubscribe(d);
    
        if (!d.fusionMode) {
            d.run();
        }
    }
    

    整个流程很简单,source由第1步中知道它就是我们提供的数据源(在我们的实例中就是students),如果source有数据,则走接下来的三步:

    1. 构建FromIterableDisposable,一个Disposable对象,是onSubscribe方法的参数。它的构造参数有两个
      1. observer:这里就是MergeObserver,作为下游保存起来
      2. it:数据源的迭代器
    2. 调用MergeObserver的onSubscribe方法
      如果FromIterableDisposable没有被消费,则执行它的run方法

    小结,这里才开始真正的消费数据,两种情况:首先交给onSubscribe消费,如果没有被消费则通过FromIterableDisposable的run方法来消费

    10. MergeObserver::onSubscribe
    @Override
    public void onSubscribe(Disposable d) {
        if (DisposableHelper.validate(this.upstream, d)) {
            this.upstream = d;
            downstream.onSubscribe(this);
        }
    }
    

    流程很简单,首先判断是否已经有了上游了,如果有了,则不做任何处理,否则参数d作为自己的上游,赋值给upStream,调用自己下游的onSubscribe方法。这里的downstream就是我们最原始的订阅者Consumer,由第5步可知,它被LambdaObserver代理了,我们看看LambdaObserver的onSubscribe方法

    11. LambdaObserver::onSubscribe
    public void onSubscribe(Disposable d) {
        if (DisposableHelper.setOnce(this, d)) {
            try {
                onSubscribe.accept(this);
            } catch (Throwable ex) {
                Exceptions.throwIfFatal(ex);
                d.dispose();
                onError(ex);
            }
        }
    }
    

    首先保存了下d,然后调用它的onSubscribe的accept方法,由第5步可知,onSubscribe就是一个EmptyConsumer对象,看看EmptyConsumer的accept方法

    12. EmptyConsumer::accept
    static final class EmptyConsumer implements Consumer<Object> {
        @Override
        public void accept(Object v) { }
    
        @Override
        public String toString() {
            return "EmptyConsumer";
        }
    }
    

    accpet是一个空方法,所以onSubscribe并没有做消费数据的操作,所以FromIterableDisposable的fusionMode还是false,由第9步可知,会走到FromIterableDisposable::run方法,通过它来消费数据

    小结,从以上onSubscribe流程可以看到,每个onSubscribe参数都是上游,执行体里面都是调用它的下游来消费数据,跟第6步的小结吻合

    13. FromIterableDisposable::run
    void run() {
        boolean hasNext;
    
        do {
            if (isDisposed()) {
                return;
            }
            T v;
    
            try {
                v = ObjectHelper.requireNonNull(it.next(), "The iterator returned a null value");
            } catch (Throwable e) {
                Exceptions.throwIfFatal(e);
                downstream.onError(e);
                return;
            }
    
            downstream.onNext(v);
    
            if (isDisposed()) {
                return;
            }
            try {
                hasNext = it.hasNext();
            } catch (Throwable e) {
                Exceptions.throwIfFatal(e);
                downstream.onError(e);
                return;
            }
        } while (hasNext);
    
        if (!isDisposed()) {
            downstream.onComplete();
        }
    }
    

    代码略长,简单梳理下流程

    1. 该任务是否已经取消了?是,则退出,否则判断是否有下一个数据,没有则到3,有则到2
    2. 调用下游的onNext,将数据传给下游,这里的downStream就是MergeObserver,再到1
    3. 调用下游的onComplete方法

    还有一个错误处理的分支:当获取数据的过程中出错了,则会调用下游的onError

    14. MergeObserver::onNext
    @Override
    public void onNext(T t) {
        // safeguard against misbehaving sources
        if (done) {
            return;
        }
        ObservableSource<? extends U> p;
        try {
            p = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper returned a null ObservableSource");
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            upstream.dispose();
            onError(e);
            return;
        }
    
        if (maxConcurrency != Integer.MAX_VALUE) {
            synchronized (this) {
                if (wip == maxConcurrency) {
                    sources.offer(p);
                    return;
                }
                wip++;
            }
        }
    
        subscribeInner(p);
    }
    

    去掉一些判断,流程还是很简单的

    1. 通过映射函数mapper,将参数t转变成一个新的观察源ObservableSource对象
    2. maxConcurrency为Integer.MAX_VALUE,所以直接调用subscribeInner
    15. MergeObserver::subscribeInner
    void subscribeInner(ObservableSource<? extends U> p) {
        for (;;) {
            if (p instanceof Callable) {
                if (tryEmitScalar(((Callable<? extends U>)p)) && maxConcurrency != Integer.MAX_VALUE) {
                    boolean empty = false;
                    synchronized (this) {
                        p = sources.poll();
                        if (p == null) {
                            wip--;
                            empty = true;
                        }
                    }
                    if (empty) {
                        drain();
                        break;
                    }
                } else {
                    break;
                }
            } else {
                InnerObserver<T, U> inner = new InnerObserver<T, U>(this, uniqueId++);
                if (addInner(inner)) {
                    p.subscribe(inner);
                }
                break;
            }
        }
    }
    

    由我们的映射函数生成的ObservableSource是ObservableFromIterable,它没有实现Callable,所有走到了else分支,流程如下:

    1. 创建InnerObserver
    2. addInner,将每个InnerObserver添加到observers中
    3. 使用InnerObserver来对我们通过mapper创建的ObservableFromIterable进行订阅

    从第8步中,我们知道每一个InnerObserver都有一个parent指向MergeObserver,留意这一点

    16. MergeObserver::addInner
    boolean addInner(InnerObserver<T, U> inner) {
        for (;;) {
            InnerObserver<?, ?>[] a = observers.get();
            if (a == CANCELLED) {
                inner.dispose();
                return false;
            }
            int n = a.length;
            InnerObserver<?, ?>[] b = new InnerObserver[n + 1];
            System.arraycopy(a, 0, b, 0, n);
            b[n] = inner;
            if (observers.compareAndSet(a, b)) {
                return true;
            }
        }
    }
    

    将每一个InnerObserver存放到observers当中,其实逻辑很简单,但是这里做了很多同步操作
    由第9步可知,对ObservableFromIterable进行订阅,首先会回调订阅者的onSubscribe来消费数据,我们看看InnerObserver的onSubscribe

    17. InnerObserver::onSubscribe
    @Override
    public void onSubscribe(Disposable d) {
        if (DisposableHelper.setOnce(this, d)) {
            if (d instanceof QueueDisposable) {
                @SuppressWarnings("unchecked")
                QueueDisposable<U> qd = (QueueDisposable<U>) d;
    
                int m = qd.requestFusion(QueueDisposable.ANY | QueueDisposable.BOUNDARY);
                if (m == QueueDisposable.SYNC) {
                    fusionMode = m;
                    queue = qd;
                    done = true;
                    parent.drain();
                    return;
                }
                if (m == QueueDisposable.ASYNC) {
                    fusionMode = m;
                    queue = qd;
                }
            }
        }
    }
    

    流程如下

    1. 一样的,保存自己的上游
    2. 判断d是否是QueueDisposable,而FromIterableDisposable是QueueDisposable的子类,所以成立
    3. 调用FromIterableDisposable::requestFusion,这里传参是ANY | BOUNDARY
    4. 如果requestFusion返回SYNC则会走parent.drain方法,这里的parent就是MergeObserver,另外有以下几个赋值
      1. fusionMode = SYNC
      2. done = true
      3. queue = qd,即queue指向了FromIterableDisposable,也就是它的上游,随后会通过queue从它的上游获取数据
    18. FromIterableDisposable::requestFusion
    @Override
    public int requestFusion(int mode) {
        if ((mode & SYNC) != 0) {
            fusionMode = true;
            return SYNC;
        }
        return NONE;
    }
    

    由传参ANY | BOUNDARY,得到返回就是SYNC,故会走到MergeObserver::drain方法,另外这里讲标志位fusionMode标记为true了,即该FromIterableDisposable已经被消费了,它的FromIterableDisposable::run将不再被执行

    19. MergeObserver::drain
    void drain() {
        if (getAndIncrement() == 0) {
            drainLoop();
        }
    }
    

    做了多线程保护,然后直接调用drainLoop

    20. MergeObserver::drainLoop
    void drainLoop() {
        final Observer<? super U> child = this.downstream;
        int missed = 1;
        for (;;) {
            if (checkTerminate()) {
                return;
            }
            //这里queue没有赋值,所有为空
            SimplePlainQueue<U> svq = queue;
    
            if (svq != null) {
                for (;;) {
                    U o;
                    for (;;) {
                        if (checkTerminate()) {
                            return;
                        }
    
                        o = svq.poll();
    
                        if (o == null) {
                            break;
                        }
    
                        child.onNext(o);
                    }
                    if (o == null) {
                        break;
                    }
                }
            }
            //done为false
            boolean d = done;
            svq = queue;
            InnerObserver<?, ?>[] inner = observers.get();
            int n = inner.length; //n至少为1
    
            int nSources = 0;
            if (maxConcurrency != Integer.MAX_VALUE) {
                synchronized (this) {
                    nSources = sources.size();
                }
            }
    
            if (d && (svq == null || svq.isEmpty()) && n == 0 && nSources == 0) {
                Throwable ex = errors.terminate();
                if (ex != ExceptionHelper.TERMINATED) {
                    if (ex == null) {
                        child.onComplete();
                    } else {
                        child.onError(ex);
                    }
                }
                return;
            }
            //所以我们可以从这里开始分析,以下一段代码是确定下一个需要处理的InnerObserver的lastIndex,lastId,记为流程1
            boolean innerCompleted = false;
            if (n != 0) {
                long startId = lastId;
                int index = lastIndex;
    
                if (n <= index || inner[index].id != startId) {
                    if (n <= index) {
                        index = 0;
                    }
                    int j = index;
                    for (int i = 0; i < n; i++) {
                        if (inner[j].id == startId) {
                            break;
                        }
                        j++;
                        if (j == n) {
                            j = 0;
                        }
                    }
                    index = j;
                    lastIndex = j;
                    lastId = inner[j].id;
                }
                //找到以下一个待处理的InnerObserver后,则开始从index处开始遍历依次向后处理observers中未处理的InnerObserver,记为流程2
                int j = index;
                sourceLoop:
                for (int i = 0; i < n; i++) {
                    if (checkTerminate()) {
                        return;
                    }
                    @SuppressWarnings("unchecked")
                    InnerObserver<T, U> is = (InnerObserver<T, U>)inner[j];
    
                    for (;;) {
                        if (checkTerminate()) {
                            return;
                        }
                        SimpleQueue<U> q = is.queue;
                        if (q == null) {
                            break;
                        }
                        U o;
                        for (;;) {
                            try {
                                o = q.poll();
                            } catch (Throwable ex) {
                                Exceptions.throwIfFatal(ex);
                                is.dispose();
                                errors.addThrowable(ex);
                                if (checkTerminate()) {
                                    return;
                                }
                                removeInner(is);
                                innerCompleted = true;
                                i++;
                                continue sourceLoop;
                            }
                            if (o == null) {
                                break;
                            }
    
                            child.onNext(o);
    
                            if (checkTerminate()) {
                                return;
                            }
                        }
                        if (o == null) {
                            break;
                        }
                    }
                    boolean innerDone = is.done;
                    SimpleQueue<U> innerQueue = is.queue;
                    if (innerDone && (innerQueue == null || innerQueue.isEmpty())) {
                        removeInner(is);
                        if (checkTerminate()) {
                            return;
                        }
                        innerCompleted = true;
                    }
    
                    j++;
                    if (j == n) {
                        j = 0;
                    }
                }
                lastIndex = j;
                lastId = inner[j].id;
            }
    
            if (innerCompleted) {
                if (maxConcurrency != Integer.MAX_VALUE) {
                    ObservableSource<? extends U> p;
                    synchronized (this) {
                        p = sources.poll();
                        if (p == null) {
                            wip--;
                            continue;
                        }
                    }
                    subscribeInner(p);
                }
                continue;
            }
            missed = addAndGet(-missed);
            if (missed == 0) {
                break;
            }
        }
    }
    

    鉴于代码比较长,代码有些地方我做了注释,由于判断条件都不满足,所以我们只用看代码中标注的两个流程

    1. 流程1
      n不为0,进入if分支,流程如下

      1. 获取上次记录的待处理的InnerObserver的id以及index
      2. 如果observers中的InnerObserver中的个数减少了或者lastIndex对应的InnerObserver发生变化了,则到3,否则已经找到
      3. 分两种情况:
        1. 如果observers中的InnerObserver的个数减少了,则需要重置index,重头开始找到待处理的InnerObserver,重新赋值给lastId以及lastIndex
        2. 如果observer中的innerObserver的个数没有减少,但是最之前lastIndex对应的InnerObserver已经发生了变化,则需要从index处重新找到待处理的InnerObserver,如果没有找到,则重头开始找
    2. 流程2
      通过index取出需要处理的InnerObserver,拿到InnerObserver中的queue,开始遍历
      queue中的数据,由第17步中,可知这个queue就是FromIterableDisposable,FromIterableDisposable中有一个迭代器,queue通过其中的迭代器不断的获取数据通过onNext回调传给它的下游,这里就是我们最原始的订阅者。当前的queue处理完后,外层循环会因为o == null而退出,随后就是赋值lastIndex与lastId,也就是下一个即将要被处理的InnerObserver。

    小结,MergeObserver通过drainLoop方法将我们通过mapper创建出来的观察源一一创建一个内部订阅者,也就是InnerObserver,由InnerObserver来消费这些观察源,而InnerObserver又持有一个MergeObserver的引用,而MergeObserver又持有它的下游,也就是我们最原始的订阅者,这样我们就可以把由mapper创建的观察源的数据传给最终的订阅者

    总结

    1. RxJava中有大量的服务于多线程的代码,从flatmap的源码分析总可以看出来,很多代码都是了线程同步,保证线程安全。
    2. flatmap通过在MergeObserver中创建一系列的InnerObserver,在同一层级对所有通过mapper创建的观察源进行消费,然后通过内部订阅者持有的parent即MergeObserver将数据派发给MergeObserver下游。虽然这里的InnerObserver并没有显式的指定它的下游,但是它的下游从它的parent也就是MergeObserver“继承了”。
    3. RxJava是基于数据流的,分析它的代码一定要重点关注数据的流动,它的数据加工过程可以看成责任链模式的运用,它的数据分发则是观察者模式与代理模式的运用。
    4. 数据加工的链是通过subscribe方法链接起来的,而数据的分发则是通过onSubscribe方法来完成的

    相关文章

      网友评论

          本文标题:RxJava2.0操作符flatmap源码分析

          本文链接:https://www.haomeiwen.com/subject/dyzubftx.html