前言:
本文所有代码基于RxJava 2.2.3和RxAndroid 2.1.0,并使用kotlin来介绍RxJava中常用的操作符的用法及效果。
第一步:创建被观察者Observable。
1、 create其实就是Observable里的一个静态方法用于创建一个Observable实例。在下面这段代码中我们主动创建了两个对象实例,一个是由Observable.create方法返回的Observable实例,一个是传入create方法的参数ObservableOnSubscribe匿名对象并覆写了定义事件发送顺序的方法 ObservableOnSubscribe.subscribe。
//第一步
Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter emitter) throws Exception {
Log.e("Create","ObservableOnSubscribe对象的subscribe方法被调用");
emitter.onNext("数据1");
emitter.onNext("数据2");
emitter.onNext("数据3");
emitter.onNext("数据4");
emitter.onError(new Exception());
emitter.onComplete();
}
});
2、Observable的create方法,该方法在这种情况下实际上返回的就是新new的 ObservableCreate对象,并且会将ObservableOnSubscribe对象赋值给其内部的成员变量source
//Observable的create方法
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
ObjectHelper.requireNonNull(source, "source is null");
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
//ObservableCreate类的成员变量及其构造函数 create传入的参数ObservableOnSubscribe对象最终赋值给了source
final ObservableOnSubscribe<T> source;
public ObservableCreate(ObservableOnSubscribe<T> source) {
this.source = source;
}
//RxJavaPlugins.onAssembly方法
public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
Function<? super Observable, ? extends Observable> f = onObservableAssembly;
if (f != null) {//此时 f == null 所以会返回source即传入的参数不会有其他操作
return apply(f, source);
}
return source;
}
第一步结论:创建了一个ObservableCreate对象,并将决定订阅时事件调动顺序的ObservableOnSubscribe对象赋值给其成员变量source。
第二步:创建观察者Observer
Observer<String> observer = new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.e("Create","Observer的onSubscribe方法");
}
@Override
public void onNext(String s) {
Log.e("Create","Observer的onNext方法 收到的参数为:"+s);
}
@Override
public void onError(Throwable e) {
Log.e("Create","Observer的onError方法");
}
@Override
public void onComplete() {
Log.e("Create","Observer的onComplete方法");
}
};
第二步结论:第二步就是单纯的创建了一个观察者,并覆写如上所示的四个方法。
第三步:让被观察者(Observable)和观察者(Observer)之间产生联系
//观察者订阅被观察者 两者关系发生
observable.subscribe(observer);
1、订阅之后立即调用被观察者(observable)的subscribe方法。该方法中调用了抽象方法subscribeActual(observer);并将观察者(observer)作为参数传入传入。
public final void subscribe(Observer<? super T> observer) {
ObjectHelper.requireNonNull(observer, "observer is null");
try {
observer = RxJavaPlugins.onSubscribe(this, observer);
ObjectHelper.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned
a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe
for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");
//核心代码是这句这个方法在Observable类中是个抽象方法第一步创建的ObservableCreate对象,覆写了该方法,该方法的参数为
subscribeActual(observer);
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
// can't call onError because no way to know if a Disposable has been set or not
// can't call onSubscribe because the call might have set a Subscription already
RxJavaPlugins.onError(e);
NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
npe.initCause(e);
throw npe;
}
}
2、当这个创建的Observable对象调用subscribe方法时,subscribe方法会调用ObservableCreate类中覆写的subscrubeActual方法,subscrubeActual的代码如下:
protected void subscribeActual(Observer<? super T> observer) {
//将订阅的Observer对象进行封装到CreateEmitter中,作为source调用subscribe方法时的参数
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);
try {
//source即为调用create操作符时传入的用于定义调用顺序的ObservableOnSubscribe对象,在此处调用了其subscribe方法
//传入的参数是上面封装了的CreateEmitter对象,所以我们在外面定义的调用顺序就是调用的CreateEmitter的方法如onNext,
//onError,onComplete等方法
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
3、再看看CreateEmitter中对onNext,onError,onComplete的封装。三个方法都调用了观察者(observer)的相应方法,但是在调用前都对其进行了处理,判断其是否被取消。若调用了onError或者onComplete中的任意一个方法时,就会被取消,即用dispose()方法,这时 !isDisposed())的值就为false了。所以后续的观察者的其他方法都不可调用了。
//observer对象即为我们在第二步创建的观察者(observer)
public void onNext(T t) {
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally
not allowed in 2.x operators and sources."));
return;
}
if (!isDisposed()) {
observer.onNext(t);
}
}
public void onError(Throwable t) {
if (!tryOnError(t)) {
RxJavaPlugins.onError(t);
}
}
public boolean tryOnError(Throwable t) {
if (t == null) {
t = new NullPointerException("onError called with null. Null values are generally not
allowed in 2.x operators and sources.");
}
if (!isDisposed()) {
try {
observer.onError(t);
} finally {
dispose();
}
return true;
}
return false;
}
public void onComplete() {
if (!isDisposed()) {
try {
observer.onComplete();
} finally {
dispose();
}
}
}
第三步总结:被观察者(observable)调用subscribe方法,此方法最终将观察者(observer)对象封装成Emitter并作为参数,调用ObservableCreate中source即第一步创建时传入的参数的subscribe方法。即实现了调用第一步时定义的调用顺序。而Emitter的封装使onError及onComplete的调用做了限制,保证了onError和onComplete方法只能调用其中一个。
最后总结:
上述一系列操作,可归结为以下伪代码:
//规定调用顺序
ObservableOnSubscribe<String> source = new ObservableOnSubscribe<String>(){
@Override
public void subscribe(ObservableEmitter emitter) throws Exception {
Log.e("Create","ObservableOnSubscribe对象的subscribe方法被调用");
emitter.onNext("数据1");
emitter.onNext("数据2");
emitter.onNext("数据3");
emitter.onNext("数据4");
emitter.onError(new Exception());
emitter.onComplete();
}
};
//创建被观察者
Observable<String> observable = new ObservableCreate<>(source);
observable.source = source;
//以上为第一步
//-------------------------------------------------------------------------
Observer<String> observer = new Observer<String>();
//创建观察者这是第二步
//---------------------------------------------------------------------------
//调用observable.subscribe(observer)
Emitter<String> emitter = new EmitterCreate(observer);
observable.source.subscribe(emitter);
//第三步订阅
//-----------------------------------------------------------------------------
网友评论