RxJava2源码解析

作者: 1cf2c90a5564 | 来源:发表于2018-07-25 20:53 被阅读70次

    本文为博主原创文章,未经允许不得转载

    前言

    本文简析 RxJava2subscribeOnzip 操作符。

    术语解释

    Single.just().map().flatMap().subscribeOn().observeOn().subscribe();
    

    上述代码中,Singlesubscribe() 之间的都称为 操作符,想像一下自己就是其中一个 操作符,那么位于左边的便称为 上游,位于右边的则称为 下游,故上下游其实是相对的。

    一、subscribeOn

    demo

            // Case2: 在非UI线程执行并关注结果
            Single.fromCallable(new Callable<Integer>() {
                @Override
                public Integer call() throws Exception {
                    return generateRandom();
                }
            }).subscribeOn(Schedulers.io()).subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    Logger.d(TAG, "test: accept(Integer integer) invoked on %s", Thread.currentThread().getName());
                }
            }, new Consumer<Throwable>() {
                @Override
                public void accept(Throwable throwable) throws Exception {
                    Logger.d(TAG, "test: accept(Throwable throwable) invoked on %s", Thread.currentThread().getName());
                }
            });
    

    fromCallable(Callable callable) [-> Single.java]

        public static <T> Single<T> fromCallable(final Callable<? extends T> callable) {
            ObjectHelper.requireNonNull(callable, "callable is null");
            // RxJavaPlugins 里是全局钩子函数,分析源码时无视即可,此处就是返回 SingleFromCallable
            return RxJavaPlugins.onAssembly(new SingleFromCallable<T>(callable));
        }
    

    SingleFromCallable [-> SingleFromCallable.java]

    // 注意继承自 Single,而 Single 实现了 SingleSource 接口,所以也继承了 subscribe() 方法
    public final class SingleFromCallable<T> extends Single<T> {
        // 回调函数
        final Callable<? extends T> callable;
    
        public SingleFromCallable(Callable<? extends T> callable) {
            // 保存为全局变量
            this.callable = callable;
        }
    
        @Override
        protected void subscribeActual(SingleObserver<? super T> observer) {
            // 一个 run() 方法体为空的 RunnableDisposable 对象,用来取消订阅
            Disposable d = Disposables.empty();
            // 调用下游(本示例此处为SubscribeOnObserver)的 onSubscribe()
            observer.onSubscribe(d);
    
            // 已取消订阅的,直接返回,不会发射任何值
            if (d.isDisposed()) {
                return;
            }
    
            T value;
            try {
                // 调用 callable.call() 获取值
                value = ObjectHelper.requireNonNull(callable.call(), "The callable returned a null value");
            } catch (Throwable ex) {
                // 捕获所有异常,所以使用 rxjava 时,自己写的方法收不到异常通知,需订阅一个 Consumer<Throwable>
                Exceptions.throwIfFatal(ex);
                if (!d.isDisposed()) {
                    // 发射一个 error 事件给下游
                    observer.onError(ex);
                } else {
                    RxJavaPlugins.onError(ex);
                }
                return;
            }
    
            if (!d.isDisposed()) {
                // 发射一个 success 事件给下游
                observer.onSuccess(value);
            }
        }
    }
    

    subscribeOn(Scheduler scheduler) [-> Single.java]

        public final Single<T> subscribeOn(final Scheduler scheduler) {
            ObjectHelper.requireNonNull(scheduler, "scheduler is null");
            // 返回 SingleSubscribeOn
            return RxJavaPlugins.onAssembly(new SingleSubscribeOn<T>(this, scheduler));
        }
    

    SingleSubscribeOn [-> SingleSubscribeOn.java]

    // 注意继承自 Single,而 Single 实现了 SingleSource 接口,所以也继承了 subscribe() 方法
    public final class SingleSubscribeOn<T> extends Single<T> {
        // 上游
        final SingleSource<? extends T> source;
        // 线程调度器
        final Scheduler scheduler;
    
        public SingleSubscribeOn(SingleSource<? extends T> source, Scheduler scheduler) {
            // 保存上游为 this.source
            this.source = source;
            // 保存线程调度器为 this.scheduler
            this.scheduler = scheduler;
        }
    
        @Override
        protected void subscribeActual(final SingleObserver<? super T> s) {
            // 将下游和上游包装为 SubscribeOnObserver
            final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s, source);
            // 调用下游的 onSubscribe(),此时还没有切换线程,所以 onSubscribe() 是在原线程执行的
            s.onSubscribe(parent);
            // 将 SubscribeOnObserver 扔到线程调度器中执行,此处就是 IoScheduler,内部实现基于 jdk 的 ExecutorService、FutureTask 和 Future
            Disposable f = scheduler.scheduleDirect(parent);
            // 将调度器返回的 Disposable(一个实现了 Disposable 和 Runnable 接口的 DisposeTask) 对象设置给 SubscribeOnObserver 的 task,用来取消订阅、中断线程执行
            parent.task.replace(f);
        }
    
        // 继承自 AtomicReference,实现了 SingleObserver、Disposable、Runnable接口
        // SingleObserver:当作下游
        // Disposable:传给下游以便下游用来取消订阅
        // Runnable:用来提交给 ExecutorService
        static final class SubscribeOnObserver<T> extends AtomicReference<Disposable>
        implements SingleObserver<T>, Disposable, Runnable {
            private static final long serialVersionUID = 7000911171163930287L;
            // 下游
            final SingleObserver<? super T> actual;
            // 用来取消订阅、中断线程执行
            final SequentialDisposable task;
            // 上游
            final SingleSource<? extends T> source;
    
            SubscribeOnObserver(SingleObserver<? super T> actual, SingleSource<? extends T> source) {
                // 将下游保存为 this.actual
                this.actual = actual;
                // 将上游保存为 this.source
                this.source = source;
                // 一个继承自 AtomicReference 实现了 Disposable 接口的对象,用来取消订阅、中断线程执行
                this.task = new SequentialDisposable();
            }
    
            @Override
            public void onSubscribe(Disposable d) {
                // 因为继承自 AtomicRefrence,此处将取消订阅的句柄(本示例中此处为 Disposables.empty())设置给内部的对象引用,用于取消对上游的订阅
                DisposableHelper.setOnce(this, d);
            }
    
            @Override
            public void onSuccess(T value) {
                // 上游调用 observer.onSuccess() 时,会调用到这里,此处继续调用下游的 onSuccess() 将值向下传递
                actual.onSuccess(value);
            }
    
            @Override
            public void onError(Throwable e) {
                // 上游调用 observer.onError() 时,会调用到这里,此处继续调用下游的 onError() 将错误向下传递
                actual.onError(e);
            }
    
            @Override
            public void dispose() {
                // 取消对上游的订阅
                DisposableHelper.dispose(this);
                // 中断线程执行
                task.dispose();
            }
    
            @Override
            public boolean isDisposed() {
                return DisposableHelper.isDisposed(get());
            }
    
            @Override
            public void run() {
                // 在线程池中执行 source.subscribe(),本示例会触发:
                // SingleFromCallable.subscribe()->
                // SingleFromCallable.suscribeActual()->
                // this.onSuccess(callable.call())
                source.subscribe(this);
            }
        }
    }
    

    subscribe() [-> Single.java]

        public final Disposable subscribe(final Consumer<? super T> onSuccess, final Consumer<? super Throwable> onError) {
            ObjectHelper.requireNonNull(onSuccess, "onSuccess is null");
            ObjectHelper.requireNonNull(onError, "onError is null");
    
            // 将 successConsumer、throwableConsumer 包装成 ConsumerSingleObserver,作为观察者
            ConsumerSingleObserver<T> s = new ConsumerSingleObserver<T>(onSuccess, onError);
            // 调用 subscribe(SingleObserver subscriber)
            subscribe(s);
            // ConsumerSingleObserver 实现了 Disposable 接口,持有它,可以用来取消订阅 
            return s;
        }
    

    subscribe(SingleObserver subscriber) [-> Single.java]

        public final void subscribe(SingleObserver<? super T> subscriber) {
            ObjectHelper.requireNonNull(subscriber, "subscriber is null");
            subscriber = RxJavaPlugins.onSubscribe(this, subscriber);
            ObjectHelper.requireNonNull(subscriber, "subscriber returned by the RxJavaPlugins hook is null");
    
            try {
                // 继续调用抽象方法 subscribeActual(SingleObserver subscriber),即调用子类的 subscribeActual(SingleObserver subscriber)
                // 本示例中,此处子类为 SingleSubscribeOn,源码分析见上方
                subscribeActual(subscriber);
            } catch (NullPointerException ex) {
                throw ex;
            } catch (Throwable ex) {
                Exceptions.throwIfFatal(ex);
                NullPointerException npe = new NullPointerException("subscribeActual failed");
                npe.initCause(ex);
                throw npe;
            }
        }
    

    ConsumerSingleObserver [->ConsumerSingleObserver .java]

    public final class ConsumerSingleObserver<T> extends AtomicReference<Disposable>
    implements SingleObserver<T>, Disposable, LambdaConsumerIntrospection {
        private static final long serialVersionUID = -7012088219455310787L;
        // successConsumer
        final Consumer<? super T> onSuccess;
        // throwableConsumer
        final Consumer<? super Throwable> onError;
    
        public ConsumerSingleObserver(Consumer<? super T> onSuccess, Consumer<? super Throwable> onError) {
            // 将 successConsumer 保存为全局变量 this.onSuccess
            this.onSuccess = onSuccess;
            // 将 throwableConsumer 保存为全局变量 this.onError
            this.onError = onError;
        }
    
        @Override
        public void onError(Throwable e) {、
            // 最后设置为已取消订阅
            lazySet(DisposableHelper.DISPOSED);
            try {
                // 回调 throwableConsumer
                onError.accept(e);
            } catch (Throwable ex) {
                Exceptions.throwIfFatal(ex);
                RxJavaPlugins.onError(new CompositeException(e, ex));
            }
        }
    
        @Override
        public void onSubscribe(Disposable d) {
            // 本示例中,d 为 SubscribeOnObserver
            DisposableHelper.setOnce(this, d);
        }
    
        @Override
        public void onSuccess(T value) {
            // 最后设置为已取消订阅
            lazySet(DisposableHelper.DISPOSED);
            try {
                // 回调 successConsumer
                onSuccess.accept(value);
            } catch (Throwable ex) {
                // 异常被 rxjava 捕获,所以自己写的 successConsumer 收不到异常
                Exceptions.throwIfFatal(ex);
                RxJavaPlugins.onError(ex);
            }
        }
    
        @Override
        public void dispose() {
            // 取消订阅
            DisposableHelper.dispose(this);
        }
    
        @Override
        public boolean isDisposed() {
            return get() == DisposableHelper.DISPOSED;
        }
    
        @Override
        public boolean hasCustomOnError() {
            return onError != Functions.ON_ERROR_MISSING;
        }
    }
    

    时序图

    Sequence Diagram

    二、zip

    Demo

            // Case6: 并发读取不同数据源,转换成同类型后,合并
            Single<IBook> novel = Single.fromCallable(new Callable<Novel>() {
                @Override
                public Novel call() throws Exception {
                    return getNovel();
                }
            }).map(new Function<Novel, IBook>() {
                @Override
                public IBook apply(Novel novel) throws Exception {
                    return new NovelAdapter(novel);
                }
            }).subscribeOn(Schedulers.io());
    
            Single<IBook> rxJava2Tutorial = Single.fromCallable(new Callable<RxJava2Tutorial>() {
                @Override
                public RxJava2Tutorial call() throws Exception {
                    return getRxJava2Tutorial();
                }
            }).map(new Function<RxJava2Tutorial, IBook>() {
                @Override
                public IBook apply(RxJava2Tutorial rxJava2Tutorial) throws Exception {
                    return new RxJava2TutorialAdapter(rxJava2Tutorial);
                }
            }).subscribeOn(Schedulers.io());
    
            // 注意此处调用的是合并两个 SingleSource 的方法,zip 操作符的重载方法很多,从 2~9 都有,相应的变换函数也有从 2~9,无语啊~
            Single.zip(novel, rxJava2Tutorial, new BiFunction<IBook, IBook, List<IBook>>() {
                @Override
                public List<IBook> apply(IBook iBook, IBook iBook2) throws Exception {
                    List<IBook> books = new ArrayList<>(2);
                    books.add(iBook);
                    books.add(iBook2);
                    return books;
                }
            }).subscribe(new Consumer<List<IBook>>() {
                @Override
                public void accept(List<IBook> iBooks) throws Exception {
                    Logger.d(TAG, "test: books are " + iBooks);
                }
            }, new Consumer<Throwable>() {
                @Override
                public void accept(Throwable throwable) throws Exception {
                    Logger.d(TAG, "test: get books error.", throwable);
                }
            });
    

    上述代码分别读取 NovelRxJava2Tutorial 两种不同类型书籍,再分别转化为 IBook 类型,然后添加到同一数组中,最后发射给下游。

    zip(SingleSource source1, SingleSource source2, BiFunction zipper) [-> Single.java]

        public static <T1, T2, R> Single<R> zip(
                SingleSource<? extends T1> source1, SingleSource<? extends T2> source2,
                BiFunction<? super T1, ? super T2, ? extends R> zipper
         ) {
            ObjectHelper.requireNonNull(source1, "source1 is null");
            ObjectHelper.requireNonNull(source2, "source2 is null");
            return zipArray(Functions.toFunction(zipper), source1, source2);
        }
    

    toFunction(BiFunction f) [-> Functions.java]

        public static <T1, T2, R> Function<Object[], R> toFunction(final BiFunction<? super T1, ? super T2, ? extends R> f) {
            ObjectHelper.requireNonNull(f, "f is null");
            // 注意因为上面调用的是合并两个 SingleSource 的方法,所以这里调用的就是 Array2Func,2表示合并个数,像这样的还有 Array3Func、Array4Func、... Array9Func
            // 作用就是把多参数的 BiFunction 统一转化为一个参数(Object[])的 Function 对象,调用的时候再把参数从 Object[] 里取出来即可
            return new Array2Func<T1, T2, R>(f);
        }
    

    Array2Func [-> Functions::Array2Func]

        static final class Array2Func<T1, T2, R> implements Function<Object[], R> {
            final BiFunction<? super T1, ? super T2, ? extends R> f;
    
            Array2Func(BiFunction<? super T1, ? super T2, ? extends R> f) {
                this.f = f;
            }
    
            @SuppressWarnings("unchecked")
            @Override
            public R apply(Object[] a) throws Exception {
                if (a.length != 2) {
                    throw new IllegalArgumentException("Array of size 2 expected but got " + a.length);
                }
                // 从 Object[] 中取出实参,然后调用实际的合并函数
                return f.apply((T1)a[0], (T2)a[1]);
            }
        }
    

    zipArray(Function zipper, SingleSource... sources) [-> Single.java]

        public static <T, R> Single<R> zipArray(Function<? super Object[], ? extends R> zipper, SingleSource<? extends T>... sources) {
            ObjectHelper.requireNonNull(zipper, "zipper is null");
            ObjectHelper.requireNonNull(sources, "sources is null");
            if (sources.length == 0) {
                return error(new NoSuchElementException());
            }
            return RxJavaPlugins.onAssembly(new SingleZipArray<T, R>(sources, zipper));
        }
    

    SingleZipArray [-> SingleZipArray.java]

    // 继承自 Single,而 Single 实现了 SingleSource 接口,所以也继承了 subscribe() 方法
    public final class SingleZipArray<T, R> extends Single<R> {
        // 用来保存要合并的 SingleSource
        final SingleSource<? extends T>[] sources;
        // 用来保存合并函数
        final Function<? super Object[], ? extends R> zipper;
    
        public SingleZipArray(SingleSource<? extends T>[] sources, Function<? super Object[], ? extends R> zipper) {
            // 将 SingleSource 保存为 this.sources
            this.sources = sources;
            // 将合并函数保存为 this.zipper
            this.zipper = zipper;
        }
    
        // 通过上面 subscribeOn 的源码分析可知,调用 subscribe() 时,便会调用到上游的 subscribeActual()
        // 此处的 observer 同样也是 ConsumerSingleObserver
        @Override
        protected void subscribeActual(SingleObserver<? super R> observer) {
            SingleSource<? extends T>[] sources = this.sources;
            int n = sources.length;
            // 本示例合并的 SingleSource 个数为 2,即 n=2
            if (n == 1) {
                sources[0].subscribe(new SingleMap.MapSingleObserver<T, R>(observer, new SingletonArrayFunc()));
                return;
            }
    
            // 将 ConsumerSingleObserver、SingleSource个数、合并函数封装为 ZipCoordinator,用来等待所有 
            // SingleSource 都处理完,然后对其发射的值应用合并函数 
            ZipCoordinator<T, R> parent = new ZipCoordinator<T, R>(observer, n, zipper);
            // 调用 ConsumerSingleObserver 的 onSubscribe()
            observer.onSubscribe(parent);
            // 一个 for 循环,挨个调用 SingleSource 的 subscribe(),触发生产者开始生产
            for (int i = 0; i < n; i++) {
                if (parent.isDisposed()) {
                    return;
                }
    
                SingleSource<? extends T> source = sources[i];
    
                if (source == null) {
                    parent.innerError(new NullPointerException("One of the sources is null"), i);
                    return;
                }
    
                source.subscribe(parent.observers[i]);
            }
        }
        ......
    }
    

    ZipCoordinator [-> SingleZipArray::ZipCoordinator]

        // 合并函数协调器,注意继承自 AtomicInteger,以便采用计数法检测是否所有的 SingleSource 都发射完毕
        static final class ZipCoordinator<T, R> extends AtomicInteger implements Disposable {
            private static final long serialVersionUID = -5556924161382950569L;
            // 保存下游观察者,本示例此处为 ConsumerSingleObserver
            final SingleObserver<? super R> actual;
            // 保存合并函数
            final Function<? super Object[], ? extends R> zipper;
            // SingleZipArray 的直接观察者,用来分别接收每个 SingleSource 发射的结果
            // 每收到一个值 (即每回调一次 ZipSingleObserver 的 onSuccess()),计数值-1,直至计数值为0,说明全部发射完毕
            final ZipSingleObserver<T>[] observers;
            // 保存每个 SingleSource 发射的结果
            final Object[] values;
    
            @SuppressWarnings("unchecked")
            ZipCoordinator(SingleObserver<? super R> observer, int n, Function<? super Object[], ? extends R> zipper) {
                // 因为继承自 AtomicInteger,所以调用父类构造器,设置计数值
                super(n);
                // 将下游观察者 ConsumerSingleObserver 保存为 this.actual,用来接收合并后的结果
                this.actual = observer;
                // 将合并函数保存为 this.zipper
                this.zipper = zipper;
                // 根据 SingleSource 的个数,生成相应个数的 SingleObserver,然后保存为 this.observers
                ZipSingleObserver<T>[] o = new ZipSingleObserver[n];
                for (int i = 0; i < n; i++) {
                    o[i] = new ZipSingleObserver<T>(this, i);
                }
                this.observers = o;
                // 根据 SingleSource 的个数,生成相应长度的 Object[],用来保存它们发射的结果
                this.values = new Object[n];
            }
    
            @Override
            public boolean isDisposed() {
                return get() <= 0;
            }
    
            @Override
            public void dispose() {
                if (getAndSet(0) > 0) {
                    for (ZipSingleObserver<?> d : observers) {
                        d.dispose();
                    }
                }
            }
    
            // 上游调用 ZipSingleObserver::onSuccess() 时,便会调用该方法,触发计数值-1
            void innerSuccess(T value, int index) {
                values[index] = value;
                // 判断计数值是否已减至零
                if (decrementAndGet() == 0) {
                    // 计数值为0,说明 SingleSource 全部发射完毕,可以调用合并函数了
                    R v;
                    try {
                        // 调用合并函数,获得合并后的结果
                        v = ObjectHelper.requireNonNull(zipper.apply(values), "The zipper returned a null value");
                    } catch (Throwable ex) {
                        Exceptions.throwIfFatal(ex);
                        actual.onError(ex);
                        return;
                    }
                    // 将合并后的结果发射给下游,即 ConsumerSingleObserver 
                    actual.onSuccess(v);
                }
            }
    
            void disposeExcept(int index) {
                ZipSingleObserver<T>[] observers = this.observers;
                int n = observers.length;
                for (int i = 0; i < index; i++) {
                    observers[i].dispose();
                }
                for (int i = index + 1; i < n; i++) {
                    observers[i].dispose();
                }
            }
    
            void innerError(Throwable ex, int index) {
                if (getAndSet(0) > 0) {
                    disposeExcept(index);
                    actual.onError(ex);
                } else {
                    RxJavaPlugins.onError(ex);
                }
            }
        }
    

    ZipSingleObserver [-> SingleZipArray::ZipSingleObserver]

        // 用来接受 SingleSource 发射的结果 
        static final class ZipSingleObserver<T> extends AtomicReference<Disposable>
        implements SingleObserver<T> {
            private static final long serialVersionUID = 3323743579927613702L;
            // zip 协调器,用来触发计数值-1、计数值为0时调用合并函数并发射合并结果
            final ZipCoordinator<T, ?> parent;
            // 接受第几个 SingleSource 的结果
            final int index;
    
            ZipSingleObserver(ZipCoordinator<T, ?> parent, int index) {
                this.parent = parent;
                this.index = index;
            }
    
            public void dispose() {
                DisposableHelper.dispose(this);
            }
    
            @Override
            public void onSubscribe(Disposable d) {
                DisposableHelper.setOnce(this, d);
            }
    
            @Override
            public void onSuccess(T value) {
                // SingleSource 发射结果时,调用到这里
                // 调用 ZipCoordinator::innerSuccess()
                parent.innerSuccess(value, index);
            }
    
            @Override
            public void onError(Throwable e) {
                // SingleSource 发射错误时,调用到这里
                // 调用 ZipCoordinator::innerError
                parent.innerError(e, index);
            }
        }
    

    总结

    此情此景,我想吟诗一首:

    《源码分析》
         --尼古拉斯·Yulo
    RxJava 真牛逼,
    你且看我来分析。
    链式调用很神奇,
    线程调度也随意。
    码农工作不容易,
    下班还得把学习。
    要问源码哪里有?
    还得简书看Yulo。
    

    奥,类图忘了贴了:

    Class Diagram

    总结:

    调用时序简化版

    除了最上层的被观察者和最下层的观察者,中间的 Single 子类必有一与之对应的 SingleObserver 实现类,总结起来就是:

    • 我的下游的下游不是我的下游
    • 我的上游的上游不是我的上游
    • 我只能访问我的直接上游和直接下游

    嗯,这大概可以起名叫 异步责任链模式。。。

    别搜了,这名字是我自己想的!

    相关文章

      网友评论

      本文标题:RxJava2源码解析

      本文链接:https://www.haomeiwen.com/subject/hlzsmftx.html