Observable创建和订阅流程
这里的创建和订阅不考虑操作符操作。
Observable<String> observable = Observable
.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("hello");
}
});
查看Observable的create方法:
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
ObjectHelper.requireNonNull(source, "source is null");
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
返回 ObservableCreate对象,ObservableCreate的构造函数的参数为create方法传递的参数,参数类型为ObservableOnSubscribe。ObservableCreate保存该参数为source。
ObservableCreate的构造方法如下:
final ObservableOnSubscribe<T> source;
public ObservableCreate(ObservableOnSubscribe<T> source) {
this.source = source;
}
这样observable对象实际上就是一个ObservableCreate对象。
subscribe方法调用如下:
observable.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
comDisposable.add(d);
}
@Override
public void onNext(String s) {
Log.i("ThreadActivity", s);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
subscribe方法的参数是一个Observer。执行了Observable的subscribe方法,subscribe方法代码如下:
public final void subscribe(Observer<? super T> observer) {
ObjectHelper.requireNonNull(observer, "observer is null");
try {
observer = RxJavaPlugins.onSubscribe(this, observer);
ObjectHelper.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");
subscribeActual(observer);
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
// can't call onError because no way to know if a Disposable has been set or not
// can't call onSubscribe because the call might have set a Subscription already
RxJavaPlugins.onError(e);
NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
npe.initCause(e);
throw npe;
}
}
真正起作用的是subscribeActual方法,由于Observable实际上是ObservableCreate对象,因此subscribeActual也就是调用ObservableCreate的subscribeActual方法。subscribeActual方法如下:
@Override
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);
try {
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
该方法中首先以observer为参数生成CreateEmitter对象,再调用obsever的onSubscribe方法,然后调用source的subscribe方法,而该source为Observable的create方法的参数。因此 source.subscribe(parent);实际上执行的是创建时参数的subscribe代码:
public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("hello");
}
CreateEmitter的onNext方法定义如下:
@Override
public void onNext(T t) {
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
if (!isDisposed()) {
observer.onNext(t);
}
}
实际上调用了observer的onNext方法,这里的observer就是Observable的subscribe方法传入的参数。
subscribeOn,observeOn方法影响
subscribeOn() : 影响的是最开始的被观察者(第一个被观察者)所在的线程。当使用多个subscribeOn() 的时候,只有第一个 subscribeOn() 起作用;
observeOn() : 影响的是跟在后面的操作(指定观察者运行的线程)。所以如果想要多次改变线程,可以多次使用 observeOn;
网友评论