RxJava2 链式调用方案比较

作者: 凌峰 | 来源:发表于2017-04-25 15:30 被阅读809次

    目标场景:

    熟悉Promise的朋友应该知道Promise中我们可以通过next进行链式调用。在调用的任何一个环节如果出现错误可以直接发射error信号终止链式调用。RxJava的onNext的发射逻辑和Promise有一些不同,并且一个订阅事件不能再继续执行后续的异步调用。在某一些场景下,我们希望能够像Promise一样通过一系列的链式异步调用完成一些系统功能的流程。例如,如果目前有N个任务,我们希望能够对这N个任务进行顺序执行,上一个任务如果正常执行,则继续执行下一个任务,否则结束链式调用。

    实现方案:

    • 采用单个Flowable循环调用的方式。
    Flowable.just(List<T> list)
    .map(new Function<List<T>, O>(){
        @Override
        public O apply(List<T> list) throws Exception{
            for (int m = 0; m < list.size(); m ++){
                result = t.handle(); //伪方法示例
                if (result!=SUCCESS){
                     throw new Exception(); //结束链式调用
                }
             }
            return new O(); //返回一个结束消息
          }
        })
    .subscribe(new Subscriber(){
    ...
        @Override
        void onNext(){
            //这里处理返回结果
        }
    
        @Override
        void onError(){
            //这里处理错误
        }
    });
    

    这种方案,怎么说呢,感觉和异步的结构不太一致,毕竟是采用循环的方式嵌套在一个map操作符里。

    • 采用fromIterable

    RxJava2 将RxJava1中的from拆解为若干个fromXXX操作符,对于容器,我们可以采用fromIterable。这个操作符是顺序执行每次遍历出来的内容,因此可以满足链式调用的方法。

    Flowable.fromIterable(List<T> list)              
                  .map(new Function<T, O>(){
                        @Override
                        public O apply(T t){
                           result = t.handle(); //伪方法示例
                           if (result!=SUCCESS){
                               throw new Exception(); //结束链式调用
                           }
                         }
                  })
                  .subscribe(...);           
    

    这种方式比上一种在结构上更加清晰。我们也可以把map操作符替换成flatmap操作符去实现嵌套形式的链式调用。

    但是,方案1和2有一个共同的问题,则是所有的任务必须在执行前全部写入一个List。我们希望能够更加灵活的添加链式任务,这时候我们可以直接使用连续的map操作符。

    • 采用连续map
    Flowable.just(T t)     
                  //任务1         
                  .map(new Function<T, T>(){
                        @Override
                        public T apply(T t){
                           result = t.handle(); //伪方法示例
                           if (result!=SUCCESS){
                               throw new Exception(); //结束链式调用
                           }
                         }
                  })
                  //任务2         
                  .map(new Function<T, T>(){
                        @Override
                        public T apply(T t){
                           result = t.handle(); //伪方法示例
                           if (result!=SUCCESS){
                               throw new Exception(); //结束链式调用
                           }
                         }
                  })
                  ...
                  .subscribe(...);           
    
    • 更复杂的场景

    如果每个链式任务执行成功后要执行一些不同的额外操作,一个subsriber订阅会有一些问题,RxJava2中提供了若干个doOnXXX操作符(doOnNext,doOnError,doOnComplete),原理上可以解决这个问题。如果采用fromIterable传入链式任务,我们需要使用flatmap为每个调用单独配置doOnXXX。

    • 首先,把任务和额外的操作封装成一个Bundle
    interface RxTask{
        int task(Object o);
    }
    class RxBundle{
        RxTask task;
        RxTask sideTask;    
    }
    
    • 然后,把任务通过List方式传入Flowable
    Flowable.fromIterable(List<RxBundle> list)              
                  .flatMap(new Function<RxBundle, Publisher<?>(){
                        @Override
                        public Publisher<?> apply(Bundle bundle){
                            final RxTask task = bundle.task;
                            final RxTask sideTask = bundle.sideTask
                            return Flowable.create(new FlowableOnSubscribe<Object>() {
                                @Override
                                public void subscribe(FlowableEmitter<Object> e) throws Exception {
                                    int result = task.task();
                                    if (result!=SUCCESS){
                                        throw new Exception(); //结束链式调用
                                    }
                                }
                            }, BackpressureStrategy.BUFFER))
                            .doOnNext(new Consumer(){
                                  @Override
                                  public void apply(Object o){
                                      sideTask.task(); //这里执行额外操作
                                  }
                            });                       
                  })
                  .subscribe(...);           
    

    这样我们最终的回调结果是发射到.subscribe()当中,并且每一个调用都有针对的处理。

    目前尚未验证的是当某一个任务时扔出Exception整条链是否会停止执行。欢迎有经验的同学补充。

    相关文章

      网友评论

        本文标题:RxJava2 链式调用方案比较

        本文链接:https://www.haomeiwen.com/subject/gapvzttx.html