RxJava2源码解析

作者: 1cf2c90a5564 | 来源:发表于2018-07-25 20:53 被阅读70次

本文为博主原创文章,未经允许不得转载

前言

本文简析 RxJava2subscribeOnzip 操作符。

术语解释

Single.just().map().flatMap().subscribeOn().observeOn().subscribe();

上述代码中,Singlesubscribe() 之间的都称为 操作符,想像一下自己就是其中一个 操作符,那么位于左边的便称为 上游,位于右边的则称为 下游,故上下游其实是相对的。

一、subscribeOn

demo

        // Case2: 在非UI线程执行并关注结果
        Single.fromCallable(new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                return generateRandom();
            }
        }).subscribeOn(Schedulers.io()).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                Logger.d(TAG, "test: accept(Integer integer) invoked on %s", Thread.currentThread().getName());
            }
        }, new Consumer<Throwable>() {
            @Override
            public void accept(Throwable throwable) throws Exception {
                Logger.d(TAG, "test: accept(Throwable throwable) invoked on %s", Thread.currentThread().getName());
            }
        });

fromCallable(Callable callable) [-> Single.java]

    public static <T> Single<T> fromCallable(final Callable<? extends T> callable) {
        ObjectHelper.requireNonNull(callable, "callable is null");
        // RxJavaPlugins 里是全局钩子函数,分析源码时无视即可,此处就是返回 SingleFromCallable
        return RxJavaPlugins.onAssembly(new SingleFromCallable<T>(callable));
    }

SingleFromCallable [-> SingleFromCallable.java]

// 注意继承自 Single,而 Single 实现了 SingleSource 接口,所以也继承了 subscribe() 方法
public final class SingleFromCallable<T> extends Single<T> {
    // 回调函数
    final Callable<? extends T> callable;

    public SingleFromCallable(Callable<? extends T> callable) {
        // 保存为全局变量
        this.callable = callable;
    }

    @Override
    protected void subscribeActual(SingleObserver<? super T> observer) {
        // 一个 run() 方法体为空的 RunnableDisposable 对象,用来取消订阅
        Disposable d = Disposables.empty();
        // 调用下游(本示例此处为SubscribeOnObserver)的 onSubscribe()
        observer.onSubscribe(d);

        // 已取消订阅的,直接返回,不会发射任何值
        if (d.isDisposed()) {
            return;
        }

        T value;
        try {
            // 调用 callable.call() 获取值
            value = ObjectHelper.requireNonNull(callable.call(), "The callable returned a null value");
        } catch (Throwable ex) {
            // 捕获所有异常,所以使用 rxjava 时,自己写的方法收不到异常通知,需订阅一个 Consumer<Throwable>
            Exceptions.throwIfFatal(ex);
            if (!d.isDisposed()) {
                // 发射一个 error 事件给下游
                observer.onError(ex);
            } else {
                RxJavaPlugins.onError(ex);
            }
            return;
        }

        if (!d.isDisposed()) {
            // 发射一个 success 事件给下游
            observer.onSuccess(value);
        }
    }
}

subscribeOn(Scheduler scheduler) [-> Single.java]

    public final Single<T> subscribeOn(final Scheduler scheduler) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        // 返回 SingleSubscribeOn
        return RxJavaPlugins.onAssembly(new SingleSubscribeOn<T>(this, scheduler));
    }

SingleSubscribeOn [-> SingleSubscribeOn.java]

// 注意继承自 Single,而 Single 实现了 SingleSource 接口,所以也继承了 subscribe() 方法
public final class SingleSubscribeOn<T> extends Single<T> {
    // 上游
    final SingleSource<? extends T> source;
    // 线程调度器
    final Scheduler scheduler;

    public SingleSubscribeOn(SingleSource<? extends T> source, Scheduler scheduler) {
        // 保存上游为 this.source
        this.source = source;
        // 保存线程调度器为 this.scheduler
        this.scheduler = scheduler;
    }

