美文网首页
Rxjava:flatmap源码分析:onComplete没有运

Rxjava:flatmap源码分析:onComplete没有运

作者: 沈杰3 | 来源:发表于2017-12-31 00:16 被阅读0次

    之前在学习Rxjava时,写demo。发现在FlatMap运算中,即使上游发送了onComplete,在下游也无法执行onComplete:

    
            Observable
                    .create(new ObservableOnSubscribe<Integer>() {
                        @Override
                        public void subscribe(ObservableEmitter<Integer> e) throws Exception {
    //                        Log.i(TAG, "!!!!onNext 1 before");
    //                        e.onNext(1);
    //                        Log.i(TAG, "!!!!onNext 1 after");
    
                            Log.i(TAG, "!!!!onNext 2 before");
                            e.onNext(2);
                            Log.i(TAG, "!!!!onNext 2 after");
    
                            // 位置1:如果注释以下代码,在位置10出将无法触发
                            Log.i(TAG, "!!!!onComplete before");
                            e.onComplete();
                            Log.i(TAG, "!!!!onComplete after");
                        }
                    })
                    .flatMap(new Function<Integer, ObservableSource<String>>() {
                        @Override
                        public ObservableSource<String> apply(final Integer integer) throws Exception {
                            Log.i(TAG, "@@@@flatMap " + integer);
                            Observable<String> ob = Observable
                                    .create(new ObservableOnSubscribe<String>() {
                                        @Override
                                        public void subscribe(final ObservableEmitter<String> e) throws Exception {
                                            String value = "####flatMap: Observable2,create: " + integer;
                                            Log.i(TAG,value);
                                            Log.i(TAG, "####flatMap: Observable2, next " + integer + " before");
                                            e.onNext(value);
                                            Log.i(TAG, "####flatMap: Observable2, next " + integer + " after");
    
                                            if(integer == 2){
                                                // 位置2:如果注释以下代码,在位置10出将无法触发
    //                                            Log.i(TAG, "####flatMap: Observable2, onComplete 2 before");
    //                                            e.onComplete();
    //                                            Log.i(TAG, "####flatMap: Observable2, onComplete  2 after");
                                            }
                                        }
                                    });
                            return ob;
                        }
                    })
                    .subscribe(new Consumer<String>() {
                        @Override
                        public void accept(String s) throws Exception {
                            Log.i(TAG, "XXXXsubscribe:" + s);
                        }
                    }, new Consumer<Throwable>() {
                        @Override
                        public void accept(Throwable throwable) throws Exception {
                            Log.i(TAG, "TTTTerror");
                        }
                    }, new Action() {
                        @Override
                        public void run() throws Exception {
                            //位置10:触发onComplete .如果注释位置1和位置2中任何一处,都将无法触发这里
                            Log.i(TAG, "VVVVcomplete");
                        }
                    }
                    );
    

    Log.i(TAG, "VVVVcomplete"); 没有执行

    为什么???

    分析Flatmap运算符源码,主要是ObservableFlatMap 这个类:

    关键类:MergeObserver,InnerObserver
    关键方法:mergeObserver.onNext(),merge.onComplete(),mergeObserver.drain(),merge.insertInnser(),merge.removeInner();
    innerObserver.onComplete(),

    mergeObserver.drain()关键代码:
    • 当innerObserver中触发了onComplete,将移除innerObserver


      QQ图片20171230235242.png
    • 当mergeObserver.onComplete或onError被触发,done置为true,并且没有innerObserver时,才能触发child.onComplete()


      QQ图片20171230235250.png
    innserObserver.onComplete(),onError()
    QQ图片20171230235254.png

    相关文章

      网友评论

          本文标题:Rxjava:flatmap源码分析:onComplete没有运

          本文链接:https://www.haomeiwen.com/subject/krfogxtx.html