1 上游怎么传递onComplete到下游
2 上游怎么传递onError到下游
3 它俩是互斥的吗
4 多次调用的结果是什么
5 onNext onSubscribe顺便分析一下
示例代码
Observable.create(new ObservableOnSubscribe<String>() {
public void subscribe(@NonNull ObservableEmitter<String> emitter) {
emitter.onNext("msg");
String data = null;
data.length();
// emitter.onComplete();
// emitter.onError(new IllegalArgumentException("1"));
}
}).subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(String s) {
Log.d(TAG, s);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "" + e.getMessage());
}
@Override
public void onComplete() {
Log.d(TAG, "complete");
}
});
先具体分析一下流程
Observable.create.subscribe链式调用
Observable.create返回一个Observable,实现是ObservableCreate
Observable.subcribe如下,会触发subcribeActual方法
try {
observer = RxJavaPlugins.onSubscribe(this, observer);
subscribeActual(observer);
} catch (NullPointerException e) { // NOPMD
throw e;
subscribeActual是抽象的方法,所以针对操作符create需要看ObservableCreate的
subscribeActual
分析ObservableCreate的subscribeActual
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);
try {
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
先调用观察者onSubscribe,然后调用source的subscribe,也就是如下例子中new ObservableOnSubscribe,接着触发emitter.onNext,接着触发观察者的onNext,这也是上游的subscribe怎么通过onNext触发到下游的onNext
梳理调用流程
1 ObservableCreate.subcribe
2 Observable.subscibeActual
3 ObservableOnSubscribe.subscribe
4 CreateEmitter.onNext
5 observer.onNext(t);
所以CreateEmitter就把上游和下游关联起来,
当上游调用subcribeActual就会出发CreateEmitter然后调用下游onNext
Observable.create(new ObservableOnSubscribe<String>() {
public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("msg");
public void onNext(T t) {
if (!isDisposed()) {
observer.onNext(t);
}
}
并且此操作会捕捉异常,如果出现异常交给parent.onError(ex),而onError方法触发tryOnError方法
public boolean tryOnError(Throwable t) {
if (!isDisposed()) {
try {
observer.onError(t);
} finally {
dispose();
}
return true;
}
return false;
}
这是程序运行抛出异常,也就是调用显式的onError:
如果第二次onError,就会判断当前订阅关系解除,然后交给RxJavaPlugins.onError(t);如果设置RxJava统一异常捕捉则交给此处理否则
直接抛出异常;一般为闪退;
subscribe(Consumer)分析
内部转成观察者Observer
LambdaObserver<T> ls = new LambdaObserver<T>(onNext, onError, onComplete, onSubscribe);
subscribe(ls);
在onNext触发accept方法
@Override
public void onNext(T t) {
if (!isDisposed()) {
try {
onNext.accept(t);
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
get().dispose();
onError(e);
}
}
}
onComplete调用也是判断没有解除订阅就finally解除订阅
if (!isDisposed()) {
try {
observer.onComplete();
} finally {
dispose();
}
}
所以调用一次complete就会解除订阅,多次complete就会失效,因为连接已经解除关系了。
onNext
onNext只是判断有没有解除上下游关系,并没有finally里面解除关联,所以可以多次调用
public void onNext(T t) {
if (!isDisposed()) {
observer.onNext(t);
}
}
onSubcribe
这个观察者触发条件只是subcribe触发,开发者无法调用CreateEmitter也没有onSubscribe方法
总结:onError调用一次则解除上下游关联,如果subscribe内部出现异常会自动捕捉传给下层观察者的onError;由于调用一次就会解除订阅,第二次调用交给默认Rxjava处理,默认抛出异常出现闪退
onNext则不会自动解除订阅
onSubcribe内部触发
onComplete也是第一次触发就不会
先onError后onComplete则由于前一次解除订阅则后续onComplete判断订阅解除也就不再触发
先onComplete后onError则前一次解除订阅,后续再onError默认会闪退
网友评论