    @Override
    protected void subscribeActual(final SingleObserver<? super T> s) {
        // 将下游和上游包装为 SubscribeOnObserver
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s, source);
        // 调用下游的 onSubscribe(),此时还没有切换线程,所以 onSubscribe() 是在原线程执行的
        s.onSubscribe(parent);
        // 将 SubscribeOnObserver 扔到线程调度器中执行,此处就是 IoScheduler,内部实现基于 jdk 的 ExecutorService、FutureTask 和 Future
        Disposable f = scheduler.scheduleDirect(parent);
        // 将调度器返回的 Disposable(一个实现了 Disposable 和 Runnable 接口的 DisposeTask) 对象设置给 SubscribeOnObserver 的 task,用来取消订阅、中断线程执行
        parent.task.replace(f);
    }

    // 继承自 AtomicReference,实现了 SingleObserver、Disposable、Runnable接口
    // SingleObserver:当作下游
    // Disposable:传给下游以便下游用来取消订阅
    // Runnable:用来提交给 ExecutorService
    static final class SubscribeOnObserver<T> extends AtomicReference<Disposable>
    implements SingleObserver<T>, Disposable, Runnable {
        private static final long serialVersionUID = 7000911171163930287L;
        // 下游
        final SingleObserver<? super T> actual;
        // 用来取消订阅、中断线程执行
        final SequentialDisposable task;
        // 上游
        final SingleSource<? extends T> source;

        SubscribeOnObserver(SingleObserver<? super T> actual, SingleSource<? extends T> source) {
            // 将下游保存为 this.actual
            this.actual = actual;
            // 将上游保存为 this.source
            this.source = source;
            // 一个继承自 AtomicReference 实现了 Disposable 接口的对象,用来取消订阅、中断线程执行
            this.task = new SequentialDisposable();
        }

        @Override
        public void onSubscribe(Disposable d) {
            // 因为继承自 AtomicRefrence,此处将取消订阅的句柄(本示例中此处为 Disposables.empty())设置给内部的对象引用,用于取消对上游的订阅
            DisposableHelper.setOnce(this, d);
        }

        @Override
        public void onSuccess(T value) {
            // 上游调用 observer.onSuccess() 时,会调用到这里,此处继续调用下游的 onSuccess() 将值向下传递
            actual.onSuccess(value);
        }

        @Override
        public void onError(Throwable e) {
            // 上游调用 observer.onError() 时,会调用到这里,此处继续调用下游的 onError() 将错误向下传递
            actual.onError(e);
        }

        @Override
        public void dispose() {
            // 取消对上游的订阅
            DisposableHelper.dispose(this);
            // 中断线程执行
            task.dispose();
        }

        @Override
        public boolean isDisposed() {
            return DisposableHelper.isDisposed(get());
        }

        @Override
        public void run() {
            // 在线程池中执行 source.subscribe(),本示例会触发:
            // SingleFromCallable.subscribe()->
            // SingleFromCallable.suscribeActual()->
            // this.onSuccess(callable.call())
            source.subscribe(this);
        }
    }
}

subscribe() [-> Single.java]

    public final Disposable subscribe(final Consumer<? super T> onSuccess, final Consumer<? super Throwable> onError) {
        ObjectHelper.requireNonNull(onSuccess, "onSuccess is null");
        ObjectHelper.requireNonNull(onError, "onError is null");

        // 将 successConsumer、throwableConsumer 包装成 ConsumerSingleObserver,作为观察者
        ConsumerSingleObserver<T> s = new ConsumerSingleObserver<T>(onSuccess, onError);
        // 调用 subscribe(SingleObserver subscriber)
        subscribe(s);
        // ConsumerSingleObserver 实现了 Disposable 接口,持有它,可以用来取消订阅 
        return s;
    }

subscribe(SingleObserver subscriber) [-> Single.java]

    public final void subscribe(SingleObserver<? super T> subscriber) {
        ObjectHelper.requireNonNull(subscriber, "subscriber is null");
        subscriber = RxJavaPlugins.onSubscribe(this, subscriber);
        ObjectHelper.requireNonNull(subscriber, "subscriber returned by the RxJavaPlugins hook is null");

        try {
            // 继续调用抽象方法 subscribeActual(SingleObserver subscriber),即调用子类的 subscribeActual(SingleObserver subscriber)
            // 本示例中,此处子类为 SingleSubscribeOn,源码分析见上方
            subscribeActual(subscriber);
        } catch (NullPointerException ex) {
            throw ex;
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            NullPointerException npe = new NullPointerException("subscribeActual failed");
            npe.initCause(ex);
            throw npe;
        }
    }

ConsumerSingleObserver [->ConsumerSingleObserver .java]

