之前在学习Rxjava时,写demo。发现在FlatMap运算中,即使上游发送了onComplete,在下游也无法执行onComplete:
Observable
.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
// Log.i(TAG, "!!!!onNext 1 before");
// e.onNext(1);
// Log.i(TAG, "!!!!onNext 1 after");
Log.i(TAG, "!!!!onNext 2 before");
e.onNext(2);
Log.i(TAG, "!!!!onNext 2 after");
// 位置1:如果注释以下代码,在位置10出将无法触发
Log.i(TAG, "!!!!onComplete before");
e.onComplete();
Log.i(TAG, "!!!!onComplete after");
}
})
.flatMap(new Function<Integer, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(final Integer integer) throws Exception {
Log.i(TAG, "@@@@flatMap " + integer);
Observable<String> ob = Observable
.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(final ObservableEmitter<String> e) throws Exception {
String value = "####flatMap: Observable2,create: " + integer;
Log.i(TAG,value);
Log.i(TAG, "####flatMap: Observable2, next " + integer + " before");
e.onNext(value);
Log.i(TAG, "####flatMap: Observable2, next " + integer + " after");
if(integer == 2){
// 位置2:如果注释以下代码,在位置10出将无法触发
// Log.i(TAG, "####flatMap: Observable2, onComplete 2 before");
// e.onComplete();
// Log.i(TAG, "####flatMap: Observable2, onComplete 2 after");
}
}
});
return ob;
}
})
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.i(TAG, "XXXXsubscribe:" + s);
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
Log.i(TAG, "TTTTerror");
}
}, new Action() {
@Override
public void run() throws Exception {
//位置10:触发onComplete .如果注释位置1和位置2中任何一处,都将无法触发这里
Log.i(TAG, "VVVVcomplete");
}
}
);
Log.i(TAG, "VVVVcomplete"); 没有执行
为什么???
分析Flatmap运算符源码,主要是ObservableFlatMap 这个类:
关键类:MergeObserver,InnerObserver
关键方法:mergeObserver.onNext(),merge.onComplete(),mergeObserver.drain(),merge.insertInnser(),merge.removeInner();
innerObserver.onComplete(),
mergeObserver.drain()关键代码:
-
当innerObserver中触发了onComplete,将移除innerObserver
QQ图片20171230235242.png -
当mergeObserver.onComplete或onError被触发,done置为true,并且没有innerObserver时,才能触发child.onComplete()
QQ图片20171230235250.png
网友评论