首先简单介绍Rxjava2 的四个基本的概念
Observable (可观察者,即被观察者)
Observer (观察者)
subscribe (订阅) 通过该方法,将 Observable 与 Observer 关联起来
事件 (包括 onNext,onComplete,onError 等事件)
Observable
/**
* The Observable class is the non-backpressured, optionally multi-valued base reactive class that
* offers factory methods, intermediate operators and the ability to consume synchronous
* and/or asynchronous reactive dataflows.
* <p>
* Many operators in the class accept {@code ObservableSource}(s), the base reactive interface
* for such non-backpressured flows, which {@code Observable} itself implements as well.
* <p>
* The Observable's operators, by default, run with a buffer size of 128 elements (see {@link Flowable#bufferSize()},
* that can be overridden globally via the system parameter {@code rx2.buffer-size}. Most operators, however, have
* overloads that allow setting their internal buffer size explicitly.
* <p>
* The documentation for this class makes use of marble diagrams. The following legend explains these diagrams:
* <p>
* <img width="640" height="317" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/legend.png" alt="">
* <p>
* The design of this class was derived from the
* <a href="https://github.com/reactive-streams/reactive-streams-jvm">Reactive-Streams design and specification</a>
* by removing any backpressure-related infrastructure and implementation detail, replacing the
* {@code org.reactivestreams.Subscription} with {@link Disposable} as the primary means to dispose of
* a flow.
* <p>
* The {@code Observable} follows the protocol
* <pre><code>
* onSubscribe onNext* (onError | onComplete)?
* </code></pre>
* where
* the stream can be disposed through the {@code Disposable} instance provided to consumers through
* {@code Observer.onSubscribe}.
* <p>
* Unlike the {@code Observable} of version 1.x, {@link #subscribe(Observer)} does not allow external disposal
* of a subscription and the {@code Observer} instance is expected to expose such capability.
* <p>Example:
* <pre><code>
* Disposable d = Observable.just("Hello world!")
* .delay(1, TimeUnit.SECONDS)
* .subscribeWith(new DisposableObserver<String>() {
* @Override public void onStart() {
* System.out.println("Start!");
* }
* @Override public void onNext(String t) {
* System.out.println(t);
* }
* @Override public void onError(Throwable t) {
* t.printStackTrace();
* }
* @Override public void onComplete() {
* System.out.println("Done!");
* }
* });
*
* Thread.sleep(500);
* // the sequence can now be disposed via dispose()
* d.dispose();
* </code></pre>
*
* @param <T>
* the type of the items emitted by the Observable
* @see Flowable
* @see io.reactivex.observers.DisposableObserver
*/
public abstract class Observable<T> implements ObservableSource<T> {
...
}
Observer
Observer 其实也是一个接口,里面定义了若干方法,onSubscribe ,onNext,onError,onComplete 方法。
public interface Observer<T> {
/**
* Provides the Observer with the means of cancelling (disposing) the
* connection (channel) with the Observable in both
* synchronous (from within {@link #onNext(Object)}) and asynchronous manner.
* @param d the Disposable instance whose {@link Disposable#dispose()} can
* be called anytime to cancel the connection
* @since 2.0
*/
void onSubscribe(@NonNull Disposable d);
/**
* Provides the Observer with a new item to observe.
* <p>
* The {@link Observable} may call this method 0 or more times.
* <p>
* The {@code Observable} will not call this method again after it calls either {@link #onComplete} or
* {@link #onError}.
*
* @param t
* the item emitted by the Observable
*/
void onNext(@NonNull T t);
/**
* Notifies the Observer that the {@link Observable} has experienced an error condition.
* <p>
* If the {@link Observable} calls this method, it will not thereafter call {@link #onNext} or
* {@link #onComplete}.
*
* @param e
* the exception encountered by the Observable
*/
void onError(@NonNull Throwable e);
/**
* Notifies the Observer that the {@link Observable} has finished sending push-based notifications.
* <p>
* The {@link Observable} will not call this method if it calls {@link #onError}.
*/
void onComplete();
}
一个正常的事件序列的调用顺序会是这样的 onSubscribe > onNext > onComplete,若中途出错了,那调用顺序可能是这样的 onSubscribe > onNext > onError
onSubscribe 方法,当我们调用 Observable 的 subscribe 方法的时候,会先回调 Observer 的 onSubscribe 方法,此方法的调用顺序先于 onNext,onError ,onComplete 方法。
onError 方法与 onComplete 方法可以说是互斥的,调用了其中一个方法就不会调用另外一个方法
基本使用
Observable
.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("z");
emitter.onNext("h");
emitter.onNext("u");
emitter.onComplete();
}
})
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.e("TAG", "onSubscribe(): ");
}
@Override
public void onNext(String s) {
Log.e("TAG", "onNext(): " + s);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
Log.e("TAG", "onComplete(): ");
}
});
先来看 Observable 的 create 方法
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
ObjectHelper.requireNonNull(source, "source is null");
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
在 create 方法中,其实很简单,只是对 source 进行判空处理,并将 source 用 ObservableCreate 包装起来,并返回回去。下面让我们一起来看一下 ObservableCreate 是什么东西?
public final class ObservableCreate<T> extends Observable<T> {
final ObservableOnSubscribe<T> source;
public ObservableCreate(ObservableOnSubscribe<T> source) {
this.source = source;
}
@Override
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);
try {
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
ObservableCreate 其实也很简单,它是 Observable 的子类,持有了上游 source 的引用,并重写 subscribeActual 方法。
接下来我们来看重点了,即 Observable 的 subscribe 方法,在该方法中,他会将 Observalble 与 observer 关联起来。
@SchedulerSupport(SchedulerSupport.NONE)
@Override
public final void subscribe(Observer<? super T> observer) {
// 检查 observer 是否为 null,为 null 抛出异常
ObjectHelper.requireNonNull(observer, "observer is null");
try {
// RxJavaPlugins 插件的,暂时不管
observer = RxJavaPlugins.onSubscribe(this, observer);
// 检查 observer 是否为 null,为 null 抛出异常
ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
subscribeActual(observer);
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
// can't call onError because no way to know if a Disposable has been set or not
// can't call onSubscribe because the call might have set a Subscription already
RxJavaPlugins.onError(e);
NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
npe.initCause(e);
throw npe;
}
}
subscribe 方法也比较简单,大概可以分为以下两步:
首先检查 observer 是否为空,为 null 抛出异常
第二步,调用 subscribeActual 方法,而我们知道在 Observable 类中 subscribeActual 是抽象方法,因此,我们只需要关注其实现类的 subscribeActual 方法。从上面的分析,我们知道,当我们调用 Observable create(ObservableOnSubscribe source) 方法的时候,最终会返回 ObservableCreate 实例。因此,我们只需要关注 ObservableCreate 的 subscribeActual 方法
public final class ObservableCreate<T> extends Observable<T> {
final ObservableOnSubscribe<T> source;
public ObservableCreate(ObservableOnSubscribe<T> source) {
this.source = source;
}
@Override
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);
try {
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
----
}
ObservableCreate 的核心代码主要也只有几行,source 是上游 ObservableOnSubscribe 的引用,而 CreateEmitter 这个类,它是 ObservableCreate 的一个静态内部类,实现了 ObservableEmitter,Disposable 接口 它持有 observer 的引用,当我们调用 CreateEmitter 的 next 方法的时候,它会判断当前的 CreateEmitter 有没有被 dispose 掉,如果没有,调用他持有的 observer 的 onNext 方法, 同理 onComplete 方法一一样,只不过执行完 onComplete 方法的时候,还会执行 dispose 方法,dispose 当前的 CreateEmitter。(dispose 方法这里先记住以下,下面会讲到)
static final class CreateEmitter<T>
extends AtomicReference<Disposable>
implements ObservableEmitter<T>, Disposable {
private static final long serialVersionUID = -3434801548987643227L;
final Observer<? super T> observer;
CreateEmitter(Observer<? super T> observer) {
this.observer = observer;
}
@Override
public void onNext(T t) {
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
if (!isDisposed()) {
observer.onNext(t);
}
}
@Override
public void onError(Throwable t) {
if (!tryOnError(t)) {
RxJavaPlugins.onError(t);
}
}
@Override
public boolean tryOnError(Throwable t) {
if (t == null) {
t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
}
if (!isDisposed()) {
try {
observer.onError(t);
} finally {
dispose();
}
return true;
}
return false;
}
@Override
public void onComplete() {
if (!isDisposed()) {
try {
observer.onComplete();
} finally {
dispose();
}
}
}
@Override
public void setDisposable(Disposable d) {
DisposableHelper.set(this, d);
}
@Override
public void setCancellable(Cancellable c) {
setDisposable(new CancellableDisposable(c));
}
@Override
public ObservableEmitter<T> serialize() {
return new SerializedEmitter<T>(this);
}
@Override
public void dispose() {
DisposableHelper.dispose(this);
}
@Override
public boolean isDisposed() {
return DisposableHelper.isDisposed(get());
}
}
好,看完上面的代码,我们回到 ObservableCreate 的 subscribeActual 方法,我们调用 observer.onSubscribe 方法的时候,会将 parent 对象作为方法参数暴露出去(而这个 parent 正是我们的 CreateEmitter,通过 CreateEmitter 的 dispose 方法可以取消订阅关系)。接着,当我们调用 source.subscribe(parent) 的时候,会调用 ObservableOnSubscribe 的 subscribe 方法。
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);
try {
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
因此,在我们上面的例子中,若不出错,调用顺序
Observable subcrible > Observable subscribeActual > ObservableCreate subscribeActual > observer.onSubscribe > ObservableOnSubscribe subscribe(emitter 是 CreateEmitter 的实例,包装了 observer,调用 emitter 的相应方法 ,会进而调用 observer 的 onNext onComplete 方法,而不会调用 onError 方法)
若在调用 onNext 方法的过程中出错,那调用顺序可能是这样的
Observable subcrible > Observable subscribeActual > ObservableCreate subscribeActual > observer.onSubscribe > ObservableOnSubscribe subscribe(@NonNull ObservableEmitter emitter)
(emitter 是 CreateEmitter 的实例,包装了 observer,调用 emitter 的相应方法 ,会进而调用 observer 的 onNext onError 方法,而不会调用 onComplete 方法 )
observable 与 Observer 是如何取消订阅关系的
在上面讲解的时候,其实我们已经有提到 CreateEmitter 的 dispose 方法,该方法就是用来取消订阅关系的。
假设这样一个场景,当我们收到的 value 的值大于等于 2 的时候,这个时候认为是异常的,解决两者之间的订阅关系
Observable<Integer> observable=Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onNext(3);
e.onComplete();
}
});
Observer<Integer> observer = new Observer<Integer>() {
private Disposable disposable;
@Override
public void onSubscribe(Disposable d) {
disposable = d;
}
@Override
public void onNext(Integer value) {
Log.d("zhxh", value.toString());
if (value >=2) { // >=2 时为异常数据,解除订阅
disposable.dispose();
}
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
};
observable.subscribe(observer); //建立订阅关系
总结
Rxjava 的原理其实不难,Observable 和 Observer 通过 subscribe() 方法实现订阅关系,从而 Observable 可以在需要的时候发出事件来通知 Observer,并且回调 Observer 的相应的方法。
用一张简单的流程图描述如下:
RxJava2整体设计参考
网友评论