public final class ConsumerSingleObserver<T> extends AtomicReference<Disposable>
implements SingleObserver<T>, Disposable, LambdaConsumerIntrospection {
    private static final long serialVersionUID = -7012088219455310787L;
    // successConsumer
    final Consumer<? super T> onSuccess;
    // throwableConsumer
    final Consumer<? super Throwable> onError;

    public ConsumerSingleObserver(Consumer<? super T> onSuccess, Consumer<? super Throwable> onError) {
        // 将 successConsumer 保存为全局变量 this.onSuccess
        this.onSuccess = onSuccess;
        // 将 throwableConsumer 保存为全局变量 this.onError
        this.onError = onError;
    }

    @Override
    public void onError(Throwable e) {、
        // 最后设置为已取消订阅
        lazySet(DisposableHelper.DISPOSED);
        try {
            // 回调 throwableConsumer
            onError.accept(e);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            RxJavaPlugins.onError(new CompositeException(e, ex));
        }
    }

    @Override
    public void onSubscribe(Disposable d) {
        // 本示例中,d 为 SubscribeOnObserver
        DisposableHelper.setOnce(this, d);
    }

    @Override
    public void onSuccess(T value) {
        // 最后设置为已取消订阅
        lazySet(DisposableHelper.DISPOSED);
        try {
            // 回调 successConsumer
            onSuccess.accept(value);
        } catch (Throwable ex) {
            // 异常被 rxjava 捕获,所以自己写的 successConsumer 收不到异常
            Exceptions.throwIfFatal(ex);
            RxJavaPlugins.onError(ex);
        }
    }

    @Override
    public void dispose() {
        // 取消订阅
        DisposableHelper.dispose(this);
    }

    @Override
    public boolean isDisposed() {
        return get() == DisposableHelper.DISPOSED;
    }

    @Override
    public boolean hasCustomOnError() {
        return onError != Functions.ON_ERROR_MISSING;
    }
}

时序图

Sequence Diagram

二、zip

Demo

        // Case6: 并发读取不同数据源,转换成同类型后,合并
        Single<IBook> novel = Single.fromCallable(new Callable<Novel>() {
            @Override
            public Novel call() throws Exception {
                return getNovel();
            }
        }).map(new Function<Novel, IBook>() {
            @Override
            public IBook apply(Novel novel) throws Exception {
                return new NovelAdapter(novel);
            }
        }).subscribeOn(Schedulers.io());

        Single<IBook> rxJava2Tutorial = Single.fromCallable(new Callable<RxJava2Tutorial>() {
            @Override
            public RxJava2Tutorial call() throws Exception {
                return getRxJava2Tutorial();
            }
        }).map(new Function<RxJava2Tutorial, IBook>() {
            @Override
            public IBook apply(RxJava2Tutorial rxJava2Tutorial) throws Exception {
                return new RxJava2TutorialAdapter(rxJava2Tutorial);
            }
        }).subscribeOn(Schedulers.io());

        // 注意此处调用的是合并两个 SingleSource 的方法,zip 操作符的重载方法很多,从 2~9 都有,相应的变换函数也有从 2~9,无语啊~
        Single.zip(novel, rxJava2Tutorial, new BiFunction<IBook, IBook, List<IBook>>() {
            @Override
            public List<IBook> apply(IBook iBook, IBook iBook2) throws Exception {
                List<IBook> books = new ArrayList<>(2);
                books.add(iBook);
                books.add(iBook2);
                return books;
            }
        }).subscribe(new Consumer<List<IBook>>() {
            @Override
            public void accept(List<IBook> iBooks) throws Exception {
                Logger.d(TAG, "test: books are " + iBooks);
            }
        }, new Consumer<Throwable>() {
            @Override
            public void accept(Throwable throwable) throws Exception {
                Logger.d(TAG, "test: get books error.", throwable);
            }
        });

上述代码分别读取 NovelRxJava2Tutorial 两种不同类型书籍,再分别转化为 IBook 类型,然后添加到同一数组中,最后发射给下游。

zip(SingleSource source1, SingleSource source2, BiFunction zipper) [-> Single.java]

    public static <T1, T2, R> Single<R> zip(
            SingleSource<? extends T1> source1, SingleSource<? extends T2> source2,
            BiFunction<? super T1, ? super T2, ? extends R> zipper
     ) {
        ObjectHelper.requireNonNull(source1, "source1 is null");
        ObjectHelper.requireNonNull(source2, "source2 is null");
        return zipArray(Functions.toFunction(zipper), source1, source2);
    }

toFunction(BiFunction f) [-> Functions.java]

    public static <T1, T2, R> Function<Object[], R> toFunction(final BiFunction<? super T1, ? super T2, ? extends R> f) {
        ObjectHelper.requireNonNull(f, "f is null");
        // 注意因为上面调用的是合并两个 SingleSource 的方法,所以这里调用的就是 Array2Func,2表示合并个数,像这样的还有 Array3Func、Array4Func、... Array9Func
        // 作用就是把多参数的 BiFunction 统一转化为一个参数(Object[])的 Function 对象,调用的时候再把参数从 Object[] 里取出来即可
        return new Array2Func<T1, T2, R>(f);
    }

