美文网首页Android学习之旅
RxJava基本使用-源码解析(一)

RxJava基本使用-源码解析(一)

作者: pj0579 | 来源:发表于2019-10-26 11:36 被阅读0次

    最简单的使用方法是这样的

           // 被观察者发送事件  观察者响应事件
          Observable.create(new ObservableOnSubscribe<Integer>() {
                @Override
                public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                    emitter.onNext(1);
                    emitter.onComplete();
                }
            }).subscribe(new Observer<Integer>() {
                @Override
                public void onSubscribe(Disposable d) {
                    Log.d(TAG, "subscribe");
                }
    
                @Override
                public void onNext(Integer value) {
                    Log.d(TAG, "" + value);
                }
    
                @Override
                public void onError(Throwable e) {
                    Log.d(TAG, "error");
                }
    
                @Override
                public void onComplete() {
                    Log.d(TAG, "complete");
                }
            });
    

    这边分为两个步骤
    1.create
    2.subscribe
    create

     public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
             // 判空操作
            ObjectHelper.requireNonNull(source, "source is null");
            // 继续包装
            return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
        }
     // 看下onAssembly方法 实际返回的就是ObservableCreate类对象
     public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
            Function<? super Observable, ? extends Observable> f = onObservableAssembly;
            if (f != null) {
                return apply(f, source);
            }
            return source;
        }
    

    由上面可以知道subscribeObservableOnSubscribe类的方法,在ObservableOnSubscribe类中找不到,很容易在Observable中找到subscribe的方法。

     public final void subscribe(Observer<? super T> observer) {
            // 判空操作  
            ObjectHelper.requireNonNull(observer, "observer is null");
            try {
                // 返回observer对象
                observer = RxJavaPlugins.onSubscribe(this, observer);
                ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
                subscribeActual(observer);
            } catch (NullPointerException e) { // NOPMD
                throw e;
            } catch (Throwable e) {
                Exceptions.throwIfFatal(e);
                // can't call onError because no way to know if a Disposable has been set or not
                // can't call onSubscribe because the call might have set a Subscription already
                RxJavaPlugins.onError(e);
                NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
                npe.initCause(e);
                throw npe;
            }
        }
    

    subscribeActual(observer);是这段代码的关键 调用的是ObservableCreatesubscribeActual方法

     @Override
        protected void subscribeActual(Observer<? super T> observer) {
             // 实现ObservableEmitter<T>把这个对象传递到subscribe 通过这个对象发送事件
            CreateEmitter<T> parent = new CreateEmitter<T>(observer);
            // 调用我们代码里写的onSubscribe方法
            observer.onSubscribe(parent);
    
            try {
                // 回到我们代码里写的subscribe方法
                source.subscribe(parent);
            } catch (Throwable ex) {
                Exceptions.throwIfFatal(ex);
                parent.onError(ex);
            }
        }
    

    到这里最简单的调用过程就结束了

    相关文章

      网友评论

        本文标题:RxJava基本使用-源码解析(一)

        本文链接:https://www.haomeiwen.com/subject/olbdvctx.html