前言
subscribe()是将被观察者(Observable)和观察者(Observer)连接起来的桥梁
作为开篇我们首先解决三个问题:
- 被观察者如何发送数据
- 观察者如何接收数据
一、最简单的subscribe()调用
先从简单的开始,本篇不涉及线程切换
Observable
.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("Hello world!");
emitter.onComplete();
emitter.onError(new Throwable("error"));
emitter.onNext("next");
}
})
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "--onSubscribe");
}
@Override
public void onNext(String str) {
Log.d(TAG, "--onNext: str = " + str);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "--onError");
}
@Override
public void onComplete() {
Log.d(TAG, "--onComplete:");
}
});
//发射数字1,日志打印
onSubscribe
onNext: str = Hello world!
onComplete
大家都知道Java的代码是一行一行执行的,所以我们首先看just操作符做了哪些事情
Observable.create(ObservableOnSubscribe<T> source)
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
//判断传入的参数是否为null,为null则抛出NullPointerException异常
ObjectHelper.requireNonNull(source, "The source is null");
//onAssembly是RxJavaPlugins的钩子函数,会出现很多次和类似的方法,主要是给开发者用于扩展的方法,有必要说一下
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(item));
}
public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
//onObservableAssembly是RxJavaPlugins的静态全局对象,默认为null,需求外部手动设置
Function<? super Observable, ? extends Observable> f = onObservableAssembly;
//假如我们在外部设置onObservableAssembly,在这里就启了一个过滤转化的功能,具体看setOnObservableAssembly()
if (f != null) {
return apply(f, source);
}
//所以在没有手动设置的前提下,这个钩子函数,传进来什么就返回什么
return source;
}
public static void setOnObservableAssembly(@Nullable Function<? super Observable, ? extends Observable> onObservableAssembly) {
if (lockdown) {
throw new IllegalStateException("Plugins can't be changed anymore");
}
//onObservableAssembly其实就是Function接口的实例,将我们传入的Observable对象,进行转化过滤等操作,方便我们进行扩展
RxJavaPlugins.onObservableAssembly = onObservableAssembly;
}
//具体使用
RxJavaPlugins.setOnObservableAssembly(new Function<Observable, Observable>() {
@Override
public Observable apply(Observable observable) throws Exception {
//对全局使用的ObservableJust对象,转化成Observable.empty()返回;
if (observable instanceof ObservableJust){
return Observable.empty();
}
//否则直接返回
return observable;
}
});
create操作符其实就是返回了一个ObservableCreate对象,继承至Observable类
public final class ObservableCreate<T> extends Observable<T>{
final ObservableOnSubscribe<T> source;
public ObservableCreate(ObservableOnSubscribe<T> source) {
//这是我们创建的匿名内部类
this.source = source;
}
}
接着看本文的猪脚:subscribe(Observer<? super T> observer)
//我们在外部传入了observer实例
public final void subscribe(Observer<? super T> observer) {
ObjectHelper.requireNonNull(observer, "observer is null");
try {
//onSubscribe也是钩子函数,与onAssembly类似,这里返回的就是observer本身
observer = RxJavaPlugins.onSubscribe(this, observer);
ObjectHelper.requireNonNull(observer, "...");
//这是真正的抽象订阅方法,继承Observable的子类必须覆写此方法,所以这里调用的是ObservableCreate类里的subscribeActual
subscribeActual(observer);
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
...
}
}
@Override
protected void subscribeActual(Observer<? super T> observer) {
//CreateEmitter是ObservableCreate的静态内部类,
//继承AtomicReference<Disposable>类,泛型Disposable,确保数据的原子操作,后面的篇幅单独拎出来说
//实现ObservableEmitter<T>, Disposable接口
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
//这里打印我们的第一行日志 ----onSubscribe
observer.onSubscribe(parent);
try {
//重点看这里
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
//Emitter是发射器的意思,看到这三个接口,是不是很激动,对应了Observer观察者接口的三个方法
//这就很容易联系到我们的问题上来,发射和接收
public interface ObservableEmitter<T> extends Emitter<T> {...}
public interface Emitter<T> {
void onNext(@NonNull T value);
void onError(@NonNull Throwable error);
void onComplete();
}
//还记得我们创建的ObservableOnSubscribe匿名内部类吗?
new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
//发送数据
emitter.onNext("Hello world!");
//发射完毕
emitter.onComplete();
}
}
//ObservableEmitter类的onNext()
//这里发射数据
@Override
public void onNext(T t) {
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
if (!isDisposed()) {
//observer是在构造函数中传入的,也就是我们创建的Observer匿名内部类
//这里调用观察者对象接收数据
//打印第二行日志 -------onNext: str = Hello world!
observer.onNext(t);
}
}
public void onComplete() {
//判断是否解除订阅 true 代表 已经解除订阅
if (!isDisposed()) {
try {
//通知观察者对象,数据发送完毕
//打印第三行日志 ---onComplete
observer.onComplete();
} finally {
//主动解除订阅,后续发射的数据,观察者都不会接收
//这也解释了我们的日志只打印了三行
dispose();
}
}
}
二、总结
- 通过just操作符创建ObservableCreate被观察者
- 创建观察者Observer实例
- ObservableCreate通过subscribe订阅Observer观察者
- ObservableCreate执行subscribeActual抽象方法
- CreateEmitter包装Observer观察者和需要发送的值
- ObservableEmitter调用onNext(),onComplete()完成数据发送
通过以上源码分析,我们很清楚的知道,数据源的发送,以及观察者接收数据的逻辑,但是单纯的只分析subscribe()方法,是很好理解,当分析一长串链式调用的时候,很容易看着看着就摸不到驴屁股了,我觉得最好的理解方法是先熟悉单个操作符的作用,了解执行内容,最后再将知识点串起来,会轻松一点,消化消化继续第二篇~
网友评论