Array2Func [-> Functions::Array2Func]

    static final class Array2Func<T1, T2, R> implements Function<Object[], R> {
        final BiFunction<? super T1, ? super T2, ? extends R> f;

        Array2Func(BiFunction<? super T1, ? super T2, ? extends R> f) {
            this.f = f;
        }

        @SuppressWarnings("unchecked")
        @Override
        public R apply(Object[] a) throws Exception {
            if (a.length != 2) {
                throw new IllegalArgumentException("Array of size 2 expected but got " + a.length);
            }
            // 从 Object[] 中取出实参,然后调用实际的合并函数
            return f.apply((T1)a[0], (T2)a[1]);
        }
    }

zipArray(Function zipper, SingleSource... sources) [-> Single.java]

    public static <T, R> Single<R> zipArray(Function<? super Object[], ? extends R> zipper, SingleSource<? extends T>... sources) {
        ObjectHelper.requireNonNull(zipper, "zipper is null");
        ObjectHelper.requireNonNull(sources, "sources is null");
        if (sources.length == 0) {
            return error(new NoSuchElementException());
        }
        return RxJavaPlugins.onAssembly(new SingleZipArray<T, R>(sources, zipper));
    }

SingleZipArray [-> SingleZipArray.java]

// 继承自 Single,而 Single 实现了 SingleSource 接口,所以也继承了 subscribe() 方法
public final class SingleZipArray<T, R> extends Single<R> {
    // 用来保存要合并的 SingleSource
    final SingleSource<? extends T>[] sources;
    // 用来保存合并函数
    final Function<? super Object[], ? extends R> zipper;

    public SingleZipArray(SingleSource<? extends T>[] sources, Function<? super Object[], ? extends R> zipper) {
        // 将 SingleSource 保存为 this.sources
        this.sources = sources;
        // 将合并函数保存为 this.zipper
        this.zipper = zipper;
    }

    // 通过上面 subscribeOn 的源码分析可知,调用 subscribe() 时,便会调用到上游的 subscribeActual()
    // 此处的 observer 同样也是 ConsumerSingleObserver
    @Override
    protected void subscribeActual(SingleObserver<? super R> observer) {
        SingleSource<? extends T>[] sources = this.sources;
        int n = sources.length;
        // 本示例合并的 SingleSource 个数为 2,即 n=2
        if (n == 1) {
            sources[0].subscribe(new SingleMap.MapSingleObserver<T, R>(observer, new SingletonArrayFunc()));
            return;
        }

        // 将 ConsumerSingleObserver、SingleSource个数、合并函数封装为 ZipCoordinator,用来等待所有 
        // SingleSource 都处理完,然后对其发射的值应用合并函数 
        ZipCoordinator<T, R> parent = new ZipCoordinator<T, R>(observer, n, zipper);
        // 调用 ConsumerSingleObserver 的 onSubscribe()
        observer.onSubscribe(parent);
        // 一个 for 循环,挨个调用 SingleSource 的 subscribe(),触发生产者开始生产
        for (int i = 0; i < n; i++) {
            if (parent.isDisposed()) {
                return;
            }

            SingleSource<? extends T> source = sources[i];

            if (source == null) {
                parent.innerError(new NullPointerException("One of the sources is null"), i);
                return;
            }

            source.subscribe(parent.observers[i]);
        }
    }
    ......
}

