美文网首页Android-RxJavaAndroid开发经验·源码分析
谈谈对于响应式编程RxJava的理解 - 原理篇

谈谈对于响应式编程RxJava的理解 - 原理篇

作者: 程序员三千_ | 来源:发表于2020-06-09 15:10 被阅读0次

    谈谈对于响应式编程RxJava的理解 - 核心思想篇
    谈谈对于响应式编程RxJava的理解 - 原理篇

    源码分析

    我们直接先看一个最简单的例子

     Observable
            .create(new ObservableOnSubscribe<Object>() {
    
                @Override
                public void subscribe(ObservableEmitter<Object> e) throws Exception {
                    e.onNext("A");
    
                }
            })
    //        .map(new Function<Object, Object>() {
    //
    //            @Override
    //            public Object apply(Object o) throws Exception {
    //                return null;
    //            }
    //        })
           .subscribe(new Observer<Object>() {
                @Override
                public void onSubscribe(Disposable d) {
    
                }
    
                @Override
                public void onNext(Object o) {
    
                }
    
                @Override
                public void onError(Throwable e) {
    
                }
    
                @Override
                public void onComplete() {
    
                }
            });
    

    map操作符我们现在先不看,先从最简单的流程开始,Observable不管用create 创建还是just创建其实内部原理是一样的,我们这里便于分析RxJava的整个流程所以用了create来创建。根据观察者模式的概念,我们将RxJava的源码分析过程从简单到难,拆分为三步进行:

    • 1、Observer观察者的创建
    • 2、Observable#create被观察者的创建
    • 3、subscribe订阅关系的建立
    第一步:Observer观察者的创建
    public interface Observer<T> {
    
        /**
         * Provides the Observer with the means of cancelling (disposing) the
         * connection (channel) with the Observable in both
         * synchronous (from within {@link #onNext(Object)}) and asynchronous manner.
         * @param d the Disposable instance whose {@link Disposable#dispose()} can
         * be called anytime to cancel the connection
         * @since 2.0
         */
        void onSubscribe(@NonNull Disposable d);
    
        /**
         * Provides the Observer with a new item to observe.
         * <p>
         * The {@link Observable} may call this method 0 or more times.
         * <p>
         * The {@code Observable} will not call this method again after it calls either {@link #onComplete} or
         * {@link #onError}.
         *
         * @param t
         *          the item emitted by the Observable
         */
        void onNext(@NonNull T t);
    
        /**
         * Notifies the Observer that the {@link Observable} has experienced an error condition.
         * <p>
         * If the {@link Observable} calls this method, it will not thereafter call {@link #onNext} or
         * {@link #onComplete}.
         *
         * @param e
         *          the exception encountered by the Observable
         */
        void onError(@NonNull Throwable e);
    
        /**
         * Notifies the Observer that the {@link Observable} has finished sending push-based notifications.
         * <p>
         * The {@link Observable} will not call this method if it calls {@link #onError}.
         */
        void onComplete();
    
    }
    

    Observer其实属于观察者模式中的抽象观察者对象,就是一个接口,里面有onSubscribe开始订阅回调、onNext拿到事件回调、onError错误事件回调、onComplete完成事件回调的抽象方法,我们这里就是new一个这个Observer接口的匿名内部类。

    第二步:Observable#create被观察者的创建

    我们先看到Observable.create方法的调用

     @CheckReturnValue
        @SchedulerSupport(SchedulerSupport.NONE)
        public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
            ObjectHelper.requireNonNull(source, "source is null");
            return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
        }
    

    我们点进去RxJavaPlugins.onAssembly看看

      @NonNull
        public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
            Function<? super Observable, ? extends Observable> f = onObservableAssembly;
            if (f != null) {
                return apply(f, source);
            }
            return source;
        }
    

    一开始是一个对onObservableAssembly的判空操作,其实RxJava内部并没有用到,一开始onObservableAssembly初始化后就是空的,

       setOnObservableAssembly(null);
    

    按我的理解onObservableAssembly是提供给用户我们自己用的,我们可以重写
    setOnObservableAssembly。例如:

     RxJavaPlugins.setOnObservableAssembly(new Function<Observable, Observable>() {
             @Override
             public Observable apply(Observable observable) throws Exception {
    
          //例如这里可以加入一下日志打印,可以查看全局哪些地方用到了RxJava
             Log.d("RxJava",observable.toString());
             return observable;
            }
     };
    

    如果要重写setOnObservableAssembly的话,记得return返回observable,这样才可以继续RxJava的整个事件流,不然返回null肯定会报错的。
    这里也不是我们分析的重点,我们再看到return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));这句代码,我们看到这里返回了一个ObservableCreate对象

    public final class ObservableCreate<T> extends Observable<T> {
        final ObservableOnSubscribe<T> source;
    
        public ObservableCreate(ObservableOnSubscribe<T> source) {
            this.source = source;
        }
    
        @Override
        protected void subscribeActual(Observer<? super T> observer) {
            CreateEmitter<T> parent = new CreateEmitter<T>(observer);
            observer.onSubscribe(parent);
    
            try {
                source.subscribe(parent);
            } catch (Throwable ex) {
                Exceptions.throwIfFatal(ex);
                parent.onError(ex);
            }
        }
    

    ObservableCreate对象是继承自Observable,的我们先大致看下它的结果,具体先接着看第三步流程,后面再结合起来分析。

    第三步:subscribe订阅关系的建立

    直接进入subscribe的源码

      @SchedulerSupport(SchedulerSupport.NONE)
        @Override
        public final void subscribe(Observer<? super T> observer) {
            ObjectHelper.requireNonNull(observer, "observer is null");
            try {
                observer = RxJavaPlugins.onSubscribe(this, observer);
    
                ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
    
                subscribeActual(observer);
            } catch (NullPointerException e) { // NOPMD
                throw e;
            } catch (Throwable e) {
                Exceptions.throwIfFatal(e);
                // can't call onError because no way to know if a Disposable has been set or not
                // can't call onSubscribe because the call might have set a Subscription already
                RxJavaPlugins.onError(e);
    
                NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
                npe.initCause(e);
                throw npe;
            }
        }
    

    subscribe方法里传进来的就是我们自定义的observer(就是自己写的匿名类),前面也是一些初始化和判空操作,我们主要看 subscribeActual(observer);这句代码。我们点进去发现是Observable里的一个抽象方法

    protected abstract void subscribeActual(Observer<? super T> observer);
    

    我们从第二步可以知道ObservableCreate对象是继承自Observable的,所以这里抽象方法subscribeActual的实现就是在ObservableCreate里,我们看到ObservableCreate里的subscribeActual方法

     @Override
        protected void subscribeActual(Observer<? super T> observer) {
            CreateEmitter<T> parent = new CreateEmitter<T>(observer);
            observer.onSubscribe(parent);
    
            try {
                source.subscribe(parent);
            } catch (Throwable ex) {
                Exceptions.throwIfFatal(ex);
                parent.onError(ex);
            }
        }
    
    
    一开始将我们传进来的observer包装成一个发射器CreateEmitter对象parent,再调用观察者的observer.onSubscribe(parent);这里实现onSubscribe方法的实际上就是我们自己写的内部类里的onSubscribe方法, image.png

    我们看到onSubscribe方法里最终我们调用了发射器CreateEmitter的onNext方法,所以我们进去onNext方法看看

     @Override
            public void onNext(T t) {
                if (t == null) {
                    onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
                    return;
                }
                if (!isDisposed()) {
                    observer.onNext(t);
                }
            }
    
    

    我们看到发射器里的onNext方法就是最终调用到我们自定义观察者observer的
    onNext方法。

    至此,RxJava最简单的订阅关系和收发数据的流程就通了,我们再通过一张简易的流程图总结下流程。

    image.png

    那么现在我们再加入map操作符一起分析看看

    Map操作符原理分析

    我们对上面的代码进行简单的修改,并且加入map操作符
    注意看代码注释

     Observable
            //ObservableCreate   将自定义source传进去
            .create(
              //自定义被观察者(起点)
              new ObservableOnSubscribe<String>() {
    
                @Override
                public void subscribe(ObservableEmitter<String> e) throws Exception {
                    e.onNext("A");
    
                }
            })
            //ObservableCreate.map
            .map(new Function<String, Bitmap>() {
    
                @Override
                public Bitmap apply(String o) throws Exception {
                    return null;
                }
            })
            //ObservableMap.subscribe
           .subscribe(
               //自定义观察者(终点)
               new Observer<Object>() {
                @Override
                public void onSubscribe(Disposable d) {
    
                }
    
                @Override
                public void onNext(Object o) {
    
                }
    
                @Override
                public void onError(Throwable e) {
    
                }
    
                @Override
                public void onComplete() {
    
                }
            });
    
    
    map.png

    其实上面map操作符的作用就是从Observable起点开始,通过map操作符将String类型数据转化为Bitmap类型数据最后流向终点Observer。
    经过之前的分析,我们已经知道Observable. create返回了一个ObservableCreate对象,这时我们如果再调用map操作符的话,就是对ObservableCreate进行了.map的操作了。

       @CheckReturnValue
        @SchedulerSupport(SchedulerSupport.NONE)
        public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
            ObjectHelper.requireNonNull(mapper, "mapper is null");
            return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
        }
    

    从源码可以看出.map操作返回一个ObservableMap,所以上面整个程序的最后一步调用会走到ObservableMap.subscribe。(具体可以看我代码的注释)

    因为有前面的分析基础,现在我们直接进入subscribe订阅流程的代码

     @SchedulerSupport(SchedulerSupport.NONE)
        @Override
        public final void subscribe(Observer<? super T> observer) {
            ObjectHelper.requireNonNull(observer, "observer is null");
            try {
                observer = RxJavaPlugins.onSubscribe(this, observer);
    
                ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
    
                subscribeActual(observer);
            } catch (NullPointerException e) { // NOPMD
                throw e;
            } catch (Throwable e) {
                Exceptions.throwIfFatal(e);
                // can't call onError because no way to know if a Disposable has been set or not
                // can't call onSubscribe because the call might have set a Subscription already
                RxJavaPlugins.onError(e);
    
                NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
                npe.initCause(e);
                throw npe;
            }
        }
    
    

    因为我们加入了map操作符,现在这里的subscribeActual(observer);就是调用的
    ObservableMap的subscribeActual了,

    
      @Override
        public void subscribeActual(Observer<? super U> t) {
            source.subscribe(new MapObserver<T, U>(t, function));
        }
    

    这里,源码将自定义观察者Observer(终点),传入到MapObserver对象里,将自定义观察者Observer包装了一层,new了一个MapObserver对象出来。
    而这里的 source.subscribe里的source就是上一层的Observable也就是ObservableCreate,所以这里调用了ObservableCreate的subscribe方法。因为不管是ObservableMap还是ObservableCreate都是继承自Observable
    所以我们再进入Observable的subscribe方法

    
    @SchedulerSupport(SchedulerSupport.NONE)
        @Override
        public final void subscribe(Observer<? super T> observer) {
            ObjectHelper.requireNonNull(observer, "observer is null");
            try {
                observer = RxJavaPlugins.onSubscribe(this, observer);
    
                ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
    
                subscribeActual(observer);
            } catch (NullPointerException e) { // NOPMD
                throw e;
            } catch (Throwable e) {
                Exceptions.throwIfFatal(e);
                // can't call onError because no way to know if a Disposable has been set or not
                // can't call onSubscribe because the call might have set a Subscription already
                RxJavaPlugins.onError(e);
    
                NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
                npe.initCause(e);
                throw npe;
            }
        }
    

    而这次 subscribeActual(observer);调用的是ObservableCreate的subscribeActual了,而且这次的observer观察者是自定义观察者经过包装之后的MapObserver对象,

    
     @Override
        protected void subscribeActual(Observer<? super T> observer) {
            CreateEmitter<T> parent = new CreateEmitter<T>(observer);
            observer.onSubscribe(parent);
    
            try {
                source.subscribe(parent);
            } catch (Throwable ex) {
                Exceptions.throwIfFatal(ex);
                parent.onError(ex);
            }
        }
    

    observer.onSubscribe(parent);就是调用了我们自定义观察者(终点的)的onSubscribe, source.subscribe(parent);这句调用的就是自定义被观察者(起点)的subscribe方法,也就是我们上面流程分析的CreateEmitter发射器的onNext方法,

     public void onNext(T t) {
                if (t == null) {
                    onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
                    return;
                }
                if (!isDisposed()) {
                    observer.onNext(t);
                }
            }
    

    而此时的observer.onNext(t);的onNext方法不再是自定义observer的onNext方法了,而是先调用MapObserver的onNext方法

      MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
                super(actual);
                this.mapper = mapper;
            }
    
            @Override
            public void onNext(T t) {
                if (done) {
                    return;
                }
    
                if (sourceMode != NONE) {
                    actual.onNext(null);
                    return;
                }
    
                U v;
    
                try {
                    v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
                } catch (Throwable ex) {
                    fail(ex);
                    return;
                }
                actual.onNext(v);
            }
    

    而这一句代码 v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");就是真正执行变化的代码,也就是我们自己重写的map的apply方法里,最后再调用actual.onNext(v);方法返回整个终点的onNext(v);,到此执行完整个事件流。还是和之前一样,我再用一张流程图总结下。


    image.png

    其实总结一下:RxJava整个事件流的流程就是将事件通过ObservableMap=>ObservableCreate=> CreateEmitter层层包装包装(你的业务需要几个操作符就几层包装),再通过CreateEmitter#onNext=>ObserverMap#onNext像洋葱一样一层层拆开包装,最后返回到终点响应事件也就是我们自定义的Observer的过程。这也是整个RxJava的核心思想,RxJava里所有的操作符的思路都是这样的执行流程。

    相关文章

      网友评论

        本文标题:谈谈对于响应式编程RxJava的理解 - 原理篇

        本文链接:https://www.haomeiwen.com/subject/fagstktx.html