美文网首页
RxJava 2.0 源码分析一(RxJava的订阅流程)

RxJava 2.0 源码分析一(RxJava的订阅流程)

作者: zl_adams | 来源:发表于2020-12-03 10:52 被阅读0次

    RxJava框架内部采用观察者模式,基于事件流的链式调用、逻辑简洁、使用简单,在Android开发中被广泛的使用。

    简单用例:
           //创建被观察者
          Observable observable = Observable.create(new ObservableOnSubscribe<Integer>() {
                @Override
                public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                    e.onNext(0);
                    e.onNext(1);
                    e.onNext(2);
                    e.onNext(3);
                }
            });
            //创建被观察者
            Observer observer = new Observer<Integer>() {
                @Override
                public void onSubscribe(Disposable d) {
    
                }
    
                @Override
                public void onNext(Integer integer) {
    
                }
    
                @Override
                public void onError(Throwable e) {
    
                }
    
                @Override
                public void onComplete() {
    
                }
            };
            //绑定观察者和被观察者
            observable.subscribe(observer);
    
    1. 创建被观察者。
    2. 创建观察者。
    3. 观察者订阅被观察者,连接双方。

    一、创建被观察者

    首先需要调用Observable的create()方法创建Observable对象。

    //Observable.java
    public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
            ObjectHelper.requireNonNull(source, "source is null");
            return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
        }
    

    Observable的create方法需要传递参数ObservableOnSubscribe。ObservableOnSubscribe是一个接口。

    public interface ObservableOnSubscribe<T> {
        void subscribe(@NonNull ObservableEmitter<T> e) throws Exception;
    }
    

    而ObservableOnSubscribe接口中的subscribe方法参数是ObservableEmitter, ObservableEmitter继承了Emitter接口。

    public interface ObservableEmitter<T> extends Emitter<T> {
    
        void setDisposable(@Nullable Disposable d);
    
        void setCancellable(@Nullable Cancellable c);
    
        boolean isDisposed();
    
        @NonNull
        ObservableEmitter<T> serialize();
    
        @Experimental
        boolean tryOnError(@NonNull Throwable t);
    }
    
    
    public interface Emitter<T> {
    
        void onNext(@NonNull T value);  
    
        void onError(@NonNull Throwable error);
    
        void onComplete();
    }
    

    Emitter的英文翻译是发射器,在这里我们可以理解为事件的生产者,调用Emitter#onNext()方法发送事件给观察者。Emitter的onNext()、onError()、onComplete()与观察者Observer的onNext()、onError()、onComplete()方法一一对应。

    Observable#create()

    再回到Observable.create()方法
    这里要说一个Rxjava操作符的套路,Observable.create()会返回ObservableCreate对象,返回对象就是类+方法名,比如Observable.zip()方法就会返回ObservableZip对象。

    //Observable.java
    public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
            ObjectHelper.requireNonNull(source, "source is null");
            return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
        }
    

    新建ObservableCreate对象,然后调用RxJavaPlugins.onAssembly()方法返回Observable类型参数。

    public final class ObservableCreate<T> extends Observable<T> {
        final ObservableOnSubscribe<T> source;
    
        public ObservableCreate(ObservableOnSubscribe<T> source) {
            this.source = source;
        }
    }
    
    1. ObservableCreate构造函数需要传递一个ObservableOnSubscribe对象,ObservableCreate内部保存ObservableOnSubscribe为全局变量source。
    2. ObservableCreate继承了Observable,创建ObservableCreate对象就是创建了Observable对象。

    RxJavaPlugins#onAssembly()

        @NonNull
        public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
            //onObservableAssembly默认为空
            Function<? super Observable, ? extends Observable> f = onObservableAssembly;
            if (f != null) {
                return apply(f, source);
            }
            return source;
        }
    

    RxJavaPlugins中的onObservableAssembly变量默认为空,所以onAssembly()方法是直接把参数source返回出去。

    小结:创建被观察者主要经过两个步骤;
    (1) 创建ObservableOnSubscribe实例对象;
    (2) 把ObservableOnSubscribe对象传递给ObservableCreate的构造方法,创建ObservableCreate对象,把ObservableOnSubscribe对象作为全局变量source保存起来。
    (3)把第二步创建的ObservableCreate对象返回出去。

    二、创建观察者

    public interface Observer<T> {
    
        void onSubscribe(@NonNull Disposable d);
    
        void onNext(@NonNull T t);
    
        void onError(@NonNull Throwable e);
    
        void onComplete();
    
    }
    

    Observer只是一个接口,创建观察者就是创建Observer的实例。

    三、让观察者订阅被观察者,连接双方。

    observable.subscribe(observer);
    

    1. 调用Observable的subscribe()方法

        //Observable.java
        @SchedulerSupport(SchedulerSupport.NONE)
        @Override
        public final void subscribe(Observer<? super T> observer) {
            //非空判断
            ObjectHelper.requireNonNull(observer, "observer is null");
            try {
                ...
                subscribeActual(observer);
            } catch (NullPointerException e) { // NOPMD
                throw e;
            } catch (Throwable e) {
                Exceptions.throwIfFatal(e);
                // can't call onError because no way to know if a Disposable has been set or not
                // can't call onSubscribe because the call might have set a Subscription already
                RxJavaPlugins.onError(e);
    
                NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
                npe.initCause(e);
                throw npe;
            }
        }
    

    Observable.subscribe()方法也有一个套路,subscribe()最后都会调用Observable的派生类的subscribeActual()方法。

    2. 调用subscribeActual(observer)方法。
    subscribeActual()是Obserable的抽象方法,而上文有讲到创建Obserable.create最后得到ObservableCreate对象。所以subscribeActual()会调用ObservableCreate对象的subscribeActual()方法。

        //ObservableCreate.java
        @Override
        protected void subscribeActual(Observer<? super T> observer) 
            //创建CreateEmitter实例对象
            CreateEmitter<T> parent = new CreateEmitter<T>(observer);
            //回调observer的onSubscribe()方法
            observer.onSubscribe(parent);
    
            try {
                //回调ObservableOnSubscribe的subscribe()方法
                source.subscribe(parent);
            } catch (Throwable ex) {
                Exceptions.throwIfFatal(ex);
                parent.onError(ex);
            }
        }
    

    2.1 首先通过传递observer参数创建CreateEmitter的实例。看一下CreateEmitter这个类

    static final class CreateEmitter<T>extends AtomicReference<Disposable> implements ObservableEmitter<T>, Disposable {
    
            final Observer<? super T> observer;
    
            CreateEmitter(Observer<? super T> observer) {
                this.observer = observer;
            }
    
            @Override
            public void onNext(T t) {
                ...
                if (!isDisposed()) {
                    observer.onNext(t);//回调observer的onNext()方法
                }
            }
    
            @Override
            public void onError(Throwable t) {
                //调用tryOnError回调observer的onError()方法
                if (!tryOnError(t)) {
                    RxJavaPlugins.onError(t);
                }
            }
    
            @Override
            public boolean tryOnError(Throwable t) {
                ...
                if (!isDisposed()) {
                    try {
                        observer.onError(t);//回调observer的onError()方法
                    } finally {
                        dispose();
                    }
                    return true;
                }
                return false;
            }
    
            @Override
            public void onComplete() {
                if (!isDisposed()) {
                    try {
                        observer.onComplete();;//回调observer的onComplete()方法
                    } finally {
                        dispose();
                    }
                }
            }
    
    }
    

    observer传给CreateEmitter之后,赋值给全局变量observer。CreateEmitter继承了ObservableEmitter接口,是ObservableEmitter的实现,CreateEmitter中onNext()、onError()、onComplete()方法被调用时会调用观察者observer的同名方法。

    2.2 调用observer的onSubscribe方法,说明observable.subscribe(observer)方法执行时,观察者observer的onSubscribe()方法就会马上被回调。

    2.3 执行source.subscribe(parent)方法,source为ObservableOnSubscribe的实例,parent为CreateEmitter的实例,那么ObservableOnSubscribe的subscribe()方法会被回调。

           Observable observable = Observable.create(new ObservableOnSubscribe<Integer>() {
                @Override
                public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                    e.onNext(0);
                    e.onNext(1);
                    e.onNext(2);
                    e.onNext(3);
                }
            });
    

    source.subscribe(parent)方法的执行,会通过调用ObservableEmitter的e变量执行onNext方法,也就是执行了CreateEmitter的onNext()方法,CreateEmitter的onNext方法内部会调用observer.onNext()方法。

    到这里,RxJava的订阅流程就分析完成了。

    总结:
    1、创建ObservableOnSubscribe对象。
    2、把ObservableOnSubscribe对象传递给ObservableCreate的构造方法,创建ObservableCreate对象(即Obserable对象),把ObservableOnSubscribe对象作为全局变量source保存起来。
    3、创建Observer对象。
    4、执行observable.subscribe(observer)连接观察者和被观察者,会调用ObservableCreate的subscribeActual方法。
    5、把Observer对象传递给CreateEmitter的构造方法,创建CreateEmitter对象。
    6、回调observer的onSubscribe()方法。
    7、回调ObservableOnSubscribe的subscribe(ObservableEmitter<T> e)方法,参数为CreateEmitter对象;那么 e.onNext(0)方法就会调用CreateEmitter中的onNext()方法,然后回调observer的onNext()方法。

    Rxjava订阅流程.png

    相关文章

      网友评论

          本文标题:RxJava 2.0 源码分析一(RxJava的订阅流程)

          本文链接:https://www.haomeiwen.com/subject/fwluihtx.html