美文网首页Android收藏集
Rxjava2.x源码解析(一): 订阅流程

Rxjava2.x源码解析(一): 订阅流程

作者: 8f959b52e515 | 来源:发表于2019-05-21 09:06 被阅读0次

    现在网上已经有大量的源码分析文章,各种技术的都有。但我觉得很多文章对初学者并不友好,让人读起来云里雾里的,比源码还源码。究其原因,是根本没有从学习者的角度去分析。在自己完成了源码阅读之后,却忘记了自己是如何一步步提出问题,进而走到这里的。

    所以,我想在本篇及以后的文章中,花更多的精力去进行源码的分析,争取用浅显易懂的语言,用适合的逻辑去组织内容。这样不至于陷入源码里,导致文章难懂。尽量让更多的人愿意去读源码。

    阅读本文,你需要对 RxJava2 的一些基本使用有所了解,不过不用太深。这里推荐下Season_zlc给初学者的RxJava2.0教程(一)
    https://www.jianshu.com/p/464fa025229e

    比较浅显易懂。

    提到 RxJava,你第一个想到的词是什么?

    “异步”。

    RxJava 在 GitHub 上的官网主页也说了,“RxJava is a Java VM implementation of Reactive Extensions: a library for composing asynchronous and event-based programs by using observable sequences.”(RxJava是一个使用可观测序列来组建异步、基于事件的程序的库,它是 Reactive Extensions 在Java虚拟机上的一个实现)。它的优点嘛,用扔物线凯哥的话讲,就是“简洁”,并且“随着程序逻辑变得越来越复杂,它依然能够保持简洁”。

    这里要注意一点,虽然对大多数人来讲,更多的是使用 RxJava 来配合 Retrofit、OkHttp 进行网络请求框架的封装及数据的异步处理,但是,RxJava和网络请求本质上没有半毛钱的关系。它的本质,官网已经说的很明白了,就是“异步”。

    RxJava 基于观察者模式实现,基于事件流进行链式调用。

    首先,我们需要添加必要的依赖,这里以最新的2.2.8版本为例:

        implementation "io.reactivex.rxjava2:rxjava:2.2.8"
    

    当然,对于 Android 项目来讲,我们一般还需要添加一个补充库:

        implementation 'io.reactivex.rxjava2:rxandroid:2.1.0'
    

    这个库其实就是提供了 Android 相关的主线程的支持。

    然后写个简单的代码,就可以开始我们的源码分析啦。

            // 上游 observable
            Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
                @Override
                public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                    Log.d(TAG, "subscribe: ");
                    emitter.onNext(1);
                    emitter.onNext(2);
                    emitter.onComplete();
                }
            });
            
            // 下游 observer
            Observer<Integer> observer = new Observer<Integer>() {
                @Override
                public void onSubscribe(Disposable d) {
                    // onSubscribe 方法会最先被执行
                    Log.d(TAG, "onSubscribe: ");
                }
    
                @Override
                public void onNext(Integer integer) {
                    Log.d(TAG, "onNext: ");
                }
    
                @Override
                public void onError(Throwable e) {
                    Log.d(TAG, "onError: ");
                }
    
                @Override
                public void onComplete() {
                    Log.d(TAG, "onComplete: ");
                }
            };
    
            // 将上游和下游进行关联
            observable.subscribe(observer);
    

    为便于理解,我故意将可以链式调用的代码,拆成了三部分。你完全可以写成下面的链式风格:

     Observable.create(new ObservableOnSubscribe<Integer>() {
                @Override
                public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                    Log.d(TAG, "subscribe: ");
                    emitter.onNext(1);
                    emitter.onNext(2);
                    emitter.onComplete();
                }
            }).subscribe(new Observer<Integer>() {
                @Override
                public void onSubscribe(Disposable d) {
                    // onSubscribe 方法会最先被执行
                    Log.d(TAG, "onSubscribe: ");
                }
    
                @Override
                public void onNext(Integer integer) {
                    Log.d(TAG, "onNext: ");
                }
    
                @Override
                public void onError(Throwable e) {
                    Log.d(TAG, "onError: ");
                }
    
                @Override
                public void onComplete() {
                    Log.d(TAG, "onComplete: ");
                }
            });
    

    同样,为了便于理解,我会借用i/o流里面经常用到的水流进行类比。将被观察者 observable 称为上游(upstream),将观察者 observer 称为下游(downstream)。读源码其实也能看出,作者本身也正是这么类比的。

    通过将整个过程拆分成三个步骤,能更清晰的理清逻辑。我们需要做的,本质上就是创建一个上游和一个下游,最终通过上游对象的subscribe方法将二者关联起来:

    1. 创建一个 Observable 的实现类
    2. 创建一个 Observer 的实现类
    3. 将二者通过 Observable 的 subscribe(...) 方法将二者进行关联

    明白了这三点,以后我们就不会被各种实现类搞的眼花缭乱。

    这三个步骤,里面的核心是第三部,也就是订阅过程,毕竟,这属于一个动作,而我们进行源码分析的时候,往往就是从动作开始的。这时候,我们Ctrl/Command + 鼠标左键,进入该方法看看,里面做了下什么。

        public final void subscribe(Observer<? super T> observer) {
            ObjectHelper.requireNonNull(observer, "observer is null");
            try {
                // RxJavaPlugins是个钩子函数,用来在代码的执行前后插入进行一些操作
                observer = RxJavaPlugins.onSubscribe(this, observer);
    
                ObjectHelper.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");
                // 关键点是这行代码
                subscribeActual(observer);
            } catch (NullPointerException e) { // NOPMD
                throw e;
            } catch (Throwable e) {
                Exceptions.throwIfFatal(e);
                // can't call onError because no way to know if a Disposable has been set or not
                // can't call onSubscribe because the call might have set a Subscription already
                RxJavaPlugins.onError(e);
    
                NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
                npe.initCause(e);
                throw npe;
            }
        }
    

    这里将this(上游Observable类型)的和下游observer作为参数传给了 RxJavaPlugins 的 onSubscribe(...)方法,并返回一个Observer,同时,将原来的observer指向这个返回值,那么我们看看这个函数中到底进行了什么操作:

        //  RxJavaPlugins.java
        public static <T> Observer<? super T> onSubscribe(@NonNull Observable<T> source, @NonNull Observer<? super T> observer) {
            BiFunction<? super Observable, ? super Observer, ? extends Observer> f = onObservableSubscribe;
            if (f != null) {
                return apply(f, source, observer);
            }
            return observer;
        }
    

    这里判断onObservableSubscribe是否为 null,不为 null 则调用其 apply(...) 方法。若为 null ,则直接返回原来的observer。而该变量需要通过RxJavaPlugin的setOnSingleSubscribe(...)方法来指定的,显然,我们并没有指定,所以忽略不管(后面遇到类似问题,基本也都可以忽略)。

    回到之前的订阅流程,就可以简化为下面这样:

        public final void subscribe(Observer<? super T> observer) {
            ObjectHelper.requireNonNull(observer, "observer is null");
            try {
                ...
                // 调用到具体实现子类的 subscribeActual(observer) 方法
                subscribeActual(observer);
            } catch (
                ...
            }
        }
    

    从上面代码可以看出,订阅过程,即调用Observable的subscribe(...)的过程,其实就是直接调用了其实现类的subscribeActual(observer)方法(该方法在 Observable 中是个抽象方法)。以后我们遇到这个方法,就直接去 Observable 的实现类中找即可,就不会乱了。

    一些熟悉RxJava的朋友可能会说,有时候我们通过subscribe(...)订阅的并不是Observer对象,而是consumer对象,有各种重载。如下:

    image

    当你传入的是Consumer的时候,不管你传递了几个参数,最终都会代用到以下方法,那些你没传递的 onError或者 onComplete 回调等等,会自动使用默认创建的值。

        public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
                Action onComplete, Consumer<? super Disposable> onSubscribe) {
            ObjectHelper.requireNonNull(onNext, "onNext is null");
            ObjectHelper.requireNonNull(onError, "onError is null");
            ObjectHelper.requireNonNull(onComplete, "onComplete is null");
            ObjectHelper.requireNonNull(onSubscribe, "onSubscribe is null");
    
            // 最终都会封装成一个 LambdaObserver,并作为参数传入subscribe(...)方法中
            LambdaObserver<T> ls = new LambdaObserver<T>(onNext, onError, onComplete, onSubscribe);
    
            subscribe(ls);
    
            return ls;
        }
    

    可以看出,这里最终还是将这些 Consumer 对象包装在了一个 LambdaObserver 类型的变量中,然后又调用了subscribe(...)方法,将其作为变量传入,之后的分析,就跟上面是一样的了。

    订阅方法讲完了,我们也知道最终调用到了 Observable 的实现类的subscribeActual(...)方法。那接下来肯定就是要弄懂在这个方中做了什么事。我们例子中是使用Observable.create(...)方法创建的 observable:

            // 上游 observable
            Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
                @Override
                public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                    Log.d(TAG, "subscribe: ");
                    emitter.onNext(1);
                    emitter.onNext(2);
                    emitter.onComplete();
                }
            });
    

    其中,Observable.create(...)方法的实现是这样的:

        public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
            ObjectHelper.requireNonNull(source, "source is null");
            return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
        }
    

    我们传进去了一个实现了ObservableOnSubscribe接口的匿名内部类,该接口类也很简单,就定义了一个void subscribe(@NonNull ObservableEmitter<T> emitter) throws Exception抽象方法。

    然后我们将传进来的source(刚刚提到的匿名内部类ObservableOnSubscribe)封装进一个ObservableCreate对象中,又传进了RxJavaPlugins.onAssembly(...)中,这个RxJavaPlugins类刚才我们说过,其实就是一个hook类,暂时直接忽略,一般就是直接把传进来的参数返回了(不放心的话可以自己点进去,以后遇到该方法不再赘述)。

    也就是说Observable.create(...)方法最终创建了一个ObservableCreate对象。注意,该对象是Observable抽象类的具体实现类。

    特别注意!
    特别注意!
    特别注意!

    重要事情说三遍。我们这里通过create(...)方法创建的Observable的具体实现子类是ObservableCreate。该子类的命名是有规律可言的。我在分析源码的时候有时候就想,这么多看起来名字都一样的类,RxJava的开发者本人不会懵逼吗?作为一个用户量这么大的库,肯定各种都有讲究,肯定有贵了。嗯。规律就是生成的子类的命名方法为“Observable+创建该类的方法名”,即:在创建该类的方法名称前面加上个Observable,以此来作为新的类的名称。

    不信?

    我们还可以通过Observable.just(...)这种方式来创建Observable,点进去看看具体子类名字是啥:

    image

    其他的自己就去验证吧。

    所以,我们以后遇到Observable开头的类名,就可以猜测它是一个Observable类型的变量,类名后面的部分,就是创建该变量的方法(确保严谨,倒推可能不成立,要仔细确认)。

    同样的,各种Observer的实现类也是类似,只不过各种它们是把创建的方法放在了前面,然后以Observer结尾而已,这点之后遇到的时候会再提及。

    回到刚才讲的。我们通过create(...)方法,创建出来的是ObservableCreate,它是个Observable,那我们就直接看它的subscribeActual(...)方法究竟做了什么:

        protected void subscribeActual(Observer<? super T> observer) {
            CreateEmitter<T> parent = new CreateEmitter<T>(observer);
            // 首先调用下游 observer 的 onSubscribe方法
            observer.onSubscribe(parent);
    
            try {
                source.subscribe(parent);
            } catch (Throwable ex) {
                Exceptions.throwIfFatal(ex);
                parent.onError(ex);
            }
        }
    

    首先,将observer包装进CreateEmitter对象中。

    然后立即调用 Observer 的onSubscribe(parent)方法,表示订阅过程完成。(当我们通过subscribe(...)进行订阅的时候,会立即调用下游Observer 的onSubscribe(...)方法。通过查看其它实现类,可以总结出该结论)。

    这里,会将我们的封装类CreateEmitter作为参数传进onSubscribe(...)方法中。

    之后,又在代码source.subscribe(parent)中将其作为参数传递。这里的source,是源的意思,其实也就是上游。此例子中具体指我们传入Observable.create(...)中的ObservableOnSubscribe类型的匿名内部类。

    而我们已经实现了该抽象方法:

            // 上游 observable
            Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
                @Override
                public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                    Log.d(TAG, "subscribe: ");
                    emitter.onNext(1);
                    emitter.onNext(2);
                    emitter.onComplete();
                }
            });
    

    我们之后就是调用的传进来的ObservableEmitteronNext/onError/OnComplete来发送事件的。等等,我们创建的时候不是传进来的是CreateEmitter吗,怎么又变成了ObservableEmitter?其实,CreateEmitter是ObservableCreate的一个 static final 类型的内部类,并且实现了ObservableEmitter接口。因为是由create方法创建的,所以这样命名咯,同时,又作为内部类定义在 ObservableCreate 中,这样,用到的时候是不是就不那么凌乱啦?

    到这里,我们知道了会通过回调emitter的各种方法来发送事件,这些事件又是怎么被observer 正确接收并处理的呢?

    我们继续回到 ObservableCreate 的subscribeActual(...)方法:

        protected void subscribeActual(Observer<? super T> observer) {
            CreateEmitter<T> parent = new CreateEmitter<T>(observer);
            observer.onSubscribe(parent);
    
            try {
                source.subscribe(parent);
            } catch (Throwable ex) {
                Exceptions.throwIfFatal(ex);
                parent.onError(ex);
            }
        }
    

    我们发送事件,最终调用的其实是 parent(即 CreateEmitter) 中的相应方法,而 CreateEmitter 里又封装了 observer。我们到 CreateEmitter 这个类的源码中,看看发送事件的时候,都干嘛了:

        static final class CreateEmitter<T>
        extends AtomicReference<Disposable>
        implements ObservableEmitter<T>, Disposable {
    
            private static final long serialVersionUID = -3434801548987643227L;
    
            final Observer<? super T> observer;
    
            CreateEmitter(Observer<? super T> observer) {
                this.observer = observer;
            }
    
            @Override
            public void onNext(T t) {
                if (t == null) {
                    onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
                    return;
                }
                if (!isDisposed()) {
                    observer.onNext(t);
                }
            }
    
            @Override
            public void onError(Throwable t) {
                if (!tryOnError(t)) {
                    RxJavaPlugins.onError(t);
                }
            }
    
            @Override
            public boolean tryOnError(Throwable t) {
                if (t == null) {
                    t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
                }
                if (!isDisposed()) {
                    try {
                        observer.onError(t);
                    } finally {
                        dispose();
                    }
                    return true;
                }
                return false;
            }
    
            @Override
            public void onComplete() {
                if (!isDisposed()) {
                    try {
                        observer.onComplete();
                    } finally {
                        dispose();
                    }
                }
            }
    
            @Override
            public void setDisposable(Disposable d) {
                DisposableHelper.set(this, d);
            }
    
            @Override
            public void setCancellable(Cancellable c) {
                setDisposable(new CancellableDisposable(c));
            }
    
            @Override
            public ObservableEmitter<T> serialize() {
            // 这里返回了一个 SerializedEmitter,并传入 this,也就是 CreateEmitter 对象
                return new SerializedEmitter<T>(this);
            }
    
            @Override
            public void dispose() {
                DisposableHelper.dispose(this);
            }
    
            @Override
            public boolean isDisposed() {
                // 这里判断,是否已经处于 disposed 状态,
                // 注意 get() 是定义在 AtomicReference 中的方法
                return DisposableHelper.isDisposed(get());
            }
    
            @Override
            public String toString() {
                return String.format("%s{%s}", getClass().getSimpleName(), super.toString());
            }
        }
    

    这里的代码也是比较简单的,就是将发送的事件中的参数直接传递给 observer 中的相应方法。只不过中间多了背压的判断(该类实现了Disposable 接口)。同时注意,该类还是 AtomicReference 的子类,可以实现原子操作。并且在覆写的 ObservableEmitter 的serialize()接口中创建并返回了一个SerializedEmitter,这些都是跟线程安全以及背压相关的,不是本文的重点。

    还有一点,需要大家注意,从RxJava2.x开始,已经不允许向onNext/onError中传null值,否则会报空指针,这点在上面的源码中也能看到。这就会对封装网络请求的时候产生影响,比如请求验证验证码接口成功,但是后台返回的 result 字段为 null,我们此时可能仍然想要它调用 onNext 方法去执行成功的回调。那这就需要额外的处理了。网上也有一些解决方案,但是总觉得不够优雅,有大佬有比较好的建议,也可以指点下。

    好啦,本篇文章就写到这里,带大家完成了订阅、事件的发送及处理的整个流程。

    关于线程切换的内容,放在下一篇文章中讲。毕竟,不谈线程切换,谈什么 RxJava源码 分析,哈哈。

    欢迎关注公众号来获取其他最新消息,有趣的灵魂在等你。

    image

    相关文章

      网友评论

        本文标题:Rxjava2.x源码解析(一): 订阅流程

        本文链接:https://www.haomeiwen.com/subject/pitdzqtx.html