美文网首页
RxJava2源码分析——订阅

RxJava2源码分析——订阅

作者: 谭嘉俊 | 来源:发表于2019-10-19 19:54 被阅读0次

    本文章主要是对RxJava2订阅流程进行源码分析,先说下我用的RxJavaRxAndroid版本,版本如下:

    implementation 'io.reactivex.rxjava2:rxjava:2.2.6'
    implementation 'io.reactivex.rxjava2:rxandroid:2.1.1'
    

    我们先写段示例代码,为了方便理解,我就不用上Lambda链式调用了,代码如下:

    // 创建被观察者
    Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
        @Override
        public void subscribe(ObservableEmitter<String> emitter) {
            Log.i("TanJiaJun", "subscribe");
    
            emitter.onNext("Tan");
            emitter.onNext("Jia");
            emitter.onNext("Jun");
            emitter.onComplete();
        }
    });
    
    // 创建观察者
    Observer<String> observer = new Observer<String>() {
        @Override
        public void onSubscribe(Disposable d) {
            Log.i("TanJiaJun", "onSubscribe");
        }
    
        @Override
        public void onNext(String s) {
            Log.i("TanJiaJun", "onNext:" + s);
        }
    
        @Override
        public void onError(Throwable e) {
            Log.i("TanJiaJun", "onError");
        }
    
        @Override
        public void onComplete() {
            Log.i("TanJiaJun", "onComplete");
        }
    };
    
    // 订阅
    observable.subscribe(observer);
    

    分成三步:

    1. 创建被观察者(Observable)。
    2. 创建观察者(Observer)。
    3. 调用被观察者的subscribe方法,传入观察者,将两者进行关联并且订阅。

    源码分析

    我们先从subscribe方法入手,代码如下:

    @SchedulerSupport(SchedulerSupport.NONE)
    @Override
    public final void subscribe(Observer<? super T> observer) {
        // 判断observer是不是空
        ObjectHelper.requireNonNull(observer, "observer is null");
        try {
            observer = RxJavaPlugins.onSubscribe(this, observer);
    
            ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
    
            // 调用子类的subscribeActual方法
            subscribeActual(observer);
        } catch (NullPointerException e) { // NOPMD
            throw e;
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            // can't call onError because no way to know if a Disposable has been set or not
            // can't call onSubscribe because the call might have set a Subscription already
            RxJavaPlugins.onError(e);
    
            NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
            npe.initCause(e);
            throw npe;
        }
    }
    

    看下RxJavaPlugins.onSubscribe方法,代码如下:

    @SuppressWarnings({ "rawtypes", "unchecked" })
    @NonNull
    public static <T> Observer<? super T> onSubscribe(@NonNull Observable<T> source, @NonNull Observer<? super T> observer) {
        BiFunction<? super Observable, ? super Observer, ? extends Observer> f = onObservableSubscribe;
        if (f != null) {
            return apply(f, source, observer);
        }
        return observer;
    }
    

    注释说这个方法会调用关联的钩子函数(hook function),我们看到它会判断一下onObservableSubscribe是不是空,这个变量是通过setOnObservableSubscribe方法赋值的,代码如下:

    @SuppressWarnings("rawtypes")
    public static void setOnObservableSubscribe(
            @Nullable BiFunction<? super Observable, ? super Observer, ? extends Observer> onObservableSubscribe) {
        if (lockdown) {
            throw new IllegalStateException("Plugins can't be changed anymore");
        }
        RxJavaPlugins.onObservableSubscribe = onObservableSubscribe;
    }
    

    然而我们没有调用这个方法,所以这里空的,直接返回observer

    我们接着往下看,subscribeActual是个很重要的方法,它是个接口来的,Observable的子类都要去实现这个方法,接下来在讲创建被观察者的时候就会遇到。

    我们调用Observable.create方法,代码如下:

    @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.NONE)
    public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
        ObjectHelper.requireNonNull(source, "source is null");
        return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
    }
    

    RxJavaPlugins.onAssembly方法也是一个钩子函数,代码如下:

    @SuppressWarnings({ "rawtypes", "unchecked" })
    @NonNull
    public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
        Function<? super Observable, ? extends Observable> f = onObservableAssembly;
        if (f != null) {
            return apply(f, source);
        }
        return source;
    }
    

    它会判断onObservableAssembly变量是不是空,这个变量是通过setOnObservableAssembly方法赋值的,代码如下:

    @SuppressWarnings("rawtypes")
    public static void setOnObservableAssembly(@Nullable Function<? super Observable, ? extends Observable> onObservableAssembly) {
        if (lockdown) {
            throw new IllegalStateException("Plugins can't be changed anymore");
        }
        RxJavaPlugins.onObservableAssembly = onObservableAssembly;
    }
    

    然而我们没有调用这个方法,所以我们直接看创建的ObservableCreate对象,要注意的点我都写上注释了,代码如下:

    public final class ObservableCreate<T> extends Observable<T> {
        final ObservableOnSubscribe<T> source;
    
        // source在我们的示例代码里是上游Observable对象(被观察者)
        public ObservableCreate(ObservableOnSubscribe<T> source) {
            this.source = source;
        }
    
        @Override
        protected void subscribeActual(Observer<? super T> observer) {
            // 创建CreateEmitter对象,传入下游Observer对象(观察者)
            CreateEmitter<T> parent = new CreateEmitter<T>(observer);
            // 调用下游Observer对象的onSubscribe方法,并且传入CreateEmitter对象
            observer.onSubscribe(parent);
    
            try {
                // 调用上游Observable对象的subscribe方法,并且传入CreateEmitter对象
                source.subscribe(parent);
                // 这里可以得出结论,先执行下游Observer的onSubscribe方法,然后执行上游Observable的subscribe方法
            } catch (Throwable ex) {
                Exceptions.throwIfFatal(ex);
                // 发射一个错误的事件
                parent.onError(ex);
            }
        }
    
        // 该类继承了AtomicReference,可以实现原子操作
        static final class CreateEmitter<T>
        extends AtomicReference<Disposable>
        implements ObservableEmitter<T>, Disposable {
    
    
            private static final long serialVersionUID = -3434801548987643227L;
    
            final Observer<? super T> observer;
    
            // 传入下游Observer对象
            CreateEmitter(Observer<? super T> observer) {
                this.observer = observer;
            }
    
            @Override
            public void onNext(T t) {
                // 在RxJava2.x版本中,onNext方法不能传null,否则抛出空指针异常
                if (t == null) {
                    onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
                    return;
                }
                // 当isDisposed方法为false时,调用下游observe的onNext方法,并且传入对应的对象
                if (!isDisposed()) {
                    observer.onNext(t);
                }
            }
    
            @Override
            public void onError(Throwable t) {
                if (!tryOnError(t)) {
                    RxJavaPlugins.onError(t);
                }
            }
    
            @Override
            public boolean tryOnError(Throwable t) {
                // 在RxJava2.x版本中,onError方法不能串null,否则抛出空指针异常
                if (t == null) {
                    t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
                }
                // 当isDisposed方法返回false时,调用下游observer的onError方法,并且传入Throwable对象,然后调用dispose方法
                if (!isDisposed()) {
                    try {
                        observer.onError(t);
                    } finally {
                        dispose();
                    }
                    return true;
                }
                return false;
            }
    
            @Override
            public void onComplete() {
                // 当isDispoesed方法返回false时,调用下游observer的onComplete方法,然后调用dispose方法
                if (!isDisposed()) {
                    try {
                        observer.onComplete();
                    } finally {
                        dispose();
                    }
                }
            }
    
            @Override
            public void setDisposable(Disposable d) {
                DisposableHelper.set(this, d);
            }
    
            @Override
            public void setCancellable(Cancellable c) {
                setDisposable(new CancellableDisposable(c));
            }
    
            @Override
            public ObservableEmitter<T> serialize() {
                return new SerializedEmitter<T>(this);
            }
    
            @Override
            public void dispose() {
                DisposableHelper.dispose(this);
            }
    
            @Override
            public boolean isDisposed() {
                return DisposableHelper.isDisposed(get());
            }
    
            @Override
            public String toString() {
                return String.format("%s{%s}", getClass().getSimpleName(), super.toString());
            }
        }
    
        // 省略部分代码
    }
    

    传入的是ObservableOnSubscribe接口,里面有个带ObservableEmitter参数的subscribe方法,代码如下:

    public interface ObservableOnSubscribe<T> {
    
        void subscribe(@NonNull ObservableEmitter<T> emitter) throws Exception;
    
    }
    

    我们示例代码实现了这个方法,代码如下:

    Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
        @Override
        public void subscribe(ObservableEmitter<String> emitter) {
            Log.i("TanJiaJun","subscribe");
    
            emitter.onNext("Tan");
            emitter.onNext("Jia");
            emitter.onNext("Jun");
            emitter.onComplete();
        }
    });
    

    依次调用了ObservableEmitteronNext方法和onComplete方法,这里的ObservableEmitter实现类是CreateEmitter,代码如下:

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        observer.onSubscribe(parent);
    
        try {
            // parent是CreateEmitter
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }
    

    调用onNext方法和onComplete方法,实际上是调用了下游ObserveronNext方法和onComplete方法,代码如下:

    @Override
    public void onNext(T t) {
        if (t == null) {
            onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
            return;
        }
        if (!isDisposed()) {
            // observer是下游观察者
            observer.onNext(t);
        }
    }
    
    @Override
    public void onComplete() {
        if (!isDisposed()) {
            try {
                // observer是下游观察者
                observer.onComplete();
            } finally {
                dispose();
            }
        }
    }
    

    也就是调用了我们示例代码中这些方法,代码如下:

    @Override
    public void onNext(String s) {
        Log.i("TanJiaJun",s);
    }
    
    @Override
    public void onComplete() {
        Log.i("TanJiaJun","onComplete");
    }
    

    总结一下,整个流程如下:

    1. 调用上游Observablesubscribe方法,并且传入下游Observer
    2. subscribe方法里面执行了Observable的子类ObservableCreatesubscribeActual方法,并且传入下游Observer
    3. subscribeActual方法里面会依次执行下游ObserveronSubscribe方法和ObservableOnSubscribesubscribe方法,从而完成整个订阅流程。
    4. 如果我们去发射事件,例如示例代码中调用ObservableEmitteronNext方法和onComplete方法,那么下游ObserveronNext方法和onComplete方法就会执行。

    我的GitHub:TanJiaJunBeyond

    Android通用框架:Android通用框架

    我的掘金:谭嘉俊

    我的简书:谭嘉俊

    我的CSDN:谭嘉俊

    相关文章

      网友评论

          本文标题:RxJava2源码分析——订阅

          本文链接:https://www.haomeiwen.com/subject/yfmnmctx.html