美文网首页api 架构
RxJava2框架源码分析二(Create篇)

RxJava2框架源码分析二(Create篇)

作者: yqianqiao | 来源:发表于2019-12-03 18:40 被阅读0次

    1.回顾

    上篇已经介绍了RxJava的基本概念以及用法 RxJava2基本框架分析一(基础篇)

    2.实例讲解

           // RxJava的链式操作
            // 1. 创建被观察者(Observable) & 定义需发送的事件
            Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
                @Override
                public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                    emitter.onNext(1);
                    emitter.onNext(2);
                    emitter.onNext(3);
                    emitter.onComplete();
                }
            });
            // 2. 创建观察者(Observer) & 定义响应事件的行为
            Observer<Integer> observer = new Observer<Integer>() {
                @Override
                public void onSubscribe(Disposable d) {
                    System.out.println("开始采用subscribe连接");
                }
                // 默认最先调用复写的 onSubscribe()
    
                @Override
                public void onNext(Integer value) {
                    System.out.println("对Next事件" + value + "作出响应");
                }
    
                @Override
                public void onError(Throwable e) {
                    System.out.println("对Error事件作出响应");
                }
    
                @Override
                public void onComplete() {
                    System.out.println("对Complete事件作出响应");
                }
    
            };
            // 3. 通过订阅(subscribe)连接观察者和被观察者
            observable.subscribe(observer);
    
    • 运行结果


      示意图

    3. 源码分析

    下面,我讲根据 使用步骤 进行RxJava2的源码进行分析
    步骤1:创建被观察者(Observable)&定义需发送的事件
    步骤2:创建观察者(Observer)&定义响应事件的行为
    步骤3:通过订阅(subscribe)连接观察者和被观察者

    步骤一:创建被观察者(Observable)

    • 源码分析如下
    // 1. 创建被观察者(Observable) & 定义需发送的事件
     Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
                @Override
                public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                    emitter.onNext(1);
                    emitter.onNext(2);
                    emitter.onNext(3);
                    emitter.onComplete();
                }
            });
     /**
      * 源码分析 Observable.create(object : ObservableOnSubscribe<Int>{...])
      *  create 操作主要是创建了 ObservableCreate 对象并且返回出去
     */
        @CheckReturnValue
        @NonNull
        @SchedulerSupport(SchedulerSupport.NONE)
        public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
            //判断source是否为空  
            ObjectHelper.requireNonNull(source, "source is null");
            //hook函数:判断是否需要再原对象加上一些代码操作(暂时可以当做返回对象本身)
            return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
        }
      
      /**
       * 下面我们来看看 ObservableCreate 对象里面做了什么操作
       */
        public final class ObservableCreate<T> extends Observable<T> {
        // ObservableCreate 是Observable的子类
        final ObservableOnSubscribe<T> source;
    
        public ObservableCreate(ObservableOnSubscribe<T> source) {
            //构造函数
            //传入source对象,并且赋值全局 = 手动创建的ObservableOnSubscribe匿名内部类对象(Observable.create(new ObservableOnSubscribe<Integer>())
            this.source = source;
        }
      //这里需要留心关注subscribeActual方法后面会讲到
    
    
    • 步骤1总结:创建被观察者的操作已经完成了,调用 Observable.create()返回了一个ObservableCreate 对象。

    步骤二创建观察者(Observer)

    • 源码分析
    /** 
      * 使用步骤2:创建观察者 & 定义响应事件的行为(方法内的创建对象代码)
      **/
     // 2. 创建观察者(Observer) & 定义响应事件的行为
            Observer<Integer> observer = new Observer<Integer>() {
                @Override
                public void onSubscribe(Disposable d) {
                    System.out.println("开始采用subscribe连接");
                }
                // 默认最先调用复写的 onSubscribe()
    
                @Override
                public void onNext(Integer value) {
                    System.out.println("对Next事件" + value + "作出响应");
                }
    
                @Override
                public void onError(Throwable e) {
                    System.out.println("对Error事件作出响应");
                }
    
                @Override
                public void onComplete() {
                    System.out.println("对Complete事件作出响应");
                }
    
            };
    /** 
      * 源码分析Observer类
      **/
         public interface Observer<T> {
            // 注:Observer本质 = 1个接口
            // 接口内含4个方法,分别用于 响应 对应于被观察者发送的不同事件
            void onSubscribe(@NonNull Disposable d); // 内部参数:Disposable 对象,可结束事件
            void onNext(@NonNull T t);
            void onError(@NonNull Throwable e);
            void onComplete();
        }
    
    • 步骤2总结:创建观察者的操作已经完成了,通过new了一个Observer的匿名内部类

    步骤三:通过订阅(subscribe)连接观察者和被观察者

    • 源码分析
     // 3. 通过订阅(subscribe)连接观察者和被观察者
            observable.subscribe(observer);
    
    /** 
      * 源码分析:Observable.subscribe(observer)
      * 说明:该方法属于 Observable 类的方法(注:传入1个 Observer 对象)
      **/  
    public abstract class Observable<T> implements ObservableSource<T> {
         ...
        // 仅贴出关键源码
      @Override
      public final void subscribe(Observer<? super T> observer) {
             ...
             // 仅贴出关键源码
            //可以看到调用的是本类的下面抽象方法
             subscribeActual(observer); 
       }
        //定义了一个抽象方法当调用subscribe时会跟这个调用Observable子类的实现方法(就是调用者)
       protected abstract void subscribeActual(Observer<? super T> observer);
    }
    
    /**
    *  现在我们回到先前创建的被观察者中 ObservableCreate类 
    **/
    public final class ObservableCreate<T> extends Observable<T> {
     // ObservableCreate 是Observable的子类
        final ObservableOnSubscribe<T> source;
    
        public ObservableCreate(ObservableOnSubscribe<T> source) {
            //构造函数
            //传入source对象,并且赋值全局 = 手动创建的ObservableOnSubscribe匿名内部类对象(Observable.create(new ObservableOnSubscribe<Integer>())
            this.source = source;
        }
    
       /** 
          * 重点关注:复写了subscribeActual()
          * 作用:订阅时,通过接口回调 调用被观察者(Observerable) 与 观察者(Observer)的方法
          **/
        @Override
        protected void subscribeActual(Observer<? super T> observer) {
          //1. 创建1个CreateEmitter对象(封装成一个Disposable对象)
          //作用:发射事件
            CreateEmitter<T> parent = new CreateEmitter<T>(observer);
          //2. 调用观察者(Observer)的onSubscribe()
         // onSubscribe()的实现 = 使用步骤2(创建观察者(Observer))时复写的onSubscribe()
           //将Disposable(CreateEmitter) 传到观察者onSubscribe(Disposable d) 参数中,使之可以解除订阅
            observer.onSubscribe(parent);
    
            try {
                //3.调用source对象的subscribe()方法
                // source对象 = 使用步骤1(创建被观察者(Observable))中创建的ObservableOnSubscribe对象
                //subscribe()的实现 = 使用步骤1(创建被观察者(Observable))中复写的subscribe()
                //将CreateEmitter对象传递给被观察者进行对象方法的调用(onNext(),onComplete()...)
                source.subscribe(parent);
            } catch (Throwable ex) {
                Exceptions.throwIfFatal(ex);
                parent.onError(ex);
            }
        }
    
      /** 
        * 分析2:emitter.onNext("1");
        * 此处仅讲解subscribe()实现中的onNext()
        * onError()、onComplete()类似,此处不作过多描述
        **/
        static final class CreateEmitter<T>
        extends AtomicReference<Disposable>
        implements ObservableEmitter<T>, Disposable {
    
            final Observer<? super T> observer;
    
            CreateEmitter(Observer<? super T> observer) {
                //初始化讲观察者赋值到全局变量observer
                this.observer = observer;
            }
    
            @Override
            public void onNext(T t) {
              //当被观察者调用onNext()方法时,回调此方法(步骤一中创建Observable.create()匿名内部类中的onNext())
                //发送的事件不能为null
                if (t == null) {
                    onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
                    return;
                }
              //判断是否断开连接(调用Disposable.dispose())
              //没有断开的话,则调用观察者中的onNext()方法
                if (!isDisposed()) {
                    observer.onNext(t);
                }
            }
    
            @Override
            public void onError(Throwable t) {
                if (!tryOnError(t)) {
                    RxJavaPlugins.onError(t);
                }
            }
    }
    

    步骤3总结:当被观察者订阅观察者的时候,会调用被观察者ObservablesubscribeActual()抽象方法,回调其子类重新的subscribeActual()方法。这方法里面有三个步骤:

    • 创建1个CreateEmitter对象(封装成一个Disposable对象)
    • 调用观察者(Observer)的onSubscribe(CreateEmitter parent ) 使其可以取消订阅
    • 调用source对象的subscribe(CreateEmitter parent)方法,通过 parent发送事件回调

    4. 源码总结

    • 在步骤1(创建被观察者(Observable))、步骤2(创建观察者(Observer))时,仅仅只是定义了发送的事件 & 响应事件的行为;
    • 只有在步骤3(订阅时),才开始发送事件 & 响应事件,真正连接了被观察者 & 观察者
    • 具体源码总结如下


      总结

    相关文章

      网友评论

        本文标题:RxJava2框架源码分析二(Create篇)

        本文链接:https://www.haomeiwen.com/subject/tvxhgctx.html