首先要明确RxJava的订阅流程 使用方式 = 基于事件流的链式调用
步骤1:创建被观察者(Observable)& 定义需发送的事件
步骤2:创建观察者(Observer) & 定义响应事件的行为
步骤3:通过订阅(subscribe)连接观察者和被观察者
分析的实例代码如下:
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onComplete();
}
}).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer integer) {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
1、先分析Observable如何创建
点create方法进去看到new ObservableCreate并且返回
ObservableCreate是Observable的子类,子类复写了重要的subscribeActual方法
@Override
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);
try {
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
2、分析Observable什么时候发送事件
上面ObservableCreate看出只有调subscribeActual方法才会执行source.subscribe方法
所以在没订阅之前subscribe里面方法只是定义
3、分析Observer如何创建
里面就通过Observer接口来生成,接口内含4个方法,分别用于响应对应于被观察者发送的不同事件
4、分析subscribe订阅做了什么
我们来看Observable中的subscribe方法
@Override
public final void subscribe(Observer<? super T> observer) {
ObjectHelper.requireNonNull(observer, "observer is null");
try {
observer = RxJavaPlugins.onSubscribe(this, observer);
ObjectHelper.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");
subscribeActual(observer);
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
// can't call onError because no way to know if a Disposable has been set or not
// can't call onSubscribe because the call might have set a Subscription already
RxJavaPlugins.onError(e);
NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
npe.initCause(e);
throw npe;
}
}
看到没,主要是调了Observable的subscribeActual(observer)的方法,ObservableCreate是Observable的子类,自然就会调到ObservableCreate复写的subscribeActual,这时就会调ObservableOnSubscribe的subscribe方法,就开始发送事件emitter.onNext(1)。就开始发送事件了
5、分析下emitter.onNext(1)做了什么
我们点emitter.onNext进去啥也没有,因为它是接口,对于接口类型的,我们都应该找它们的实现的子类
我们看下ObservableEmitter,结果它也是接口,继续找实现的子类
还是得点create方法一步步找在ObservableCreate找到CreateEmitter实现的子类
顺便重点分析下subscribeActual方法
@Override
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);
try {
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
分析发现
- CreateEmitter实现了ObservableEmitter和Disposable接口
- CreateEmitter<T> parent = new CreateEmitter<T>(observer) 即parent是实现了ObservableEmitter,Disposable的对象,并且封装了observer。可以简单认为它具有这三者的功能
- observer.onSubscribe(parent),这里的parent就是封装后的Disposable。也就会调Observer的onSubscribe方法
- source.subscribe(parent),开始调Observable的subscribe,并传入了带有observer的CreateEmitter
这时可以看CreateEmitter的onNext方法,也就是emitter.onNext(1)做了什么
@Override
public void onNext(T t) {
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
if (!isDisposed()) {
observer.onNext(t);
}
}
惊讶的发现,里面其实就是调了observer.onNext,这样就把事件发送到了observer,observer的onNext也就调用了
还有看到没有observer.onNext(t)调之前都有!isDisposed()判断,这也是为什么Disposable可以切断Observable和Observer之间的连接
但Observable还是会继续发事件,只是Observer不接收了的缘故
分析完挺愉悦的,喜欢就点个赞!!!
网友评论