ZipCoordinator [-> SingleZipArray::ZipCoordinator]

    // 合并函数协调器,注意继承自 AtomicInteger,以便采用计数法检测是否所有的 SingleSource 都发射完毕
    static final class ZipCoordinator<T, R> extends AtomicInteger implements Disposable {
        private static final long serialVersionUID = -5556924161382950569L;
        // 保存下游观察者,本示例此处为 ConsumerSingleObserver
        final SingleObserver<? super R> actual;
        // 保存合并函数
        final Function<? super Object[], ? extends R> zipper;
        // SingleZipArray 的直接观察者,用来分别接收每个 SingleSource 发射的结果
        // 每收到一个值 (即每回调一次 ZipSingleObserver 的 onSuccess()),计数值-1,直至计数值为0,说明全部发射完毕
        final ZipSingleObserver<T>[] observers;
        // 保存每个 SingleSource 发射的结果
        final Object[] values;

        @SuppressWarnings("unchecked")
        ZipCoordinator(SingleObserver<? super R> observer, int n, Function<? super Object[], ? extends R> zipper) {
            // 因为继承自 AtomicInteger,所以调用父类构造器,设置计数值
            super(n);
            // 将下游观察者 ConsumerSingleObserver 保存为 this.actual,用来接收合并后的结果
            this.actual = observer;
            // 将合并函数保存为 this.zipper
            this.zipper = zipper;
            // 根据 SingleSource 的个数,生成相应个数的 SingleObserver,然后保存为 this.observers
            ZipSingleObserver<T>[] o = new ZipSingleObserver[n];
            for (int i = 0; i < n; i++) {
                o[i] = new ZipSingleObserver<T>(this, i);
            }
            this.observers = o;
            // 根据 SingleSource 的个数,生成相应长度的 Object[],用来保存它们发射的结果
            this.values = new Object[n];
        }

        @Override
        public boolean isDisposed() {
            return get() <= 0;
        }

        @Override
        public void dispose() {
            if (getAndSet(0) > 0) {
                for (ZipSingleObserver<?> d : observers) {
                    d.dispose();
                }
            }
        }

        // 上游调用 ZipSingleObserver::onSuccess() 时,便会调用该方法,触发计数值-1
        void innerSuccess(T value, int index) {
            values[index] = value;
            // 判断计数值是否已减至零
            if (decrementAndGet() == 0) {
                // 计数值为0,说明 SingleSource 全部发射完毕,可以调用合并函数了
                R v;
                try {
                    // 调用合并函数,获得合并后的结果
                    v = ObjectHelper.requireNonNull(zipper.apply(values), "The zipper returned a null value");
                } catch (Throwable ex) {
                    Exceptions.throwIfFatal(ex);
                    actual.onError(ex);
                    return;
                }
                // 将合并后的结果发射给下游,即 ConsumerSingleObserver 
                actual.onSuccess(v);
            }
        }

        void disposeExcept(int index) {
            ZipSingleObserver<T>[] observers = this.observers;
            int n = observers.length;
            for (int i = 0; i < index; i++) {
                observers[i].dispose();
            }
            for (int i = index + 1; i < n; i++) {
                observers[i].dispose();
            }
        }

        void innerError(Throwable ex, int index) {
            if (getAndSet(0) > 0) {
                disposeExcept(index);
                actual.onError(ex);
            } else {
                RxJavaPlugins.onError(ex);
            }
        }
    }

ZipSingleObserver [-> SingleZipArray::ZipSingleObserver]

    // 用来接受 SingleSource 发射的结果 
    static final class ZipSingleObserver<T> extends AtomicReference<Disposable>
    implements SingleObserver<T> {
        private static final long serialVersionUID = 3323743579927613702L;
        // zip 协调器,用来触发计数值-1、计数值为0时调用合并函数并发射合并结果
        final ZipCoordinator<T, ?> parent;
        // 接受第几个 SingleSource 的结果
        final int index;

        ZipSingleObserver(ZipCoordinator<T, ?> parent, int index) {
            this.parent = parent;
            this.index = index;
        }

        public void dispose() {
            DisposableHelper.dispose(this);
        }

        @Override
        public void onSubscribe(Disposable d) {
            DisposableHelper.setOnce(this, d);
        }

        @Override
        public void onSuccess(T value) {
            // SingleSource 发射结果时,调用到这里
            // 调用 ZipCoordinator::innerSuccess()
            parent.innerSuccess(value, index);
        }

        @Override
        public void onError(Throwable e) {
            // SingleSource 发射错误时,调用到这里
            // 调用 ZipCoordinator::innerError
            parent.innerError(e, index);
        }
    }

总结

此情此景,我想吟诗一首:

《源码分析》
     --尼古拉斯·Yulo
RxJava 真牛逼,
你且看我来分析。
链式调用很神奇,
线程调度也随意。
码农工作不容易,
下班还得把学习。
要问源码哪里有?
还得简书看Yulo。

奥,类图忘了贴了:

Class Diagram

总结:

调用时序简化版

除了最上层的被观察者和最下层的观察者,中间的 Single 子类必有一与之对应的 SingleObserver 实现类,总结起来就是:

  • 我的下游的下游不是我的下游
  • 我的上游的上游不是我的上游
  • 我只能访问我的直接上游和直接下游

嗯,这大概可以起名叫 异步责任链模式。。。

别搜了,这名字是我自己想的!

相关文章

网友评论

本文标题:RxJava2源码解析

本文链接:https://www.haomeiwen.com/subject/hlzsmftx.html