RxJava Observable 使用和源码阅读

作者: 三流之路 | 来源:发表于2018-05-24 22:54 被阅读19次

    ReactiveX 系列文章目录


    implementation "io.reactivex.rxjava2:rxjava:2.1.9"
    

    Observable/Observer 的使用

    过去的 Observer 观察者回调有 onNext()、onComplete()、onError(),现在多了一个 onSubscribe(),刚开始调用,相当于 1.x 的 onStart(),参数是 Disposable,相当于 1.x 中的 Subscription,用于解除订阅。

    // 被观察者
    var observable = Observable.create(ObservableOnSubscribe<String> { emitter ->
        emitter.onNext("create message") // 通知观察者,调用其 onNext 方法
        emitter.onComplete()
    })
    
    // 观察者,和 1.x 相比多了个方法
    observerStr = object : Observer<String> {
      override fun onNext(t: String) {
          textView.text = "${textView.text} onNext $t\n"
      }
    
      override fun onError(e: Throwable) {
          textView.text = "${textView.text} onError\n"
      }
    
      override fun onComplete() {
          textView.text = "${textView.text} onComplete\n"
          disposable?.dispose() // 解除订阅
      }
    
      override fun onSubscribe(d: Disposable) {
          disposable = d
      }
    
    // 订阅
    observable.subscribe(observerStr)
    

    create 方法的参数和 1.x 不同,是

    public interface ObservableOnSubscribe<T> {
        void subscribe(@NonNull ObservableEmitter<T> emitter) throws Exception;
    }
    

    Consumer

    public interface Consumer<T> {
        /**
         * Consume the given value.
         * @param t the value
         * @throws Exception on error
         */
        void accept(T t) throws Exception;
    }
    

    Consumer 是更简单的观察者,只有一个 accept 方法,方法只有一个参数。比如

    // accept 依次收到被观察者发过来的 a 和 b
    val disposable: Disposable = Observable.fromArray("a","b").subscribe({
        textView.text = "${textView.text}\n $it "
    })
    

    此时方法返回值是 Disposable 对象,可用于解除订阅。

    源码分析

    Observable 实现了 ObservableSource,只有一个 subscribe 方法。

    先看如何创建一个被观察者的

    public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
      // source 为 null,抛出异常信息
      ObjectHelper.requireNonNull(source, "source is null");
      // 用参数 source 构造 ObservableCreate 对象
      return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
    }
    

    通过 RxJavaPlugins 的 onAssembly 返回最后的 Observable。

    public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
        // 现在这情况,f 是 null,于是直接返回参数传进来的 source
        Function<? super Observable, ? extends Observable> f = onObservableAssembly;
        if (f != null) {
            return apply(f, source);
        }
        return source;
    }
    

    在调用 create 时,最终返回的对象是 ObservableCreate,它内部有一个 source 属性,就是 create 的参数 ObservableOnSubscribe 对象,代表发射数据的源头。

    当有观察者订阅时,调用 subscribe 方法,重载方法有几个,Consumer 最后也是封装成一个 LambdaObserver,最终都是调到了下面的方法

    public final void subscribe(Observer<? super T> observer) {
        ...
        try {
            ...
            subscribeActual(observer);
        } catch (NullPointerException e) { // NOPMD
            throw e;
        } catch (Throwable e) {
            ...
        }
    }
    

    主要的方法其实就是一句话 subscribeActual(observer),这是一个抽象方法,由不同的被观察者实现。在这里显示是 ObservableCreate,看它的 subscribeActual 方法。

    protected void subscribeActual(Observer<? super T> observer) {
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        observer.onSubscribe(parent);
    
        try {
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }
    

    CreateEmitter 是一个静态内部类,持有观察者 Observer 的引用,它实现了 Disposable,可用于解除订阅,然后立刻调用 observer.onSubscribe,这样外面的观察者第一个执行到的回调就是 onSubscribe,并且拿到了 Disposable 对象。

    然后就是 source.subscribe(parent),这个 source 是 ObservableOnSubscribe 对象,只有一个 subscribe 方法,现在调用这个 subscribe 方法,并且把 parent 传进去,返回去看 create 的参数。

    ObservableOnSubscribe<String> { emitter ->
       emitter.onNext("字符串消息")
       emitter.onComplete()
    }
    

    这个参数 emitter 就是 parent,subscribe 方法内部调用 onNext 之类的方法,看下 CreateEmitter 的实现。

    static final class CreateEmitter<T>
    extends AtomicReference<Disposable>
    implements ObservableEmitter<T>, Disposable {
    
        ...
    
        final Observer<? super T> observer;
    
        CreateEmitter(Observer<? super T> observer) {
            this.observer = observer;
        }
    
        @Override
        public void onNext(T t) {
            ...
            if (!isDisposed()) {
                observer.onNext(t);
            }
        }
    
        @Override
        public void onError(Throwable t) {
            if (!tryOnError(t)) {
                RxJavaPlugins.onError(t);
            }
        }
    
        @Override
        public boolean tryOnError(Throwable t) {
            ...
            if (!isDisposed()) {
                try {
                    observer.onError(t);
                } finally {
                    dispose();
                }
                return true;
            }
            return false;
        }
    
        @Override
        public void onComplete() {
            if (!isDisposed()) {
                try {
                    observer.onComplete();
                } finally {
                    dispose();
                }
            }
        }
    
        @Override
        public void setDisposable(Disposable d) {
            DisposableHelper.set(this, d);
        }
    
        ....
    
        @Override
        public void dispose() {
            DisposableHelper.dispose(this);
        }
    
        @Override
        public boolean isDisposed() {
            return DisposableHelper.isDisposed(get());
        }
    }
    

    可见各种方法,最后都是调用了 Observer 里对应的方法,在 complete 和 error 之后都会执行 dispose 方法

    总体看下来,就是一个普通的观察者模式,被观察者里持有观察者,然后调用观察者的方法使其收到回调,其实就和自己平时写监听一个意思,只是做了一些封装便于流式调用。

    1. Observable 的方法,创建了一个具体的 Observable 的实现类,其内部有一个属性 source,表示上游 Observable。
    2. Observer 订阅后,Observable 内部创建一个实现了 Dispoable 的对象,持有 Observer 的引用,然后让这个对象开始发射数据或事件。
    3. 发射的数据或事件最终都传递到 Observer 的对应的方法。

    相关文章

      网友评论

        本文标题:RxJava Observable 使用和源码阅读

        本文链接:https://www.haomeiwen.com/subject/qmpejftx.html