RxJava Observable 使用和源码阅读

作者: 三流之路 | 来源:发表于2018-05-24 22:54 被阅读19次

ReactiveX 系列文章目录


implementation "io.reactivex.rxjava2:rxjava:2.1.9"

Observable/Observer 的使用

过去的 Observer 观察者回调有 onNext()、onComplete()、onError(),现在多了一个 onSubscribe(),刚开始调用,相当于 1.x 的 onStart(),参数是 Disposable,相当于 1.x 中的 Subscription,用于解除订阅。

// 被观察者
var observable = Observable.create(ObservableOnSubscribe<String> { emitter ->
    emitter.onNext("create message") // 通知观察者,调用其 onNext 方法
    emitter.onComplete()
})

// 观察者,和 1.x 相比多了个方法
observerStr = object : Observer<String> {
  override fun onNext(t: String) {
      textView.text = "${textView.text} onNext $t\n"
  }

  override fun onError(e: Throwable) {
      textView.text = "${textView.text} onError\n"
  }

  override fun onComplete() {
      textView.text = "${textView.text} onComplete\n"
      disposable?.dispose() // 解除订阅
  }

  override fun onSubscribe(d: Disposable) {
      disposable = d
  }

// 订阅
observable.subscribe(observerStr)

create 方法的参数和 1.x 不同,是

public interface ObservableOnSubscribe<T> {
    void subscribe(@NonNull ObservableEmitter<T> emitter) throws Exception;
}

Consumer

public interface Consumer<T> {
    /**
     * Consume the given value.
     * @param t the value
     * @throws Exception on error
     */
    void accept(T t) throws Exception;
}

Consumer 是更简单的观察者,只有一个 accept 方法,方法只有一个参数。比如

// accept 依次收到被观察者发过来的 a 和 b
val disposable: Disposable = Observable.fromArray("a","b").subscribe({
    textView.text = "${textView.text}\n $it "
})

此时方法返回值是 Disposable 对象,可用于解除订阅。

源码分析

Observable 实现了 ObservableSource,只有一个 subscribe 方法。

先看如何创建一个被观察者的

public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
  // source 为 null,抛出异常信息
  ObjectHelper.requireNonNull(source, "source is null");
  // 用参数 source 构造 ObservableCreate 对象
  return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}

通过 RxJavaPlugins 的 onAssembly 返回最后的 Observable。

public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
    // 现在这情况,f 是 null,于是直接返回参数传进来的 source
    Function<? super Observable, ? extends Observable> f = onObservableAssembly;
    if (f != null) {
        return apply(f, source);
    }
    return source;
}

在调用 create 时,最终返回的对象是 ObservableCreate,它内部有一个 source 属性,就是 create 的参数 ObservableOnSubscribe 对象,代表发射数据的源头。

当有观察者订阅时,调用 subscribe 方法,重载方法有几个,Consumer 最后也是封装成一个 LambdaObserver,最终都是调到了下面的方法

public final void subscribe(Observer<? super T> observer) {
    ...
    try {
        ...
        subscribeActual(observer);
    } catch (NullPointerException e) { // NOPMD
        throw e;
    } catch (Throwable e) {
        ...
    }
}

主要的方法其实就是一句话 subscribeActual(observer),这是一个抽象方法,由不同的被观察者实现。在这里显示是 ObservableCreate,看它的 subscribeActual 方法。

protected void subscribeActual(Observer<? super T> observer) {
    CreateEmitter<T> parent = new CreateEmitter<T>(observer);
    observer.onSubscribe(parent);

    try {
        source.subscribe(parent);
    } catch (Throwable ex) {
        Exceptions.throwIfFatal(ex);
        parent.onError(ex);
    }
}

CreateEmitter 是一个静态内部类,持有观察者 Observer 的引用,它实现了 Disposable,可用于解除订阅,然后立刻调用 observer.onSubscribe,这样外面的观察者第一个执行到的回调就是 onSubscribe,并且拿到了 Disposable 对象。

