1 简单使用步骤
1、创建被观察者(Observable),定义要发送的事件。
2、创建观察者(Observer),接受事件并做出响应操作。
3、观察者通过订阅(subscribe)被观察者把它们连接到一起。
2 RxJava的消息订阅例子
//步骤1. 创建被观察者(Observable),定义要发送的事件。
Observable observable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("文章1");
emitter.onNext("文章2");
emitter.onNext("文章3");
emitter.onComplete();
}
});
//步骤2. 创建观察者(Observer),接受事件并做出响应操作。
Observer<String> observer = new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe");
}
@Override
public void onNext(String s) {
Log.d(TAG, "onNext : " + s);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError : " + e.toString());
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
};
//步骤3. 观察者通过订阅(subscribe)被观察者把它们连接到一起。
observable.subscribe(observer);
其输出结果为:
onSubscribe
onNext : 文章1
onNext : 文章2
onNext : 文章3
onComplete
3 源码分析
3.1 创建被观察者过程
首先来看下创建被观察者(Observable)的过程,上面的例子中我们是直接使用Observable.create()来创建Observable
3.1.1 Observable类的create()
创建一个ObservableCreate对象出来,然后把我们自定义的ObservableOnSubscribe作为参数传到ObservableCreate中去,最后就是调用 RxJavaPlugins.onAssembly()方法。
3.1.2 ObservableCreate类
public final class ObservableCreate<T> extends Observable<T> {//继承自Observable
public ObservableCreate(ObservableOnSubscribe<T> source) {
this.source = source;//把我们创建的ObservableOnSubscribe对象赋值给source。
}
}
可以看到,ObservableCreate是继承自Observable的,并且会把ObservableOnSubscribe对象给存起来
。
3.1.3 RxJavaPlugins类的onAssembly()
public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
//省略无关代码
return source;
}
很简单,就是把上面创建的ObservableCreate给返回。
3.1.4 简单总结
Observable.create()中就是把我们自定义的ObservableOnSubscribe对象重新包装成一个ObservableCreate对象,然后返回这个ObservableCreate对象。

3.1.5
Observable.create()的时序图如下所示:

3.2 订阅过程
3.2.1 Observable类的subscribe()
public final void subscribe(Observer<? super T> observer) {
//省略无关代码
observer = RxJavaPlugins.onSubscribe(this, observer);
subscribeActual(observer);
//省略无关代码
}
可以看到,实际上其核心的代码也就两句,我们分开来看下:
3.2.2 RxJavaPlugins类的onSubscribe()
public static <T> Observer<? super T> onSubscribe(@NonNull Observable<T> source, @NonNull Observer<? super T> observer) {
//省略无关代码
return observer;
}
跟之前代码一样,这里同样也是把原来的observer返回而已。
再来看下subscribeActual()方法。
3.2.3 Observable类的subscribeActual()
protected abstract void subscribeActual(Observer<? super T> observer);
Observable类的subscribeActual()中的方法是一个抽象方法,那么其具体实现在哪呢?还记得我们前面创建被观察者的过程吗,最终会返回一个ObservableCreate对象,这个ObservableCreate就是Observable的子类,我们点进去看下:
3.2.4 ObservableCreate类的subscribeActual()
@Override
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
//触发我们自定义的Observer的onSubscribe(Disposable)方法
observer.onSubscribe(parent);
try {
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
可以看到,subscribeActual()方法中首先会创建一个CreateEmitter对象,然后把我们自定义的观察者observer作为参数给传进去。这里同样也是包装起来
这个CreateEmitter实现了ObservableEmitter接口和Disposable接口,如下:

这个CreateEmitter实现了ObservableEmitter接口和Disposable接口,如下:
然后就是调用了observer.onSubscribe(parent),实际上就是调用观察者的onSubscribe()方法,即告诉观察者已经成功订阅到了被观察者。
继续往下看,subscribeActual()方法中会继续调用source.subscribe(parent),这里的source就是ObservableOnSubscribe对象,即这里会调用ObservableOnSubscribe的subscribe()方法。
我们具体定义的subscribe()方法如下:
Observable observable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("文章1");
emitter.onNext("文章2");
emitter.onNext("文章3");
emitter.onComplete();
}
});
ObservableEmitter,顾名思义,就是被观察者发射器。
所以,subscribe()里面的三个onNext()方法和一个onComplete()会逐一被调用。
3.2.5 CreateEmitter类的onNext()和onComplete()等
//省略其他代码
@Override
public void onNext(T t) {
//省略无关代码
if (!isDisposed()) {
//调用观察者的onNext()
observer.onNext(t);
}
}
@Override
public void onComplete() {
if (!isDisposed()) {
try {
//调用观察者的onComplete()
observer.onComplete();
} finally {
dispose();
}
}
}
可以看到,最终就是会调用到观察者的onNext()和onComplete()方法。
可以看到,上面有个isDisposed()方法能控制消息的走向,即能够切断消息的传递,这个后面再来说。
3.2.6 简单总结
Observable(被观察者)和Observer(观察者)建立连接(订阅)之后,会创建出一个发射器CreateEmitter,发射器会把被观察者中产生的事件发送到观察者中去,观察者对发射器中发出的事件做出响应处理。
可以看到,是订阅之后,Observable(被观察者)才会开始发送事件。

