美文网首页
RxJava中实现观察者模式的原理

RxJava中实现观察者模式的原理

作者: 菇凉别走 | 来源:发表于2018-12-04 18:06 被阅读0次

    前言:

        本文所有代码基于RxJava 2.2.3和RxAndroid 2.1.0,并使用kotlin来介绍RxJava中常用的操作符的用法及效果。
    

    第一步:创建被观察者Observable。
    1、 create其实就是Observable里的一个静态方法用于创建一个Observable实例。在下面这段代码中我们主动创建了两个对象实例,一个是由Observable.create方法返回的Observable实例,一个是传入create方法的参数ObservableOnSubscribe匿名对象并覆写了定义事件发送顺序的方法 ObservableOnSubscribe.subscribe。

             //第一步
            Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
                @Override
                public void subscribe(ObservableEmitter emitter) throws Exception {
                    Log.e("Create","ObservableOnSubscribe对象的subscribe方法被调用");
                    emitter.onNext("数据1");
                    emitter.onNext("数据2");
                    emitter.onNext("数据3");
                    emitter.onNext("数据4");
                    emitter.onError(new Exception());
                    emitter.onComplete();
                }
            });
    
    

    2、Observable的create方法,该方法在这种情况下实际上返回的就是新new的 ObservableCreate对象,并且会将ObservableOnSubscribe对象赋值给其内部的成员变量source

        //Observable的create方法
        public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
            ObjectHelper.requireNonNull(source, "source is null");
            return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
        }
        //ObservableCreate类的成员变量及其构造函数 create传入的参数ObservableOnSubscribe对象最终赋值给了source
       final ObservableOnSubscribe<T> source;
       public ObservableCreate(ObservableOnSubscribe<T> source) {
                this.source = source;
        }
        
       //RxJavaPlugins.onAssembly方法
        public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
            Function<? super Observable, ? extends Observable> f = onObservableAssembly;
            if (f != null) {//此时 f == null 所以会返回source即传入的参数不会有其他操作
                return apply(f, source);
            }
            return source;
        }
    
    
    第一步结论:创建了一个ObservableCreate对象,并将决定订阅时事件调动顺序的ObservableOnSubscribe对象赋值给其成员变量source。

    第二步:创建观察者Observer

            Observer<String> observer = new Observer<String>() {
                @Override
                public void onSubscribe(Disposable d) {
                    Log.e("Create","Observer的onSubscribe方法");
                }
    
                @Override
                public void onNext(String s) {
                    Log.e("Create","Observer的onNext方法 收到的参数为:"+s);
                }
    
                @Override
                public void onError(Throwable e) {
                    Log.e("Create","Observer的onError方法");
                }
    
                @Override
                public void onComplete() {
                    Log.e("Create","Observer的onComplete方法");
                }
            };
    
    第二步结论:第二步就是单纯的创建了一个观察者,并覆写如上所示的四个方法。

    第三步:让被观察者(Observable)和观察者(Observer)之间产生联系

    //观察者订阅被观察者 两者关系发生
     observable.subscribe(observer);
    

    1、订阅之后立即调用被观察者(observable)的subscribe方法。该方法中调用了抽象方法subscribeActual(observer);并将观察者(observer)作为参数传入传入。

        public final void subscribe(Observer<? super T> observer) {
            ObjectHelper.requireNonNull(observer, "observer is null");
            try {
                observer = RxJavaPlugins.onSubscribe(this, observer);
                ObjectHelper.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned 
                a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe 
                for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");
                //核心代码是这句这个方法在Observable类中是个抽象方法第一步创建的ObservableCreate对象,覆写了该方法,该方法的参数为
                subscribeActual(observer);
            } catch (NullPointerException e) { // NOPMD
                throw e;
            } catch (Throwable e) {
                Exceptions.throwIfFatal(e);
                // can't call onError because no way to know if a Disposable has been set or not
                // can't call onSubscribe because the call might have set a Subscription already
                RxJavaPlugins.onError(e);
    
                NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
                npe.initCause(e);
                throw npe;
            }
        }
    
    

    2、当这个创建的Observable对象调用subscribe方法时,subscribe方法会调用ObservableCreate类中覆写的subscrubeActual方法,subscrubeActual的代码如下:

       
        protected void subscribeActual(Observer<? super T> observer) {
            //将订阅的Observer对象进行封装到CreateEmitter中,作为source调用subscribe方法时的参数
            CreateEmitter<T> parent = new CreateEmitter<T>(observer);
            observer.onSubscribe(parent);
            try {
                //source即为调用create操作符时传入的用于定义调用顺序的ObservableOnSubscribe对象,在此处调用了其subscribe方法
               //传入的参数是上面封装了的CreateEmitter对象,所以我们在外面定义的调用顺序就是调用的CreateEmitter的方法如onNext,
              //onError,onComplete等方法
                source.subscribe(parent);
            } catch (Throwable ex) {
                Exceptions.throwIfFatal(ex);
                parent.onError(ex);
            }
        }
    

    3、再看看CreateEmitter中对onNext,onError,onComplete的封装。三个方法都调用了观察者(observer)的相应方法,但是在调用前都对其进行了处理,判断其是否被取消。若调用了onError或者onComplete中的任意一个方法时,就会被取消,即用dispose()方法,这时 !isDisposed())的值就为false了。所以后续的观察者的其他方法都不可调用了。

            //observer对象即为我们在第二步创建的观察者(observer)
            public void onNext(T t) {
                if (t == null) {
                    onError(new NullPointerException("onNext called with null. Null values are generally
                                                       not allowed in 2.x operators and sources."));
                    return;
                }
                if (!isDisposed()) {
                    observer.onNext(t);
                }
            }
    
            public void onError(Throwable t) {
                if (!tryOnError(t)) {
                    RxJavaPlugins.onError(t);
                }
            }
    
            public boolean tryOnError(Throwable t) {
                if (t == null) {
                    t = new NullPointerException("onError called with null. Null values are generally not 
                                                                    allowed in 2.x operators and sources.");
                }
                if (!isDisposed()) {
                    try {
                        observer.onError(t);
                    } finally {
                        dispose();
                    }
                    return true;
                }
                return false;
            }
    
            public void onComplete() {
                if (!isDisposed()) {
                    try {
                        observer.onComplete();
                    } finally {
                        dispose();
                    }
                }
            }
    
    
    第三步总结:被观察者(observable)调用subscribe方法,此方法最终将观察者(observer)对象封装成Emitter并作为参数,调用ObservableCreate中source即第一步创建时传入的参数的subscribe方法。即实现了调用第一步时定义的调用顺序。而Emitter的封装使onError及onComplete的调用做了限制,保证了onError和onComplete方法只能调用其中一个。

    最后总结:

    上述一系列操作,可归结为以下伪代码:

         //规定调用顺序
           ObservableOnSubscribe<String> source = new ObservableOnSubscribe<String>(){
               @Override
               public void subscribe(ObservableEmitter emitter) throws Exception {
                   Log.e("Create","ObservableOnSubscribe对象的subscribe方法被调用");
                   emitter.onNext("数据1");
                   emitter.onNext("数据2");
                   emitter.onNext("数据3");
                   emitter.onNext("数据4");
                   emitter.onError(new Exception());
                   emitter.onComplete();
               }
           };
       //创建被观察者
           Observable<String> observable = new ObservableCreate<>(source);
           observable.source = source;
         //以上为第一步
    //-------------------------------------------------------------------------
           Observer<String> observer = new Observer<String>();
    //创建观察者这是第二步
    //---------------------------------------------------------------------------
    //调用observable.subscribe(observer)
           Emitter<String> emitter = new EmitterCreate(observer);
           observable.source.subscribe(emitter);
    //第三步订阅
    //-----------------------------------------------------------------------------
    
    

    相关文章

      网友评论

          本文标题:RxJava中实现观察者模式的原理

          本文链接:https://www.haomeiwen.com/subject/rcdmcqtx.html