美文网首页Android开发
RxJava2源码解析系列——1.使用流程解析

RxJava2源码解析系列——1.使用流程解析

作者: ShadowHapiness | 来源:发表于2017-07-19 23:54 被阅读48次

    概述

    Rx系列现在火遍全球,网上也纷纷涌现各类教程、博客。作为一个Android开发人员,我认为掌握RxJava已经成为了一项必不可少的专业技能,然而一眛的去看网上已有的教程和博客,并不能让自己深入理解RxJava,于是有了本系列,也当作为自己的一个总结。本系列章节打算从最常见的使用开始,然后进入源码具体分析。

    使用

    最简单的使用RxJava的一个例子,我们需要三个元素

    1. Observable(被观察者)
    2. Observer(观察者)
    3. subscribe(订阅关系)

    废话不多说,就是上代码

    Observable.create(new ObservableOnSubscribe<String>() {
                @Override
                public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {
                    Log.d(TAG, "subscribe");
                    e.onNext("This is String");
                    e.onNext("This is String");
                    e.onComplete();
                }
            })
                    .subscribe(new Observer() {
                        @Override
                        public void onSubscribe(@NonNull final Disposable d) {
                            Log.d(TAG, "onSubScribe");
                        }
    
                        @Override
                        public void onNext(@NonNull Object o) {
                            Log.d(TAG, "onNext");
    
                        }
    
                        @Override
                        public void onError(@NonNull Throwable e) {
                            Log.d(TAG, "onError");
                        }
    
                        @Override
                        public void onComplete() {
                            Log.d(TAG, "onComplete");
                        }
                    });
    

    1.首先我们需要创建Observable,创建Observable的操作符有很多,这里不一一写出,本文中先使用create操作符创建Observable对象。我们来看一下create方法:

    @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.NONE)
    public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
        ObjectHelper.requireNonNull(source, "source is null");
        return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
    }
    
    1. create方法中需要一个ObservableOnSubscribe<T>类型参数
    2. 方法中最先使用ObjectHelper做了一次判空处理。
    3. 使用RxJavaPlugins做了某些事情
    mark

    ObservableOnSubscribe<T>是一个接口,因此需要实现当中的subscribe方法,而subscribe方法中存在ObservableEmitter<T>类型的参数。

    mark mark

    ObservableEmitter<T>继承Emitter<T>接口,Emitter<T>接口中定义了我们熟知的onNext、onError、onComplete方法。

    ObjectHelper是个工具类,在这就不多说了。

    mark

    RxJavaPlugins顾名思义,应该叫做插件类,具体的作用后续再做详解,这里大概说明一下,该类的onAssembly方法跟hook相关。而这里hook不影响我们主流程,因此传进去什么参数就返回什么(我们这里传进去的是ObservableCreate<T>对象),在ObservableCreate<T>中定义了具体订阅时的逻辑以及发射器的逻辑。

    从这个流程我们看出,当我们使用create操作符创建一个Observable时,我们需要传入一个实现了ObservableOnSubscribe<T>接口的对象,在这个实现中,存在发射器ObservableEmitter<T>,通过它可以让使用者自由定义数据流向。并且在create操作符过程中,ObservableOnSubscribe<T>对象与ObservableCreate<T>相关联。

    2.接下来我们需要创建Observer,Observer是个接口,因此我们只需要实现该接口即可。

    new Observer() {
                        @Override
                        public void onSubscribe(@NonNull final Disposable d) {
                            Log.d(TAG, "onSubScribe");
                        }
    
                        @Override
                        public void onNext(@NonNull Object o) {
                            Log.d(TAG, "onNext");
    
                        }
    
                        @Override
                        public void onError(@NonNull Throwable e) {
                            Log.d(TAG, "onError");
                        }
    
                        @Override
                        public void onComplete() {
                            Log.d(TAG, "onComplete");
                        }
                    }
    

    创建Observer很简单,实现接口当中定义的4个方法即可。

    1. onSubscribe(@NonNull final Disposable d),当订阅时会回调该方法,Disposable 用来取消订阅关系。
    2. onNext(@NonNull Object o),发射数据
    3. onComplete(),当onNext方法全部执行完毕,执行该方法。
    4. onError(@NonNull Throwable e),当数据流向出现问题或使用者自己调用时执行。

    3.完成订阅过程,首先我们来看看如何完成订阅的。

    observable.subscribe(observer) 
    

    当我们使用创建操作符create创建Observable时,还记得create方法返回的是什么类型吗?ok,就是Observable类型,但具体的实现类是ObservableCreate<T>。

    进入Observable的subscribe方法看看:

    mark
    1. 判空处理
    2. hook相关
    3. 执行subscribeActual(observer)
    4. 抛出异常

    在这4个流程当中,subscribeActual(observer)方法才是我们应该关注的。在Observable类中,subscribeActual(observer)是个抽象方法,因此我们需要寻找它的具体实现。上面已经提到使用create操作符,具体的实现类是ObservableCreate<T>。

    mark

    在ObservableCreate<T>类的subscribeActual(observer)中,所声明的方法参数便是我们外部传进来的实现Observer接口的对象。

    1. 将Observer(观察者)与发射器相关联
    2. 调用observer的onSubscribe方法,为了方便观察者可随时解除订阅关系。
    3. 执行使用者自定义的subscribe方法中的逻辑,同时也将发射器与Observable(被观察者)做关联
    4. 发生异常,回调onError

    因此,当执行到subscribeActual(observer)时,才是真正的订阅。

    而当执行到

     source.subscribe(parent);  
    

    将ObservableOnSubscribe(源头)与CreateEmitter(Observer,终点)联系起来。

    这里的souce就是

    mark

    这里的parent就是 CreateEmitter<T>。因此,会执行parent.onNext(), parent.onComplete(),parent.onError()

    static final class CreateEmitter<T>
        extends AtomicReference<Disposable>
        implements ObservableEmitter<T>, Disposable {
      
            private static final long serialVersionUID = -3434801548987643227L;
            final Observer<? super T> observer;
      
            CreateEmitter(Observer<? super T> observer) {
                this.observer = observer;
            }
    
            @Override
            public void onNext(T t) {
                ...
                if (!isDisposed()) {
                    observer.onNext(t);
                }
            }
    
            @Override
            public void onError(Throwable t) {
                if (!tryOnError(t)) {
                    RxJavaPlugins.onError(t);
                }
            }
    
            @Override
            public boolean tryOnError(Throwable t) {
                ...
                if (!isDisposed()) {
                    try {
                        observer.onError(t);
                    } finally {
                        dispose();
                    }
                    return true;
                }
                return false;
            }
    
            @Override
            public void onComplete() {
                if (!isDisposed()) {
                    try {
                        observer.onComplete();
                    } finally {
                        dispose();
                    }
                }
            }
            ...
        }
    

    到这,最简单的一个使用RxJava的流程就结束了。

    最后总结

    1.创建Observable过程

    1. 需要传入一个实现了ObservableOnSubscribe<T>接口的对象
    2. 在ObservableOnSubscribe<T>实现中,通过ObservableEmitter<T>可以让使用者自由定义数据流向
    3. create操作符过程中,采用适配器模式,将ObservableOnSubscribe<T>通过ObservableCreate<T>适配为Observable<T>对象,让ObservableOnSubscribe<T>与ObservableCreate<T>相关联

    2.订阅过程

    1. 真正订阅的方法在subscribeActual(Observer<? super T> observer)
    2. source.subscribe(parent); 这行代码执行时,才开始发射数据,在ObservableOnSubscribe<T>中通过ObservableEmitter<T>发送数据给Observer
    3. 当Observable与Observer订阅关系被dispose时,不会执行onXXX方法。
    4. Observer 的 onComplete() 和 onError() 互斥只能执行一次,因为CreateEmitter 在回调他们两中任意一个后,都会自动 dispose()。
    5. 先 error 后 complete,complete 不显示。 反之会 crash
    6. 还有一点要注意的是 onSubscribe() 是在我们执行 subscribe() 这句代码的那个线程回调的,并不受线程调度影响

    相关文章

      网友评论

        本文标题:RxJava2源码解析系列——1.使用流程解析

        本文链接:https://www.haomeiwen.com/subject/hhgrkxtx.html