然后就是 source.subscribe(parent),这个 source 是 ObservableOnSubscribe 对象,只有一个 subscribe 方法,现在调用这个 subscribe 方法,并且把 parent 传进去,返回去看 create 的参数。

ObservableOnSubscribe<String> { emitter ->
   emitter.onNext("字符串消息")
   emitter.onComplete()
}

这个参数 emitter 就是 parent,subscribe 方法内部调用 onNext 之类的方法,看下 CreateEmitter 的实现。

static final class CreateEmitter<T>
extends AtomicReference<Disposable>
implements ObservableEmitter<T>, Disposable {

    ...

    final Observer<? super T> observer;

    CreateEmitter(Observer<? super T> observer) {
        this.observer = observer;
    }

    @Override
    public void onNext(T t) {
        ...
        if (!isDisposed()) {
            observer.onNext(t);
        }
    }

    @Override
    public void onError(Throwable t) {
        if (!tryOnError(t)) {
            RxJavaPlugins.onError(t);
        }
    }

    @Override
    public boolean tryOnError(Throwable t) {
        ...
        if (!isDisposed()) {
            try {
                observer.onError(t);
            } finally {
                dispose();
            }
            return true;
        }
        return false;
    }

    @Override
    public void onComplete() {
        if (!isDisposed()) {
            try {
                observer.onComplete();
            } finally {
                dispose();
            }
        }
    }

    @Override
    public void setDisposable(Disposable d) {
        DisposableHelper.set(this, d);
    }

    ....

    @Override
    public void dispose() {
        DisposableHelper.dispose(this);
    }

    @Override
    public boolean isDisposed() {
        return DisposableHelper.isDisposed(get());
    }
}

可见各种方法,最后都是调用了 Observer 里对应的方法,在 complete 和 error 之后都会执行 dispose 方法

总体看下来,就是一个普通的观察者模式,被观察者里持有观察者,然后调用观察者的方法使其收到回调,其实就和自己平时写监听一个意思,只是做了一些封装便于流式调用。

  1. Observable 的方法,创建了一个具体的 Observable 的实现类,其内部有一个属性 source,表示上游 Observable。
  2. Observer 订阅后,Observable 内部创建一个实现了 Dispoable 的对象,持有 Observer 的引用,然后让这个对象开始发射数据或事件。
  3. 发射的数据或事件最终都传递到 Observer 的对应的方法。

相关文章

  • RX 系列目录

    RxJava Observable 使用和源码阅读 RxJava Single Completable Maybe...

  • RxJava Observable 使用和源码阅读

    ReactiveX 系列文章目录 Observable/Observer 的使用 过去的 Observer 观察者...

  • 学习RxJava之Java的泛型

    在RxJava的代码中,使用了大量的泛型,如果不熟悉的话,阅读源码非常的困难,比如非常核心的Observable类...

  • RxJava源码解析

    RxJava源码解析 一,简单使用 目标: 被观察者 Observable 如何生产事件的? 被观察者 Obser...

  • RxJava2 使用 及 源码阅读

    RxJava2 使用 及 源码阅读 RxJava是什么?根据RxJava在GitHub上给出的描述:RxJava ...

  • # RxJava 源码分析

    引言 简单阐述RxJava流程源码,RxJava有以下三种流程,向下递增。 Observable->Observe...

  • RxJava源码解析:Observable

    本文主要目的是梳理Rxjava的核心调用流程,主要涉及以下3个类 核心类:Observable 调用者传入的sub...

  • RxJava2源码初探-整体设计

    RxJava2源码初探-整体设计 首先简单介绍Rxjava2 的四个基本的概念 Observable (可观察者,...

  • RxJava深入

    今天在公司做了一个RxJava分享, 为了准备这个分享, 阅读了RxJava的源码, 对于Rxjava的使用也有了...

  • JAVA之RxJava

    RxJava概述 RxJava的基本使用 RxJava有三个基本的元素: 被观察者(Observable) 观察者...

网友评论

    本文标题:RxJava Observable 使用和源码阅读

    本文链接:https://www.haomeiwen.com/subject/qmpejftx.html