美文网首页
RxJava创建订阅流程

RxJava创建订阅流程

作者: wangsye | 来源:发表于2019-03-19 23:54 被阅读0次

    Observable创建和订阅流程

    这里的创建和订阅不考虑操作符操作。

     Observable<String> observable = Observable
                    .create(new ObservableOnSubscribe<String>() {
                        @Override
                        public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Exception {
                            emitter.onNext("hello");
                        }
                    });
    

    查看Observable的create方法:

     public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
            ObjectHelper.requireNonNull(source, "source is null");
            return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
        }
    

    返回 ObservableCreate对象,ObservableCreate的构造函数的参数为create方法传递的参数,参数类型为ObservableOnSubscribe。ObservableCreate保存该参数为source。
    ObservableCreate的构造方法如下:

        final ObservableOnSubscribe<T> source;
        public ObservableCreate(ObservableOnSubscribe<T> source) {
            this.source = source;
        }
    

    这样observable对象实际上就是一个ObservableCreate对象。

    subscribe方法调用如下:

     observable.subscribe(new Observer<String>() {
                @Override
                public void onSubscribe(Disposable d) {
                    comDisposable.add(d);
                }
    
                @Override
                public void onNext(String s) {
                    Log.i("ThreadActivity", s);
                }
    
                @Override
                public void onError(Throwable e) {
    
                }
    
                @Override
                public void onComplete() {
    
                }
            });
    

    subscribe方法的参数是一个Observer。执行了Observable的subscribe方法,subscribe方法代码如下:

        public final void subscribe(Observer<? super T> observer) {
            ObjectHelper.requireNonNull(observer, "observer is null");
            try {
                observer = RxJavaPlugins.onSubscribe(this, observer);
    
                ObjectHelper.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");
                subscribeActual(observer);
            } catch (NullPointerException e) { // NOPMD
                throw e;
            } catch (Throwable e) {
                Exceptions.throwIfFatal(e);
                // can't call onError because no way to know if a Disposable has been set or not
                // can't call onSubscribe because the call might have set a Subscription already
                RxJavaPlugins.onError(e);
                NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
                npe.initCause(e);
                throw npe;
            }
        }
    

    真正起作用的是subscribeActual方法,由于Observable实际上是ObservableCreate对象,因此subscribeActual也就是调用ObservableCreate的subscribeActual方法。subscribeActual方法如下:

        @Override
        protected void subscribeActual(Observer<? super T> observer) {
            CreateEmitter<T> parent = new CreateEmitter<T>(observer);
            observer.onSubscribe(parent);
            try {
                source.subscribe(parent);
            } catch (Throwable ex) {
                Exceptions.throwIfFatal(ex);
                parent.onError(ex);
            }
        }
    

    该方法中首先以observer为参数生成CreateEmitter对象,再调用obsever的onSubscribe方法,然后调用source的subscribe方法,而该source为Observable的create方法的参数。因此 source.subscribe(parent);实际上执行的是创建时参数的subscribe代码:

      public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Exception {
                            emitter.onNext("hello");
               }
    

    CreateEmitter的onNext方法定义如下:

     @Override
            public void onNext(T t) {
                if (t == null) {
                    onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
                    return;
                }
                if (!isDisposed()) {
                    observer.onNext(t);
                }
            }
    

    实际上调用了observer的onNext方法,这里的observer就是Observable的subscribe方法传入的参数。

    subscribeOn,observeOn方法影响

    subscribeOn() : 影响的是最开始的被观察者(第一个被观察者)所在的线程。当使用多个subscribeOn() 的时候,只有第一个 subscribeOn() 起作用;

    observeOn() : 影响的是跟在后面的操作(指定观察者运行的线程)。所以如果想要多次改变线程,可以多次使用 observeOn;

    参考:
    https://www.jianshu.com/p/d9da64774f7b

    相关文章

      网友评论

          本文标题:RxJava创建订阅流程

          本文链接:https://www.haomeiwen.com/subject/yhsomqtx.html