美文网首页
RxJava2 源码解析(一)

RxJava2 源码解析(一)

作者: 徘徊0_ | 来源:发表于2019-08-08 16:06 被阅读0次

    简述:本篇主要分析 Observable、Observer 产生(create)、关联(subscribe)、数据发送(emitter)的过程!

    下面这段代码,是常规的RxJava的操作,从这段代码入手分析:

            //1- create 一个 Observable
            Observable.create(new ObservableOnSubscribe<String>() {
                @Override
                public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                    emitter.onNext("This is Create!");
                }
            }).subscribe(new Observer<String>() { //订阅一个 Observer
                @Override
                public void onSubscribe(Disposable d) {
                    
                }
    
                @Override
                public void onNext(String s) {
                    Log.d(TAG, "onNext: " + s);
                }
    
                @Override
                public void onError(Throwable e) {
    
                }
    
                @Override
                public void onComplete() {
    
                }
            });
    

    一、Create 被观察者 Observable 过程

    1,被观察者Observable

    //抽象类,具体实现由子类来做
    public abstract class Observable<T> implements ObservableSource<T> {
     .....
    }
    

    2,接着看 Observable . Create 方法

            //1- create 一个 Observable
            Observable.create(new ObservableOnSubscribe<String>() {
                @Override
                public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                    emitter.onNext("This is Create!");
                }
            })//....省略部分代码
    
    //*****************************分割线*********************************
    
      public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
            ObjectHelper.requireNonNull(source, "source is null");
            return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
        }
    

    这里需要注意:

    • create 方法的返回值是Observable
    • create 方法传入的参数为:ObservableOnSubscribe<T>
    • create 方法 return,返回的是RxJavaPlugins.onAssembly(new ObservableCreate<T>(source)); ,其实就是返回的new ObservableCreate<T>(source),并且将source作为参数,传了进去(重要)ObservableCreate这个类,稍后分析.

    3,上面create方法提到了3个重要的地方,ObservableOnSubscribe、ObservableCreate、RxJavaPlugins.onAssembly 继续往下看

    RxJavaPlugins.onAssembly 分析:;

    onAssembly方法.png
    这个方法需要一个Observable作为参数,上面示例代码中,直接new ObservableCreate<T>(source)作为参数,其中这个source也就是上面的ObservableOnSubscribe。这里又提到ObservableCreate这个类,顾名思义就是:被观察者生产类,继续往下看:

    ObservableOnSubscribe,需要注意里面的ObservableEmitter后面会分析,源码如下:

    public interface ObservableOnSubscribe<T> {
        /**
         * Called for each Observer that subscribes.
         * @param emitter the safe emitter instance, never null
         * @throws Exception on error
         */
        void subscribe(@NonNull ObservableEmitter<T> emitter) throws Exception;
    }
    

    ObservableCreate 分析:

    public final class ObservableCreate<T> extends Observable<T> {
        final ObservableOnSubscribe<T> source;
    
        //这里的 ObservableOnSubscribe 作为参数传递进来
        public ObservableCreate(ObservableOnSubscribe<T> source) {
            this.source = source;
        }
        //注意,这个方法
        @Override
        protected void subscribeActual(Observer<? super T> observer) {
            // 将观察者作为参数,传给 CreateEmitter
            CreateEmitter<T> parent = new CreateEmitter<T>(observer);
            //将parent 传到了onSubscribe()
            observer.onSubscribe(parent);
    
            try {
                // 将ObservableOnSubscribe 和 CreateEmitter(也就是Observer) 关联
                source.subscribe(parent);
            } catch (Throwable ex) {
                Exceptions.throwIfFatal(ex);
                parent.onError(ex);
            }
        }
      //...........省略部分代码
    }
    

    注意

    • observer.onSubscribe(parent); 这里调用的是Observer-->void onSubscribe(@NonNull Disposable d);这个方法需要的参数类型为:Disposable , 这也就说明了,CreateEmitter 可以将Observer 阻断。
    • 上面的source是被观察者ObservableOnSubscribe<T> source,执行source.subscribe(parent);parentCreateEmitter,CreateEmitter中又有observer,也就是将被观察者观察者关联起来。

    接着具体看下CreateEmitter这个类:

    static final class CreateEmitter<T>
        extends AtomicReference<Disposable>
        implements ObservableEmitter<T>, Disposable {
    
            private static final long serialVersionUID = -3434801548987643227L;
    
            final Observer<? super T> observer;
    
            //******************1************************
            CreateEmitter(Observer<? super T> observer) {
                this.observer = observer;
            }
    
            @Override
            public void onNext(T t) {
                if (t == null) {
                    onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
                    return;
                }
                //********************2***********************
                //如果没被阻断,调用观察者的onNext()方法
                if (!isDisposed()) {
                    observer.onNext(t);
                }
            }
    
            @Override
            public void onError(Throwable t) {
                if (!tryOnError(t)) {
                    RxJavaPlugins.onError(t);
                }
            }
    
            @Override
            public boolean tryOnError(Throwable t) {
                if (t == null) {
                    t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
                }
                if (!isDisposed()) {
                    try {
                        observer.onError(t);
                    } finally {
                        dispose();
                    }
                    return true;
                }
                return false;
            }
    
            @Override
            public void onComplete() {
                if (!isDisposed()) {
                    try {
                        observer.onComplete();
                    } finally {
                        dispose();
                    }
                }
            }
    
            //.... 省略部分代码
        }
    

    注意,上面标识的地方:

    • 1,CreateEmitter(生成发射器)这个类的构造器的参数是:Observer ,下面代码也可以看到,调用 发射器的onNext()等方法最终都是调用的Observer 中对应的方法
    • 2,onNext(); 等方法的逻辑,大致都是,先进行判空操作,如果没有被阻断(!isDisposed()) 就会调用Observer 中对应的方法。

    CreateEmitter 实现了 ObservableEmitter<T>, Disposable 接口

    public interface ObservableEmitter<T> extends Emitter<T> {
      //.... 省略代码....
    }
    

    这里ObservableEmitter又继承自Emitter,接下来看一下Emitter

    public interface Emitter<T> {
        /**
         * Signal a normal value.
         * @param value the value to signal, not null
         */
        void onNext(@NonNull T value);
    
        /**
         * Signal a Throwable exception.
         * @param error the Throwable to signal, not null
         */
        void onError(@NonNull Throwable error);
    
        /**
         * Signal a completion.
         */
        void onComplete();
    }
    

    Emitter: 这里面就是我们经常看到的三个方法!

    到这里也就说明了,为什么上面代码中emitter.onNext("This is Create!"); 会走到下面观察者的onNext()方法了。

    Create方法总结:

        public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
            ObjectHelper.requireNonNull(source, "source is null");
            return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
        }
    

    Observable.create() 方法需要一个ObservableOnSubscribe作为参数,然后又将这个ObservableOnSubscribe作为参数传给了new ObservableCreate<T>(source) , 并返回,这也就是生成的被观察者。

    二、接着看 Observable . subscribe() 订阅方法

    {@link Observable#subscribe()}这个方法主要是:被观察者订阅观察者:

        @SchedulerSupport(SchedulerSupport.NONE)
        @Override
        public final void subscribe(Observer<? super T> observer) {
            ObjectHelper.requireNonNull(observer, "observer is null");
            try {
                //可以忽略
                observer = RxJavaPlugins.onSubscribe(this, observer);
    
                ObjectHelper.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");
    
                //注意这个方法
                subscribeActual(observer);
            } catch (NullPointerException e) { // NOPMD
                throw e;
            } catch (Throwable e) {
                Exceptions.throwIfFatal(e);
                // can't call onError because no way to know if a Disposable has been set or not
                // can't call onSubscribe because the call might have set a Subscription already
                RxJavaPlugins.onError(e);
    
                NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
                npe.initCause(e);
                throw npe;
            }
        }
    

    subscribeActual 分析:

    protected abstract void subscribeActual(Observer<? super T> observer);
    
    public final class ObservableCreate<T> extends Observable<T> {
        final ObservableOnSubscribe<T> source;
    
        //这里的 ObservableOnSubscribe 作为参数传递进来
        public ObservableCreate(ObservableOnSubscribe<T> source) {
            this.source = source;
        }
        //注意,这个方法
        @Override
        protected void subscribeActual(Observer<? super T> observer) {
            // 将观察者作为参数,传给 CreateEmitter
            CreateEmitter<T> parent = new CreateEmitter<T>(observer);
            //将parent 传到了onSubscribe()
            observer.onSubscribe(parent);
    
            try {
                // 将ObservableOnSubscribe 和 CreateEmitter(也就是Observer) 关联
                source.subscribe(parent);
            } catch (Throwable ex) {
                Exceptions.throwIfFatal(ex);
                parent.onError(ex);
            }
        }
      //...........省略部分代码
    }
    

    注意:

    • subscribeActual(observer);,这个方法是一个抽象方法,由具体的类去实现,这里的具体类指的是上面的ObservableCreate(这是返回的Observable) , 所以这里的 subscribeActual(observer); 其实是调用的ObservableCreate - >subscribeActual ();方法

    三、观察者:Consumer 分析

    在被观察者订阅观察者的时候,可以发现有好几个重载方法,上面分析了Observer

    订阅方法.png

    但是Observer需要实现所有的方法,如果只需要onNext()、onError(); 就需要使用到Consumer这个接口类:

    public interface Consumer<T> {
        /**
         * Consume the given value.
         * @param t the value
         * @throws Exception on error
         */
        void accept(T t) throws Exception;
    }
    

    如果只需要关注onNext()方法,可以调用Observable的这个调用方法,参数为Consumber

        @CheckReturnValue
        @SchedulerSupport(SchedulerSupport.NONE)
        public final Disposable subscribe(Consumer<? super T> onNext) {
            return subscribe(onNext, Functions.ON_ERROR_MISSING, Functions.EMPTY_ACTION, Functions.emptyConsumer());
        }
    

    继续跟到subscribe(Consumer<? super T> onNext) ;方法:

    public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
                Action onComplete, Consumer<? super Disposable> onSubscribe) {
            //一系列的判空操作
            ObjectHelper.requireNonNull(onNext, "onNext is null");
            ObjectHelper.requireNonNull(onError, "onError is null");
            ObjectHelper.requireNonNull(onComplete, "onComplete is null");
            ObjectHelper.requireNonNull(onSubscribe, "onSubscribe is null");
    
            //重要!!!
            LambdaObserver<T> ls = new LambdaObserver<T>(onNext, onError, onComplete, onSubscribe);
    
            //这里就是调用具体的实现类的方法,和上面的流程一样
            subscribe(ls);
    
            return ls;
        }
    

    这里主要是LambdaObserver这个类,可以看出,这个类也是一个观察者:

    public final class LambdaObserver<T> extends AtomicReference<Disposable>
            implements Observer<T>, Disposable, LambdaConsumerIntrospection {
    
        private static final long serialVersionUID = -7251123623727029452L;
        final Consumer<? super T> onNext;
        final Consumer<? super Throwable> onError;
        final Action onComplete;
        final Consumer<? super Disposable> onSubscribe;
    
        //构造方法接收三个参数,对应 next,error,complete
        public LambdaObserver(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
                Action onComplete,
                Consumer<? super Disposable> onSubscribe) {
            super();
            this.onNext = onNext;
            this.onError = onError;
            this.onComplete = onComplete;
            this.onSubscribe = onSubscribe;
        }
    
        @Override
        public void onSubscribe(Disposable d) {
            if (DisposableHelper.setOnce(this, d)) {
                try {
                    onSubscribe.accept(this);
                } catch (Throwable ex) {
                    Exceptions.throwIfFatal(ex);
                    d.dispose();
                    onError(ex);
                }
            }
        }
    
        @Override
        public void onNext(T t) {
            if (!isDisposed()) {
                try {
                    onNext.accept(t);
                } catch (Throwable e) {
                    Exceptions.throwIfFatal(e);
                    get().dispose();
                    onError(e);
                }
            }
        }
    
        @Override
        public void onError(Throwable t) {
            if (!isDisposed()) {
                lazySet(DisposableHelper.DISPOSED);
                try {
                    onError.accept(t);
                } catch (Throwable e) {
                    Exceptions.throwIfFatal(e);
                    RxJavaPlugins.onError(new CompositeException(t, e));
                }
            } else {
                RxJavaPlugins.onError(t);
            }
        }
    
        @Override
        public void onComplete() {
            if (!isDisposed()) {
                lazySet(DisposableHelper.DISPOSED);
                try {
                    onComplete.run();
                } catch (Throwable e) {
                    Exceptions.throwIfFatal(e);
                    RxJavaPlugins.onError(e);
                }
            }
        }
    
        @Override
        public void dispose() {
            DisposableHelper.dispose(this);
        }
    
        @Override
        public boolean isDisposed() {
            return get() == DisposableHelper.DISPOSED;
        }
    
        @Override
        public boolean hasCustomOnError() {
            return onError != Functions.ON_ERROR_MISSING;
        }
    }
    

    LambdaObserver这个类也是一个Observer和之前的观察者一样,都包含next、error、complete这几个方法,也是将LambdaObserver传入到ObservableCreate 与被观察者产生关联。

    这里可以只传入我们关心的回调,例如上面只传入了onNext()的回调,原因如下:

     public final Disposable subscribe(Consumer<? super T> onNext) {
            return subscribe(onNext, Functions.ON_ERROR_MISSING, Functions.EMPTY_ACTION, Functions.emptyConsumer());
        }
    

    这里的Functions.ON_ERROR_MISSING其实也是一个Consumer,可以理解为一个占位,例如当error回调的时候,我们并没有实现,他会回调到下面这个默认实现中去:

    public static final Consumer<Throwable> ON_ERROR_MISSING = new OnErrorMissingConsumer();
    
    //---------------------------------------------------------
      static final class OnErrorMissingConsumer implements Consumer<Throwable> {
            @Override
            public void accept(Throwable error) {
                RxJavaPlugins.onError(new OnErrorNotImplementedException(error));
            }
        }
    
    

    相关文章

      网友评论

          本文标题:RxJava2 源码解析(一)

          本文链接:https://www.haomeiwen.com/subject/bvotjctx.html