美文网首页
RxJava2源码分析——FlatMap和ConcatMap及其

RxJava2源码分析——FlatMap和ConcatMap及其

作者: 谭嘉俊 | 来源:发表于2019-11-15 03:10 被阅读0次

    本文章主要是对RxJava2FlatMapConcatMap这两个操作符进行源码分析,并且对其相关并发编程进行分析,在阅读之前,可以先阅读以下文章:

    RxJava2源码分析——订阅

    RxJava2源码分析——线程切换

    RxJava2源码分析——Map操作符

    本文章用的RxJavaRxAndroid版本如下:

    implementation 'io.reactivex.rxjava2:rxjava:2.2.6'
    implementation 'io.reactivex.rxjava2:rxandroid:2.1.1'
    

    FlatMap

    FlatMap操作符可以将一个发射数据的Observable转变为多个Observables,然后将这些发射的数据合并进一个单独的Observable,发射的数据不保证有序

    我们先写段示例代码,为了方便理解,在调用FlatMap方法的时候,我就不用上Lambda链式调用了,代码如下:

    Observable.create((ObservableOnSubscribe<String>) emitter -> {
        emitter.onNext("Tan:");
        emitter.onNext("Jia:");
        emitter.onNext("Jun:");
        emitter.onComplete();
    })
            .flatMap(new Function<String, ObservableSource<String>>() {
                @Override
                public ObservableSource<String> apply(String s) {
                    List<String> list = new ArrayList<>();
    
                    for (int i = 0; i < 3; i++) {
                        list.add(s + i);
                    }
                    return Observable.fromIterable(list);
                }
            })
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Observer<String>() {
                @Override
                public void onSubscribe(Disposable d) {
                    // no implementation
                }
    
                @Override
                public void onNext(String s) {
                    Log.i("TanJiaJun", s);
                }
    
                @Override
                public void onError(Throwable e) {
                    // no implementation
                }
    
                @Override
                public void onComplete() {
                    // no implementation
                }
            });
    

    Log如下:


    FlatMapLog.png

    源码分析

    我们看下flatMap方法,分析可知,会依次调用以下方法,代码如下:

    @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.NONE)
    public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper) {
        // 注意:参数delayErrors传入的是false
        return flatMap(mapper, false);
    }
    
    @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.NONE)
    public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper, boolean delayErrors) {
        // 注意:参数maxConcurrency传入的是Integer.MAX_VALUE
        return flatMap(mapper, delayErrors, Integer.MAX_VALUE);
    }
    
    @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.NONE)
    public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper, boolean delayErrors, int maxConcurrency) {
        // bufferSize是指数据缓冲区的大小,与背压(Backpressure)有关
        return flatMap(mapper, delayErrors, maxConcurrency, bufferSize());
    }
    
    @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.NONE)
    public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper, boolean delayErrors, int maxConcurrency, int bufferSize) {
        ObjectHelper.requireNonNull(mapper, "mapper is null");
        ObjectHelper.verifyPositive(maxConcurrency, "maxConcurrency");
        ObjectHelper.verifyPositive(bufferSize, "bufferSize");
        // 这里有个判断,判断this是不是ScalarCallable的实现类,详细解释请看下面
        if (this instanceof ScalarCallable) {
            @SuppressWarnings("unchecked")
            T v = ((ScalarCallable<T>)this).call();
            if (v == null) {
                return empty();
            }
            return ObservableScalarXMap.scalarXMap(v, mapper);
        }
        // 如果不是ScalarCallable的实现类就会调用以下方法
        return RxJavaPlugins.onAssembly(new ObservableFlatMap<T, R>(this, mapper, delayErrors, maxConcurrency, bufferSize));
    }
    

    bufferSize()是数据缓冲区的大小,默认是128,可从以下代码得知:

    // Observable.java
    public static int bufferSize() {
        return Flowable.bufferSize();
    }
    
    // Flowable.java
    public static int bufferSize() {
        return BUFFER_SIZE;
    }
    
    // Flowable.java
    /** The default buffer size. */
    static final int BUFFER_SIZE;
    static {
        BUFFER_SIZE = Math.max(1, Integer.getInteger("rx2.buffer-size", 128));
    }
    

    ScalarCallable是一个接口,它的实现类有6个FlowableEmptyFlowableJustMaybeEmptyMaybeJustObservableEmptyObservableJust,分别对应这6个方法:Flowable.empty()Flowable.just(T item)Maybe.empty()Maybe.just(T item)Observable.empty()Observable.just(T item)

    根据前几篇文章的经验可知,我们只要看ObservableFlatMap这个类就行了,代码如下:

    // ObservableFlatMap.java
    public ObservableFlatMap(ObservableSource<T> source,
            Function<? super T, ? extends ObservableSource<? extends U>> mapper,
            boolean delayErrors, int maxConcurrency, int bufferSize) {
        super(source);
        this.mapper = mapper;
        this.delayErrors = delayErrors;
        this.maxConcurrency = maxConcurrency;
        this.bufferSize = bufferSize;
    }
    
    @Override
    public void subscribeActual(Observer<? super U> t) {
    
        if (ObservableScalarXMap.tryScalarXMapSubscribe(source, t, mapper)) {
            return;
        }
    
        source.subscribe(new MergeObserver<T, U>(t, mapper, delayErrors, maxConcurrency, bufferSize));
    }
    

    我们也像前几篇文章那样,先看下subscribeActual方法,这里会先调用ObservableScalarXMap.tryScalarXMapSubscribe方法,如果是true的话就return,这个方法中会判断source是不是Callable的实现类,如果是的话就会委托ObservableScalarXMap来发射事件,然后返回true,否则返回false,上面说的ScalarCallable接口就是继承Callable接口,所以我们主要是看下面的逻辑,调用了sourcesubscribe方法,并且传入new出来的MergeObserver,我们来看下MergeObserver这个类,要注意的点我都写上注释了,代码如下:

    // ObservableFlatMap.java
    // MergeObserver继承AtomicInteger
    static final class MergeObserver<T, U> extends AtomicInteger implements Disposable, Observer<T> {
    
        private static final long serialVersionUID = -2117620485640801370L;
    
        final Observer<? super U> downstream;
        final Function<? super T, ? extends ObservableSource<? extends U>> mapper;
        final boolean delayErrors;
        final int maxConcurrency;
        final int bufferSize;
    
        volatile SimplePlainQueue<U> queue;
    
        volatile boolean done;
    
        final AtomicThrowable errors = new AtomicThrowable();
    
        volatile boolean cancelled;
    
        // 存放InnerObserver的数组
        final AtomicReference<InnerObserver<?, ?>[]> observers;
    
        static final InnerObserver<?, ?>[] EMPTY = new InnerObserver<?, ?>[0];
    
        static final InnerObserver<?, ?>[] CANCELLED = new InnerObserver<?, ?>[0];
    
        Disposable upstream;
    
        long uniqueId;
        long lastId;
        int lastIndex;
    
        Queue<ObservableSource<? extends U>> sources;
    
        int wip;
    
        MergeObserver(Observer<? super U> actual, Function<? super T, ? extends ObservableSource<? extends U>> mapper,
                boolean delayErrors, int maxConcurrency, int bufferSize) {
            this.downstream = actual;
            // mapper是Function接口的实现类
            this.mapper = mapper;
            this.delayErrors = delayErrors;
            this.maxConcurrency = maxConcurrency;
            this.bufferSize = bufferSize;
            // 根据上面的代码可知,传入的Integer.MAX_VALUE,所以这段逻辑不会
            if (maxConcurrency != Integer.MAX_VALUE) {
                sources = new ArrayDeque<ObservableSource<? extends U>>(maxConcurrency);
            }
            // 创建一个InnerObserver数组的原子引用
            this.observers = new AtomicReference<InnerObserver<?, ?>[]>(EMPTY);
        }
    
        @Override
        public void onSubscribe(Disposable d) {
            if (DisposableHelper.validate(this.upstream, d)) {
                this.upstream = d;
                downstream.onSubscribe(this);
            }
        }
    
        @Override
        public void onNext(T t) {
            // safeguard against misbehaving sources
            if (done) {
                return;
            }
            ObservableSource<? extends U> p;
            try {
                // 调用mapper的apply方法
                p = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper returned a null ObservableSource");
            } catch (Throwable e) {
                Exceptions.throwIfFatal(e);
                upstream.dispose();
                onError(e);
                return;
            }
    
            // 在上面也分析过了,传入的Integer.MAX_VALUE,所以这段逻辑不会执行
            if (maxConcurrency != Integer.MAX_VALUE) {
                synchronized (this) {
                    if (wip == maxConcurrency) {
                        sources.offer(p);
                        return;
                    }
                    wip++;
                }
            }
    
            subscribeInner(p);
        }
    
        @SuppressWarnings("unchecked")
        void subscribeInner(ObservableSource<? extends U> p) {
            // 一个死循环
            for (;;) {
                // 判断p是不是Callable接口的实现类,上面分析过,这里不再赘述
                if (p instanceof Callable) {
                    if (tryEmitScalar(((Callable<? extends U>)p)) && maxConcurrency != Integer.MAX_VALUE) {
                        boolean empty = false;
                        synchronized (this) {
                            p = sources.poll();
                            if (p == null) {
                                wip--;
                                empty = true;
                            }
                        }
                        if (empty) {
                            drain();
                            break;
                        }
                    } else {
                        break;
                    }
                } else {
                    // 如果p不是Callable接口的实现类,创建InnerObserver
                    InnerObserver<T, U> inner = new InnerObserver<T, U>(this, uniqueId++);
                    // 调用addInner方法,将InnerObserver存放到observers数组中,下面会解析
                    if (addInner(inner)) {
                        // 对每次创建的InnerObserver进行订阅
                        p.subscribe(inner);
                    }
                    break;
                }
            }
        }
    
        boolean addInner(InnerObserver<T, U> inner) {
            // 又是一个死循环
            for (;;) {
                // 从observers数组取出InnerObserver
                InnerObserver<?, ?>[] a = observers.get();
                if (a == CANCELLED) {
                    // 如果是CANCELLED状态的就取消订阅
                    inner.dispose();
                    return false;
                }
                int n = a.length;
                // 创建新的InnerObserver数组,大小为a数组大小加1
                InnerObserver<?, ?>[] b = new InnerObserver[n + 1];
                // 将a数组数据复制到b数组
                System.arraycopy(a, 0, b, 0, n);
                // 将新建的InnerObserver放到b数组最后的位置
                b[n] = inner;
                // 将b数组数据原子性地更新到a数组中
                if (observers.compareAndSet(a, b)) {
                    // 如果成功就返回true
                    return true;
                }
            }
        }
    
        // 移除InnerObserver的方法
        void removeInner(InnerObserver<T, U> inner) {
            for (;;) {
                InnerObserver<?, ?>[] a = observers.get();
                int n = a.length;
                if (n == 0) {
                    return;
                }
                int j = -1;
                for (int i = 0; i < n; i++) {
                    if (a[i] == inner) {
                        j = i;
                        break;
                    }
                }
                if (j < 0) {
                    return;
                }
                InnerObserver<?, ?>[] b;
                if (n == 1) {
                    b = EMPTY;
                } else {
                    b = new InnerObserver<?, ?>[n - 1];
                    System.arraycopy(a, 0, b, 0, j);
                    System.arraycopy(a, j + 1, b, j, n - j - 1);
                }
                if (observers.compareAndSet(a, b)) {
                    return;
                }
            }
        }
    
        boolean tryEmitScalar(Callable<? extends U> value) {
            U u;
            try {
                u = value.call();
            } catch (Throwable ex) {
                Exceptions.throwIfFatal(ex);
                errors.addThrowable(ex);
                drain();
                return true;
            }
    
            if (u == null) {
                return true;
            }
    
            if (get() == 0 && compareAndSet(0, 1)) {
                downstream.onNext(u);
                if (decrementAndGet() == 0) {
                    return true;
                }
            } else {
                SimplePlainQueue<U> q = queue;
                if (q == null) {
                    if (maxConcurrency == Integer.MAX_VALUE) {
                        q = new SpscLinkedArrayQueue<U>(bufferSize);
                    } else {
                        q = new SpscArrayQueue<U>(maxConcurrency);
                    }
                    queue = q;
                }
    
                if (!q.offer(u)) {
                    onError(new IllegalStateException("Scalar queue full?!"));
                    return true;
                }
                if (getAndIncrement() != 0) {
                    return false;
                }
            }
            drainLoop();
            return true;
        }
    
        void tryEmit(U value, InnerObserver<T, U> inner) {
            // 判断get()是不是等于0,如果等于0就将值设为1
            if (get() == 0 && compareAndSet(0, 1)) {
                // 调用下游的onNext方法
                downstream.onNext(value);
                // 发射完数据后,判断自减1后的值是不是等于0,如果等于0,证明所有数据发射完成,方法结束
                if (decrementAndGet() == 0) {
                    return;
                }
            } else {
                SimpleQueue<U> q = inner.queue;
                if (q == null) {
                    // 创建SpscLinkedArrayQueue队列,它是一个单生产、单消费的数组队列,它可以在消费者变慢的情况下分配新的数组
                    q = new SpscLinkedArrayQueue<U>(bufferSize);
                    inner.queue = q;
                }
                // 将接收的上游数据缓存到队列中
                q.offer(value);
                // 判断值是不是不等于0后自增1,如果不等于0就结束方法
                if (getAndIncrement() != 0) {
                    return;
                }
            }
            // 调用drainLoop方法
            drainLoop();
        }
    
        @Override
        public void onError(Throwable t) {
            if (done) {
                RxJavaPlugins.onError(t);
                return;
            }
            if (errors.addThrowable(t)) {
                done = true;
                drain();
            } else {
                RxJavaPlugins.onError(t);
            }
        }
    
        @Override
        public void onComplete() {
            if (done) {
                return;
            }
            done = true;
            drain();
        }
    
        @Override
        public void dispose() {
            if (!cancelled) {
                cancelled = true;
                if (disposeAll()) {
                    Throwable ex = errors.terminate();
                    if (ex != null && ex != ExceptionHelper.TERMINATED) {
                        RxJavaPlugins.onError(ex);
                    }
                }
            }
        }
    
        @Override
        public boolean isDisposed() {
            return cancelled;
        }
    
        void drain() {
            if (getAndIncrement() == 0) {
                drainLoop();
            }
        }
    
        void drainLoop() {
            final Observer<? super U> child = this.downstream;
            int missed = 1;
            for (;;) {
                // 检查订阅是不是被终止,如果是,方法结束
                if (checkTerminate()) {
                    return;
                }
                // 将MergeObserver内的变量queue复制给svq,queue是一个队列
                SimplePlainQueue<U> svq = queue;
    
                if (svq != null) {
                    for (;;) {
                        // 再次检查订阅是不是被终止,如果是,方法结束
                        if (checkTerminate()) {
                            return;
                        }
    
                        // 从队列中取出数据
                        U o = svq.poll();
    
                        // 如果是null的话,跳出该循环
                        if (o == null) {
                            break;
                        }
    
                        // 调用下游Observer的onNext方法,发射数据
                        child.onNext(o);
                    }
                }
    
                boolean d = done;
                svq = queue;
                InnerObserver<?, ?>[] inner = observers.get();
                int n = inner.length;
    
                int nSources = 0;
                if (maxConcurrency != Integer.MAX_VALUE) {
                    synchronized (this) {
                        nSources = sources.size();
                    }
                }
    
                if (d && (svq == null || svq.isEmpty()) && n == 0 && nSources == 0) {
                    Throwable ex = errors.terminate();
                    if (ex != ExceptionHelper.TERMINATED) {
                        // 判断Throwable是不是null
                        if (ex == null) {
                            // 调用下游Observer的onComplete方法
                            child.onComplete();
                        } else {
                            // 调用下游Observer的onError方法
                            child.onError(ex);
                        }
                    }
                    return;
                }
    
                // 处理数组数据
                int innerCompleted = 0;
                if (n != 0) {
                    long startId = lastId;
                    int index = lastIndex;
    
                    if (n <= index || inner[index].id != startId) {
                        if (n <= index) {
                            index = 0;
                        }
                        int j = index;
                        for (int i = 0; i < n; i++) {
                            if (inner[j].id == startId) {
                                break;
                            }
                            j++;
                            if (j == n) {
                                j = 0;
                            }
                        }
                        index = j;
                        lastIndex = j;
                        lastId = inner[j].id;
                    }
    
                    int j = index;
                    sourceLoop:
                    for (int i = 0; i < n; i++) {
                        if (checkTerminate()) {
                            return;
                        }
    
                        @SuppressWarnings("unchecked")
                        InnerObserver<T, U> is = (InnerObserver<T, U>)inner[j];
                        SimpleQueue<U> q = is.queue;
                        if (q != null) {
                            // 处理InnerObserver数组中的每一个InnerObserver对象
                            for (;;) {
                                U o;
                                try {
                                    o = q.poll();
                                } catch (Throwable ex) {
                                    Exceptions.throwIfFatal(ex);
                                    is.dispose();
                                    errors.addThrowable(ex);
                                    if (checkTerminate()) {
                                        return;
                                    }
                                    removeInner(is);
                                    innerCompleted++;
                                    j++;
                                    if (j == n) {
                                        j = 0;
                                    }
                                    continue sourceLoop;
                                }
                                if (o == null) {
                                    break;
                                }
    
                                // 调用onNext方法,发射InnerObserver的数据
                                child.onNext(o);
    
                                // 检查订阅是不是被终止,如果是,方法结束
                                if (checkTerminate()) {
                                    return;
                                }
                            }
                        }
    
                        boolean innerDone = is.done;
                        SimpleQueue<U> innerQueue = is.queue;
                        // 检查队列里的数据是否处理完毕
                        if (innerDone && (innerQueue == null || innerQueue.isEmpty())) {
                            // 如果是,将对应的InnerObserver从数组中移除
                            removeInner(is);
                            // 检查订阅是不是被终止,如果是,方法结束
                            if (checkTerminate()) {
                                return;
                            }
                            // innerCompleted自增
                            innerCompleted++;
                        }
    
                        j++;
                        if (j == n) {
                            j = 0;
                        }
                    }
                    lastIndex = j;
                    lastId = inner[j].id;
                }
    
                // 判断innerCompleted是不是不等于0,也就是判断当前InnerObserver是否处理完毕
                if (innerCompleted != 0) {
                    if (maxConcurrency != Integer.MAX_VALUE) {
                        while (innerCompleted-- != 0) {
                            ObservableSource<? extends U> p;
                            synchronized (this) {
                                p = sources.poll();
                                if (p == null) {
                                    wip--;
                                    continue;
                                }
                            }
                            subscribeInner(p);
                        }
                    }
                    // 结束当前当前循环,进入下一个循环,继续处理下一个InnerObserver
                    continue;
                }
                // 数据发射完毕后,将值自减
                missed = addAndGet(-missed);
                // 如果missed等于0,证明队列中的所有数据全部发射完毕,跳出循环,方法结束
                if (missed == 0) {
                    break;
                }
            }
        }
    
        // 检查订阅是不是被终止的方法
        boolean checkTerminate() {
            if (cancelled) {
                return true;
            }
            Throwable e = errors.get();
            if (!delayErrors && (e != null)) {
                disposeAll();
                e = errors.terminate();
                if (e != ExceptionHelper.TERMINATED) {
                    downstream.onError(e);
                }
                return true;
            }
            return false;
        }
    
        boolean disposeAll() {
            upstream.dispose();
            InnerObserver<?, ?>[] a = observers.get();
            if (a != CANCELLED) {
                a = observers.getAndSet(CANCELLED);
                if (a != CANCELLED) {
                    for (InnerObserver<?, ?> inner : a) {
                        inner.dispose();
                    }
                    return true;
                }
            }
            return false;
        }
    }
    
    // InnerObserver继承AtomicReference
    static final class InnerObserver<T, U> extends AtomicReference<Disposable>
    implements Observer<U> {
    
        private static final long serialVersionUID = -4606175640614850599L;
        final long id;
        final MergeObserver<T, U> parent;
    
        volatile boolean done;
        volatile SimpleQueue<U> queue;
    
        int fusionMode;
    
        InnerObserver(MergeObserver<T, U> parent, long id) {
            this.id = id;
            this.parent = parent;
        }
    
        @Override
        public void onSubscribe(Disposable d) {
            if (DisposableHelper.setOnce(this, d)) {
                if (d instanceof QueueDisposable) {
                    @SuppressWarnings("unchecked")
                    QueueDisposable<U> qd = (QueueDisposable<U>) d;
    
                    // requestFusion和背压(Backpressure)有关,因为我们这里没用到相关的类,所以fusionMode的值为0
                    int m = qd.requestFusion(QueueDisposable.ANY | QueueDisposable.BOUNDARY);
                    // 订阅类型是同步
                    if (m == QueueDisposable.SYNC) {
                        fusionMode = m;
                        queue = qd;
                        done = true;
                        // 发射数据
                        parent.drain();
                        return;
                    }
                    // 订阅类型是异步
                    if (m == QueueDisposable.ASYNC) {
                        fusionMode = m;
                        queue = qd;
                    }
                }
            }
        }
    
        @Override
        public void onNext(U t) {
            // 根据上面分析可知,fusionMode的值为0,所以等于QueueDisposable.NONE
            if (fusionMode == QueueDisposable.NONE) {
                // 调用tryEmit方法
                parent.tryEmit(t, this);
            } else {
                parent.drain();
            }
        }
    
        @Override
        public void onError(Throwable t) {
            if (parent.errors.addThrowable(t)) {
                if (!parent.delayErrors) {
                    parent.disposeAll();
                }
                done = true;
                parent.drain();
            } else {
                RxJavaPlugins.onError(t);
            }
        }
    
        @Override
        public void onComplete() {
            done = true;
            parent.drain();
        }
    
        public void dispose() {
            DisposableHelper.dispose(this);
        }
    }
    

    ConcatMap

    ConcatMap操作符可以将一个发射数据的Observable转变为多个Observables,然后将这些发射的数据合并进一个单独的Observable,发射的数据保证有序

    我们先写段示例代码,为了方便理解,在调用ConcatMap方法的时候,我就不用上Lambda链式调用了,代码如下:

    Observable.create((ObservableOnSubscribe<String>) emitter -> {
        emitter.onNext("Tan:");
        emitter.onNext("Jia:");
        emitter.onNext("Jun:");
        emitter.onComplete();
    })
            .concatMap(new Function<String, ObservableSource<String>>() {
                @Override
                public ObservableSource<String> apply(String s) {
                    List<String> list = new ArrayList<>();
    
                    for (int i = 0; i < 3; i++) {
                        list.add(s + i);
                    }
                    return Observable.fromIterable(list);
                }
            })
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Observer<String>() {
                @Override
                public void onSubscribe(Disposable d) {
                    // no implementation
                }
    
                @Override
                public void onNext(String s) {
                    Log.i("TanJiaJun", s);
                }
    
                @Override
                public void onError(Throwable e) {
                    // no implementation
                }
    
                @Override
                public void onComplete() {
                    // no implementation
                }
            });
    

    Log如下:


    ConcatMapLog.png

    源码分析

    我们看下ConcatMap方法,分析可知,会依次调用以下方法,代码如下:

    @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.NONE)
    public final <R> Observable<R> concatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper) {
        return concatMap(mapper, 2);
    }
    
    @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.NONE)
    public final <R> Observable<R> concatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper, int prefetch) {
        ObjectHelper.requireNonNull(mapper, "mapper is null");
        ObjectHelper.verifyPositive(prefetch, "prefetch");
        // 判断this是不是ScalarCallable,上面分析过了,这里不再赘述
        if (this instanceof ScalarCallable) {
            @SuppressWarnings("unchecked")
            T v = ((ScalarCallable<T>)this).call();
            if (v == null) {
                return empty();
            }
            return ObservableScalarXMap.scalarXMap(v, mapper);
        }
        // 最后一个参数delayErrors传入的是ErrorMode.IMMEDIATE
        return RxJavaPlugins.onAssembly(new ObservableConcatMap<T, R>(this, mapper, prefetch, ErrorMode.IMMEDIATE));
    }
    

    根据前几篇文章的经验可知,我们只要看ObservableConcatMap这个类就行了,代码如下:

    public ObservableConcatMap(ObservableSource<T> source, Function<? super T, ? extends ObservableSource<? extends U>> mapper,
            int bufferSize, ErrorMode delayErrors) {
        super(source);
        this.mapper = mapper;
        this.delayErrors = delayErrors;
        this.bufferSize = Math.max(8, bufferSize);
    }
    
    @Override
    public void subscribeActual(Observer<? super U> observer) {
    
        if (ObservableScalarXMap.tryScalarXMapSubscribe(source, observer, mapper)) {
            return;
        }
    
        // 这里delayErrors传入的是ErrorMode.IMMEDIATE
        if (delayErrors == ErrorMode.IMMEDIATE) {
            // 对observer进行序列化
            SerializedObserver<U> serial = new SerializedObserver<U>(observer);
            // 调用订阅方法,并且传入new出来的SourceObserver
            source.subscribe(new SourceObserver<T, U>(serial, mapper, bufferSize));
        } else {
            source.subscribe(new ConcatMapDelayErrorObserver<T, U>(observer, mapper, bufferSize, delayErrors == ErrorMode.END));
        }
    }
    

    我们看下SourceObserver这个类,有些源码的逻辑和FlatMap比较相似,这里就不再赘述了,代码如下:

    // ObservableConcatMap.java
    static final class SourceObserver<T, U> extends AtomicInteger implements Observer<T>, Disposable {
    
        private static final long serialVersionUID = 8828587559905699186L;
        final Observer<? super U> downstream;
        final Function<? super T, ? extends ObservableSource<? extends U>> mapper;
        final InnerObserver<U> inner;
        final int bufferSize;
    
        SimpleQueue<T> queue;
    
        Disposable upstream;
    
        volatile boolean active;
    
        volatile boolean disposed;
    
        volatile boolean done;
    
        int fusionMode;
    
        SourceObserver(Observer<? super U> actual,
                                Function<? super T, ? extends ObservableSource<? extends U>> mapper, int bufferSize) {
            this.downstream = actual;
            this.mapper = mapper;
            this.bufferSize = bufferSize;
            this.inner = new InnerObserver<U>(actual, this);
        }
    
        @Override
        public void onSubscribe(Disposable d) {
            if (DisposableHelper.validate(this.upstream, d)) {
                this.upstream = d;
                if (d instanceof QueueDisposable) {
                    @SuppressWarnings("unchecked")
                    QueueDisposable<T> qd = (QueueDisposable<T>) d;
    
                    // requestFusion和背压(Backpressure)有关,因为我们这里没用到相关的类,所以fusionMode的值为0
                    int m = qd.requestFusion(QueueDisposable.ANY);
                    // 订阅关系是同步
                    if (m == QueueDisposable.SYNC) {
                        fusionMode = m;
                        queue = qd;
                        done = true;
    
                        downstream.onSubscribe(this);
    
                        drain();
                        return;
                    }
    
                    // 订阅关系是异步
                    if (m == QueueDisposable.ASYNC) {
                        fusionMode = m;
                        queue = qd;
    
                        downstream.onSubscribe(this);
    
                        return;
                    }
                }
    
                // 创建一个大小为数据缓冲区大小的队列
                queue = new SpscLinkedArrayQueue<T>(bufferSize);
    
                // 调用下游Observer的onSubscribe方法
                downstream.onSubscribe(this);
            }
        }
    
        @Override
        public void onNext(T t) {
            if (done) {
                return;
            }
            // 根据上面分析可知,fusionMode的值为0,所以等于QueueDisposable.NONE
            if (fusionMode == QueueDisposable.NONE) {
                // 将接收的上游数据缓存到队列中
                queue.offer(t);
            }
            // 调用drain方法
            drain();
        }
    
        @Override
        public void onError(Throwable t) {
            if (done) {
                RxJavaPlugins.onError(t);
                return;
            }
            done = true;
            dispose();
            downstream.onError(t);
        }
    
        @Override
        public void onComplete() {
            if (done) {
                return;
            }
            done = true;
            drain();
        }
    
        void innerComplete() {
            active = false;
            drain();
        }
    
        @Override
        public boolean isDisposed() {
            return disposed;
        }
    
        @Override
        public void dispose() {
            disposed = true;
            inner.dispose();
            upstream.dispose();
    
            if (getAndIncrement() == 0) {
                queue.clear();
            }
        }
    
        void drain() {
            // 判断值是不是不等于0后自增1,如果不等于0就结束方法
            if (getAndIncrement() != 0) {
                return;
            }
    
            for (;;) {
                // 判断是不是结束订阅
                if (disposed) {
                    // 如果是,就清空队列,结束方法
                    queue.clear();
                    return;
                }
                // active是用volatile修饰,active是用来判断当前是否还有InnerObserver在发射,所以能保证发射InnerObserver是有序的,这点和FlatMap不一样
                if (!active) {
    
                    boolean d = done;
    
                    T t;
    
                    try {
                        // 从队列取出数据
                        t = queue.poll();
                    } catch (Throwable ex) {
                        Exceptions.throwIfFatal(ex);
                        dispose();
                        queue.clear();
                        downstream.onError(ex);
                        return;
                    }
    
                    boolean empty = t == null;
    
                    // 判断是否发射完毕,同时队列是否还有数据
                    if (d && empty) {
                        // 如果发射完毕,同时队列是没有数据的话,结束订阅,调用下游Observer的onComplete方法
                        disposed = true;
                        downstream.onComplete();
                        return;
                    }
    
                    // 再次判断队列是否还有数据
                    if (!empty) {
                        // 如果队列还有数据,执行以下逻辑
                        ObservableSource<? extends U> o;
    
                        try {
                            // 调用mapper的apply方法
                            o = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper returned a null ObservableSource");
                        } catch (Throwable ex) {
                            Exceptions.throwIfFatal(ex);
                            dispose();
                            queue.clear();
                            downstream.onError(ex);
                            return;
                        }
    
                        // active设为true,表示当前还在发射数据,其他任务就进入不了上面所说的判断了
                        active = true;
                        // 调用下游Observer的订阅方法
                        o.subscribe(inner);
                    }
                }
    
                // 发射完数据后,判断自减1后的值是不是等于0,如果等于0,证明所有数据发射完成,方法结束
                if (decrementAndGet() == 0) {
                    break;
                }
            }
        }
    
        // InnerObserver继承AtomicReference
        static final class InnerObserver<U> extends AtomicReference<Disposable> implements Observer<U> {
    
            private static final long serialVersionUID = -7449079488798789337L;
    
            final Observer<? super U> downstream;
            final SourceObserver<?, ?> parent;
    
            InnerObserver(Observer<? super U> actual, SourceObserver<?, ?> parent) {
                this.downstream = actual;
                this.parent = parent;
            }
    
            @Override
            public void onSubscribe(Disposable d) {
                DisposableHelper.replace(this, d);
            }
    
            @Override
            public void onNext(U t) {
                // 调用下游Observer的onNext方法
                downstream.onNext(t);
            }
    
            @Override
            public void onError(Throwable t) {
                parent.dispose();
                downstream.onError(t);
            }
    
            @Override
            public void onComplete() {
                parent.innerComplete();
            }
    
            void dispose() {
                DisposableHelper.dispose(this);
            }
        }
    }
    

    FlatMap和ConcatMap对比

    在做对比之前,我改下上面的两段示例代码,都调用delay方法,延迟1s发射,代码如下:

    FlatMap:

    Observable.create((ObservableOnSubscribe<String>) emitter -> {
        emitter.onNext("Tan:");
        emitter.onNext("Jia:");
        emitter.onNext("Jun:");
        emitter.onComplete();
    })
            .flatMap(new Function<String, ObservableSource<String>>() {
                @Override
                public ObservableSource<String> apply(String s) {
                    List<String> list = new ArrayList<>();
    
                    for (int i = 0; i < 3; i++) {
                        list.add(s + i);
                    }
                    return Observable.fromIterable(list).delay(1, TimeUnit.SECONDS);
                }
            })
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Observer<String>() {
                @Override
                public void onSubscribe(Disposable d) {
                    // no implementation
                }
    
                @Override
                public void onNext(String s) {
                    Log.i("TanJiaJun", s);
                }
    
                @Override
                public void onError(Throwable e) {
                    // no implementation
                }
    
                @Override
                public void onComplete() {
                    // no implementation
                }
            });
    

    Log如下:


    FlatMapDelayLog.png

    ConcatMap:

    Observable.create((ObservableOnSubscribe<String>) emitter -> {
        emitter.onNext("Tan:");
        emitter.onNext("Jia:");
        emitter.onNext("Jun:");
        emitter.onComplete();
    })
            .concatMap(new Function<String, ObservableSource<String>>() {
                @Override
                public ObservableSource<String> apply(String s) {
                    List<String> list = new ArrayList<>();
    
                    for (int i = 0; i < 3; i++) {
                        list.add(s + i);
                    }
                    return Observable.fromIterable(list).delay(1, TimeUnit.SECONDS);
                }
            })
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Observer<String>() {
                @Override
                public void onSubscribe(Disposable d) {
                    // no implementation
                }
    
                @Override
                public void onNext(String s) {
                    Log.i("TanJiaJun", s);
                }
    
                @Override
                public void onError(Throwable e) {
                    // no implementation
                }
    
                @Override
                public void onComplete() {
                    // no implementation
                }
            });
    

    Log如下:


    ConcatMapDelay.png

    我这里发射了3组数据,需要注意的是,我们发现FlatMap3组数据都是不按顺序的,但是每组数据里发射的数据都是按顺序的ConcatMap3组数据都是按顺序的,而且每组数据里发射的数据也是按顺序的,那为什么这样呢?其实上面阅读源码的时候也稍微提及了下,这里再详细解释下,因为FlatMap对应的MergeObserverConcatMap对应的SourceObserver都继承了AtomicInteger,在解释这个类前,先说下几个概念。

    volatile

    volatile的语义:

    1. volatile修饰的变量的操作不保证是原子性的。
    2. Java内存模型不会对volatile指令进行重排序优化,可以保证对volatile变量的操作是按照指令的顺序执行。
    3. volatile修饰的变量能保证对所有线程的可见性,每次修改值都会立刻同步回主内存,每次读取值都会从主内存中重新读取。

    指令重排序

    处理器通过缓存能够从数量级上降低内存延迟的成本,因为对主存的一次访问需要花费硬件多次的时钟周期,而这些缓存为了性能会重新排列待定内存操作的顺序,也就是重排序,这里有个前提,Java内存模型(Java Memory Model)通过先行发生原则(happen-before)保证顺序执行语义,对一个volatile变量的写操作先行发生于后面对这个变量的读操作,这里的先后指的是时间上的顺序,在这里举个例子:

    Object object = new Object();
    

    这条语句会转成多条汇编指令,大致做了以下三件事情

    1. Object类实例分配内存空间。
    2. 初始化Object对象
    3. object变量指向刚分配的内存,这时候object变量就不是null了。

    因为Java编译器允许指令重排序对其优化,上面这3个步骤可能1->2->3或者是1->3->2,但是步骤1肯定是第一个执行的,因为做指令重排序有个前提,就是必须遵循先行发生原则,保证最后是正确的执行结果,执行步骤2步骤3的前提是步骤1,必须为实例分配内存空间才能去初始化对象或者将变量指向分配的内存,步骤2步骤3之间不存在依赖关系,所以步骤2步骤3经过Java编译器优化后执行顺序不确定。

    单线程这样的优化是没有问题的,但是在多线程就会有问题了,这里我举个例子,我们会使用双重检查锁定(Double Check Locking,简称DCL)来实现单例,它是懒汉模式,代码如下:

    package com.tanjiajun.rxjavademo;
    
    /**
     * Created by TanJiaJun on 2019-11-14.
     */
    public class Singleton {
    
        // mInstance用volatile修饰,保证指令执行的顺序
        private static volatile Singleton mInstance;
    
        // 私有构造函数
        private Singleton() {
            // 防止通过反射调用构造函数造成单例失效
            if (mInstance != null) {
                throw new RuntimeException("Cannot construct a singleton more than once.");
            }
        }
    
        // 获取单例的方法
        public static Singleton getInstance() {
            // 第一次判断mInstance是否为null,判断是否需要同步,提高性能和效率
            if (mInstance == null) {
                synchronized (Singleton.class) {
                    // 第二次判断mInstance是否为null,判断是否已经创建实例
                    if (mInstance == null) {
                        mInstance = new Singleton();
                    }
                }
            }
            // 返回mInstance
            return mInstance;
        }
    
    }
    

    创建实例的这条语句会转成多条汇编指令,大概做了如下3件事情

    1. Singleton类实例分配内存空间。
    2. 初始化Singleton对象
    3. mInstance变量指向刚分配的内存,这时候mInstance变量就不是null了。

    如果我们不用volatile修饰,也不加同步锁的话,假设有两个线程,分别是AB,如果线程A创建实例步骤是1->3->2,当它执行步骤3的时候,这时候mInstance变量已经不是null了,线程B也执行getInstance方法,进入第一个判断,因为mInstance变量已经不是null了,所以就会创建另外一个实例了,造成单例失效

    CAS操作

    CAS(Compare And Swap),翻译过来就是比较和交换,它可以防止共享变量出现脏读脏写问题,保证了原子操作CAS也是乐观锁的一种实现方式,乐观锁是什么呢?乐观锁总是假设最好的情况,所以在数据进行提交更新的时候才会去检查是否有冲突。

    AtomicInteger

    我们看下AtomicInteger的源码,代码如下:

    public class AtomicInteger extends Number implements java.io.Serializable {
        private static final long serialVersionUID = 6214790243416807050L;
    
        // 用到sun.misc.Unsafe
        private static final sun.misc.Unsafe U = sun.misc.Unsafe.getUnsafe();
        private static final long VALUE;
    
        static {
            try {
                // VALUE是内存偏移值
                VALUE = U.objectFieldOffset
                    (AtomicInteger.class.getDeclaredField("value"));
            } catch (ReflectiveOperationException e) {
                throw new Error(e);
            }
        }
    
        // 用volatile修饰value,保证指令的执行顺序
        private volatile int value;
    
        public AtomicInteger(int initialValue) {
            value = initialValue;
        }
    
        public AtomicInteger() {
        }
    
        public final int get() {
            return value;
        }
    
        public final void set(int newValue) {
            value = newValue;
        }
    
        public final void lazySet(int newValue) {
            U.putOrderedInt(this, VALUE, newValue);
        }
    
        public final int getAndSet(int newValue) {
            return U.getAndSetInt(this, VALUE, newValue);
        }
    
        // 主要看这个方法,它就是CAS操作,我会在下面解析
        public final boolean compareAndSet(int expect, int update) {
            return U.compareAndSwapInt(this, VALUE, expect, update);
        }
    
        public final boolean weakCompareAndSet(int expect, int update) {
            return U.compareAndSwapInt(this, VALUE, expect, update);
        }
    
        public final int getAndIncrement() {
            return U.getAndAddInt(this, VALUE, 1);
        }
    
        public final int getAndDecrement() {
            return U.getAndAddInt(this, VALUE, -1);
        }
    
        public final int getAndAdd(int delta) {
            return U.getAndAddInt(this, VALUE, delta);
        }
    
        public final int incrementAndGet() {
            return U.getAndAddInt(this, VALUE, 1) + 1;
        }
    
        public final int decrementAndGet() {
            return U.getAndAddInt(this, VALUE, -1) - 1;
        }
    
        public final int addAndGet(int delta) {
            return U.getAndAddInt(this, VALUE, delta) + delta;
        }
    
        public final int getAndUpdate(IntUnaryOperator updateFunction) {
            int prev, next;
            do {
                prev = get();
                next = updateFunction.applyAsInt(prev);
            } while (!compareAndSet(prev, next));
            return prev;
        }
    
        public final int updateAndGet(IntUnaryOperator updateFunction) {
            int prev, next;
            do {
                prev = get();
                next = updateFunction.applyAsInt(prev);
            } while (!compareAndSet(prev, next));
            return next;
        }
    
        public final int getAndAccumulate(int x,
                                          IntBinaryOperator accumulatorFunction) {
            int prev, next;
            do {
                prev = get();
                next = accumulatorFunction.applyAsInt(prev, x);
            } while (!compareAndSet(prev, next));
            return prev;
        }
    
        public final int accumulateAndGet(int x,
                                          IntBinaryOperator accumulatorFunction) {
            int prev, next;
            do {
                prev = get();
                next = accumulatorFunction.applyAsInt(prev, x);
            } while (!compareAndSet(prev, next));
            return next;
        }
    
        public String toString() {
            return Integer.toString(get());
        }
    
        public int intValue() {
            return get();
        }
    
        public long longValue() {
            return (long)get();
        }
    
        public float floatValue() {
            return (float)get();
        }
    
        public double doubleValue() {
            return (double)get();
        }
    
    }
    

    compareAndSet方法就是CAS操作,它是调用sun.misc.Unsafe里的compareAndSwapInt方法,这个方法是个native方法,其作用是每次从内存中根据内存偏移量(VALUE)取出的值和expect比较,如果数据一致就把内存中的值改为update

    结论

    因为FlatMap对应的MergeObserverConcatMap对应的SourceObserver都继承了AtomicInteger,根据之前的源码分析,它们两个操作符每组数据里发射数据的操作都是原子操作,因此它们都是按顺序的;不同的是,在ObservableConcatMap的源码中,我们可以看到它用volatile修饰的active布尔值判断当前是否还有InnerObserver在发射,但是在ObservableFlatMap的源码中没看到相关的逻辑,所以FlatMap发射的那几组数据是不按顺序的,ConcatMap发射的那几组数据是按顺序的。

    我的GitHub:TanJiaJunBeyond

    Android通用框架:Android通用框架

    我的掘金:谭嘉俊

    我的简书:谭嘉俊

    我的CSDN:谭嘉俊

    相关文章

      网友评论

          本文标题:RxJava2源码分析——FlatMap和ConcatMap及其

          本文链接:https://www.haomeiwen.com/subject/dmqjictx.html