3.2.7 时序流程图
再来看下订阅过程的时序流程图:

4 切断消息
4.1 切断消息
Observable observable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("文章1");
emitter.onNext("文章2");
emitter.onNext("文章3");
emitter.onComplete();
}
});
Observer<String> observer = new Observer<String>() {
private Disposable mDisposable;
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe : " + d);
mDisposable=d;
}
@Override
public void onNext(String s) {
Log.d(TAG, "onNext : " + s);
mDisposable.dispose();
Log.d(TAG, "切断观察者与被观察者的连接");
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError : " + e.toString());
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
};
observable.subscribe(observer);
输出结果为
onSubscribe : null
onNext : 文章1
切断观察者与被观察者的连接
可以看到,要切断消息的传递很简单,调用下Disposable的dispose()方法即可。调用dispose()之后,被观察者虽然能继续发送消息,但是观察者却收不到消息了
。
另外有一点需要注意,上面onSubscribe输出的Disposable值是"null",并不是空引用null。
4.2 切断消息源码分析
Disposable是一个接口,可以理解Disposable为一个连接器,调用dispose()后,这个连接器将会中断。
其具体实现在CreateEmitter类.
4.2.1 CreateEmitter的dispose()
@Override
public void dispose() {
DisposableHelper.dispose(this);
}
就是调用DisposableHelper.dispose(this)而已。
4.2.2 DisposableHelper类
public enum DisposableHelper implements Disposable {
DISPOSED
;
//其他代码省略
public static boolean isDisposed(Disposable d) {
//判断Disposable类型的变量的引用是否等于DISPOSED
//即判断该连接器是否被中断
return d == DISPOSED;
}
public static boolean dispose(AtomicReference<Disposable> field) {
Disposable current = field.get();
Disposable d = DISPOSED;
if (current != d) {
//这里会把field给设为DISPOSED
current = field.getAndSet(d);
if (current != d) {
if (current != null) {
current.dispose();
}
return true;
}
}
return false;
}
}
可以看到DisposableHelper是一个枚举类,并且只有一个值:DISPOSED。dispose()方法中会把一个原子引用field设为DISPOSED,即标记为中断状态。
因此后面通过isDisposed()方法即可以判断连接器是否被中断。
4.2.3 CreateEmitter类中的方法
@Override
public void onNext(T t) {
//省略无关代码
if (!isDisposed()) {
//如果没有dispose(),才会调用onNext()
observer.onNext(t);
}
}
@Override
public void onError(Throwable t) {
if (!tryOnError(t)) {
//如果dispose()了,会调用到这里,即最终会崩溃
RxJavaPlugins.onError(t);
}
}
@Override
public boolean tryOnError(Throwable t) {
//省略无关代码
if (!isDisposed()) {
try {
//如果没有dispose(),才会调用onError()
observer.onError(t);
} finally {
//onError()之后会dispose()
dispose();
}
//如果没有dispose(),返回true
return true;
}
//如果dispose()了,返回false
return false;
}
@Override
public void onComplete() {
if (!isDisposed()) {
try {
//如果没有dispose(),才会调用onComplete()
observer.onComplete();
} finally {
//onComplete()之后会dispose()
dispose();
}
}
}
-
如果没有dispose,observer.onNext()才会被调用到。
-
onError()和onComplete()互斥,只能其中一个被调用到,因为调用了他们的任意一个之后都会调用dispose()。
-
先onError()后onComplete(),onComplete()不会被调用到。反过来,则会崩溃,因为onError()中抛出了异常:RxJavaPlugins.onError(t)。实际上是dispose后继续调用onError()都会炸。
网友评论