RxJava2 源码分析(一)

作者: yhihua0607 | 来源:发表于2017-09-08 00:03 被阅读0次

    前言

    最近由于项目需要自己搭建了网络框架,采用时下非常流行的Rxjava2 + Retrofit搭建, Rxjava现在已经发展到Rxjava2,之前一直都只是再用Rxjava,但从来没有了解下Rxjava的内部实现,未来知其然并且知其所以然,今天我将一步步来分析Rxjava2的源码,Rxjava2分Observable和Flowable两种(无被压和有被压),我们今天先从简单的无背压的observable来分析。如有不对的地方,望大牛指教&轻拍。源码基于rxjava:2.1.1。

    简单的例子

    先来段最简单的代码,直观的了解下整个Rxjava运行的完整流程。

    private void doSomeWork() {
            Observable<String> observable =  Observable.create(new ObservableOnSubscribe<String>() {
                @Override
                public void subscribe(ObservableEmitter<String> e) throws Exception {
                    e.onNext("a");
                    e.onComplete();
                }
            });
            Observer observer = new Observer<String>() {
    
                @Override
                public void onSubscribe(Disposable d) {
                    Log.i("lx", " onSubscribe : " + d.isDisposed());
                }
    
                @Override
                public void onNext(String str) {
                    Log.i("lx", " onNext : " + str);
                }
    
                @Override
                public void onError(Throwable e) {
                    Log.i("lx", " onError : " + e.getMessage());
                }
    
                @Override
                public void onComplete() {
                    Log.i("lx", " onComplete");
                }
            };
            observable.subscribe(observer);
        }
    

    上面代码之所以将observable和observer单独声明,最后再调用observable.subscribe(observer);
    是为了分步来分析:

    1. 被观察者 Observable 如何生产事件的
    2. 被观察者 Observable 何时生产事件的
    3. 观察者Observer是何时接收到上游事件的
    4. Observable 与Observer是如何关联在一起的

    Observable

    Observable是数据的上游,即事件生产者
    首先来分析事件是如何生成的,直接看代码 Observable.create()方法。

       @SchedulerSupport(SchedulerSupport.NONE)
        public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {    // ObservableOnSubscribe 是个接口,只包含subscribe方法,是事件生产的源头。
            ObjectHelper.requireNonNull(source, "source is null"); // 判空
            return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
        }
    

    最重要的是RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));这句代码。继续跟踪进去

    /**
         * Calls the associated hook function.
         * @param <T> the value type
         * @param source the hook's input value
         * @return the value returned by the hook
         */
        @SuppressWarnings({ "rawtypes", "unchecked" })
        @NonNull
        public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
            Function<? super Observable, ? extends Observable> f = onObservableAssembly;
            if (f != null) {
                return apply(f, source);
            }
            return source;
        }
    

    看注释,原来这个方法是个hook function。 通过调试得知静态对象onObservableAssembly默认为null, 所以此方法直接返回传入的参数source。
    onObservableAssembly可以通过静态方法RxJavaPlugins. setOnObservableAssembly ()设置全局的Hook函数, 有兴趣的同学可以自己去试试。 这里暂且不谈,我们继续返回代码。
    现在我们明白了:

     Observable<String> observable=Observable.create(new ObservableOnSubscribe<String>() {
    ...
    ...
    })
    

    相当于:

    Observable<String> observable=new ObservableCreate(new ObservableOnSubscribe<String>() {
    ...
    ...
    }))
    

    好了,至此我们明白了,事件的源就是new ObservableCreate()对象,将ObservableOnSubscribe作为参数传递给ObservableCreate的构造函数。
    事件是由接口ObservableOnSubscribe的subscribe方法上产的,至于何时生产事件,稍后再分析。

    Observer

    Observer 是数据的下游,即事件消费者
    Observer是个interface,包含 :

       void onSubscribe(@NonNull Disposable d);
        void onNext(@NonNull T t);
        void onError(@NonNull Throwable e);
        void onComplete();
    

    上游发送的事件就是再这几个方法中被消费的。上游何时发送事件、如何发送,稍后再表。

    subscribe

    重点来了,接下来最重要的方法来了:observable.subscribe(observer);
    从这个方法的名字就知道,subscribe是订阅,是将观察者(observer)与被观察者(observable)连接起来的方法。只有subscribe方法执行后,上游产生的事件才能被下游接收并处理。其实自然的方式应该是observer订阅(subscribe) observable, 但这样会打断rxjava的链式结构。所以采用相反的方式。
    接下来看源码,只列出关键代码

    public final void subscribe(Observer<? super T> observer) {
            ObjectHelper.requireNonNull(observer, "observer is null");
            ......
           observer = RxJavaPlugins.onSubscribe(this, observer); // hook ,默认直接返回observer
           ......
           subscribeActual(observer);  // 这个才是真正实现订阅的方法。
           ......
        }
    
    // subscribeActual 是抽象方法,所以需要到实现类中去看具体实现,也就是说实现是在上文中提到的ObservableCreate中
    protected abstract void subscribeActual(Observer<? super T> observer);
    

    接下来我们来看ObservableCreate.java:

        public ObservableCreate(ObservableOnSubscribe<T> source) {
            this.source = source;  // 事件源,生产事件的接口,由我们自己实现
        }
    
       @Override
        protected void subscribeActual(Observer<? super T> observer) {
            CreateEmitter<T> parent = new CreateEmitter<T>(observer); // 发射器
            observer.onSubscribe(parent);  //直接回调了观察者的onSubscribe
    
            try {
                // 调用了事件源subscribe方法生产事件,同时将发射器传给事件源。 
                // 现在我们明白了,数据源生产事件的subscribe方法只有在observable.subscribe(observer)被执行
                  后才执行的。 换言之,事件流是在订阅后才产生的。
                //而observable被创建出来时并不生产事件,同时也不发射事件。
              source.subscribe(parent);  
            } catch (Throwable ex) {
                Exceptions.throwIfFatal(ex);
                parent.onError(ex);
            }
        }
    

    现在我们明白了,数据源生产事件的subscribe方法只有在observable.subscribe(observer)被执行后才执行的。 换言之,事件流是在订阅后才产生的。而observable被创建出来时并不生产事件,同时也不发射事件。
    接下来我们再来看看事件是如何被发射出去,同时observer是如何接收到发射的事件的
    CreateEmitter<T> parent = new CreateEmitter<T>(observer);
    CreateEmitter 实现了ObservableEmitter接口,同时ObservableEmitter接口又继承了Emitter接口。
    CreateEmitter 还实现了Disposable接口,这个disposable接口是用来判断是否中断事件发射的。
    从名称上就能看出,这个是发射器,故名思议是用来发射事件的,正是它将上游产生的事件发射到下游的。
    Emitter是事件源与下游的桥梁。
    CreateEmitter 主要包括方法:

        void onNext(@NonNull T value);
        void onError(@NonNull Throwable error);
        void onComplete();
        public void dispose() ;
        public boolean isDisposed();
    

    是不是跟observer的方法很像?
    我们来看看CreateEmitter中这几个方法的具体实现:
    只列出关键代码

         public void onNext(T t) {
             if (!isDisposed()) { // 判断事件是否需要被丢弃
                 observer.onNext(t); // 调用Emitter的onNext,它会直接调用observer的onNext
             }
          }
          public void onError(Throwable t) {
               if (!isDisposed()) {
                    try {
                        observer.onError(t); // 调用Emitter的onError,它会直接调用observer的onError
                    } finally {
                        dispose();  // 当onError被触发时,执行dispose(), 后续onNext,onError, onComplete就不会继
                                        续发射事件了
                    }
                }
            }
    
           @Override
            public void onComplete() {
                if (!isDisposed()) {
                    try {
                        observer.onComplete(); // 调用Emitter的onComplete,它会直接调用observer的onComplete
                    } finally {
                        dispose();  // 当onComplete被触发时,也会执行dispose(), 后续onNext,onError, onComplete
                                          同样不会继续发射事件了
                    }
                }
            }
    

    CreateEmitter 的onError和onComplete方法任何一个执行完都会执行dispose()中断事件发射,所以observer中的onError和onComplete也只能有一个被执行。
    现在终于明白了,事件是如何被发射给下游的。
    当订阅成功后,数据源ObservableOnSubscribe开始生产事件,调用Emitter的onNext,onComplete向下游发射事件,Emitter包含了observer的引用,又调用了observer onNext,onComplete,这样下游observer就接收到了上游发射的数据。

    总结

    Rxjava的流程大概是:

    1. Observable.create 创建事件源,但并不生产也不发射事件。
    2. 实现observer接口,但此时没有也无法接受到任何发射来的事件。
    3. 订阅 observable.subscribe(observer), 此时会调用具体Observable的实现类中的subscribeActual方法,
      此时会才会真正触发事件源生产事件,事件源生产出来的事件通过Emitter的onNext,onError,onComplete发射给observer对应的方法由下游observer消费掉。从而完成整个事件流的处理。

    PS: observer中的onSubscribe在订阅时即被调用,并传回了Disposable, observer中可以利用Disposable来随时中断事件流的发射。

    今天所列举的例子是最简单的一个事件处理流程,没有使用线程调度,Rxjava最强大的就是异步时对线程的调度和随时切换观察者线程。至于这部分的源码且听下回讲解。

    相关文章

      网友评论

        本文标题:RxJava2 源码分析(一)

        本文链接:https://www.haomeiwen.com/subject/weoijxtx.html