美文网首页android开发杂识MobDevGroupRxjava
【译】对RxJava中.repeatWhen()和.retryW

【译】对RxJava中.repeatWhen()和.retryW

作者: 小鄧子 | 来源:发表于2016-02-06 11:26 被阅读11295次

    第一次见到.repeatWhen().retryWhen()这两个操作符的时候就非常困惑了。不得不说,它们绝对是“最令人困惑弹珠图”的有力角逐者。

    然而它们都是非常有用的操作符:允许你有条件的重新订阅已经结束的Observable。我最近研究了它们的工作原理,现在我希望尝试着去解释它们(因为,我也是耗费了一些精力才参透它们)。

    Repeat与Retry的对比

    首先,来了解一下.repeat().retry()之间最直观的区别是什么?这个问题并不难:区别就在于什么样的终止事件会触发重订阅。

    .repeat()接收到.onCompleted()事件后触发重订阅。

    .retry()接收到.onError()事件后触发重订阅。

    然而,这种简单的叙述尚不能令人满意。试想如果你要实现一个延迟数秒的重订阅该如何去做?或者想通过观察错误来决定是否应该重订阅呢?这种情况下就需要.repeatWhen().retryWhen()的介入了,因为它们允许你为重试提供自定义逻辑。

    Notification Handler

    你可以通过一个叫做notificationHandler的函数来实现重试逻辑。这是.retryWhen()的方法签名(译者注:方法签名,指方法名称、参数类型和参数数量等):

    retryWhen(Func1<? super Observable<? extends java.lang.Throwable>,? extends Observable<?>> notificationHandler) 
    

    签名很长,甚至不能一口气读完。我发现它很难理解的原因是因为存在一大堆的泛型约定。

    简化后,它包括三个部分:

    1. Func1像个工厂类,用来实现你自己的重试逻辑。
    2. 输入的是一个Observable<Throwable>
    3. 输出的是一个Observable<?>

    首先,让我们来看一下最后一部分。被返回的Observable<?>所要发送的事件决定了重订阅是否会发生。如果发送的是onCompleted或者onError事件,将不会触发重订阅。相对的,如果它发送onNext事件,则触发重订阅(不管onNext实际上是什么事件)。这就是为什么使用了通配符作为泛型类型:这仅仅是个通知(next, error或者completed),一个很重要的通知而已。

    source每次一调用onError(Throwable)Observable<Throwable>都会被作为输入传入方法中。换句话说就是,它的每一次调用你都需要决定是否需要重订阅。

    当订阅发生的时候,工厂Func1被调用,从而准备重试逻辑。那样的话,当onError被调用后,你已经定义的重试逻辑就能够处理它了。

    这里有个例子展示了我们应该在哪些场景下订阅source,比如,只有在ThrowableIOException的情况下请求重订阅,否则不(重订阅)。

    source.retryWhen(new Func1<Observable<? extends Throwable>, Observable<?>>() {
              @Override public Observable<?> call(Observable<? extends Throwable> errors) {
    
                return errors.flatMap(new Func1<Throwable, Observable<?>>() {
                  @Override public Observable<?> call(Throwable error) {
    
                    // For IOExceptions, we  retry
                    if (error instanceof IOException) {
                      return Observable.just(null);
                    }
    
                    // For anything else, don't retry
                    return Observable.error(error);
                  }
                });
              }  
            })
    

    由于每一个error都被flatmap过,因此我们不能通过直接调用.onNext(null)触发重订阅或者.onError(error)来避免重订阅。

    经验之谈

    这里有一些关于.repeatWhen().retryWhen()的要点,我们应该牢记于心。

    • .repeatWhen().retryWhen()非常相似,只不过不再响应onError作为重试条件,而是onCompleted。因为onCompleted没有类型,所有输入变为Observable<Void>

    • 每一次事件流的订阅notificationHandler(也就是Func1)只会调用一次。这也是讲得通的,因为你有一个可观测的Observable<Throwable>,它能够发送任意数量的error。

    • 输入的Observable必须作为输出Observable的源。你必须对Observable<Throwable>做出反应,然后基于它发送事件;你不能只返回一个通用泛型流。

    换言之就是,你不能做类似的操作:

     .retryWhen(new Func1<Observable<? extends Throwable>, Observable<?>>() {
                  @Override public Observable<?> call(Observable<? extends Throwable> errors) {
    
                    return Observable.just(null);}
                })
    

    因为它不仅不能奏效,而且还会打断你的链式结构。你应该做的是,而且至少应该做的是,把输入作为结果返回,就像这样:

    .retryWhen(new Func1<Observable<? extends Throwable>, Observable<?>>() {
                  @Override public Observable<?> call(Observable<? extends Throwable> errors) {
    
                    return errors;
                  }
                })
    

    (顺便提一下,这在逻辑上与单纯使用.retry()操作符的效果是一样哒)

    • 输入Observable只在终止事件发生的时候才会触发(对于.repeatWhen()来说是onCompleted,而对于.retryWhen()来说是onError)。它不会从源中接收到任何onNext的通知,所以你不能通过观察被发送的事件来决定重订阅。如果你真的需要这样做,你应该添加像.takeUntil()这样的操作符,来拦截事件流。

    使用方式

    现在,假设你已大概了解了.repeatWhen().retryWhen(),那么你能将一些什么样的精简逻辑放入到notificationHandler中呢?

    使用.repeatWhen() + .delay()定期轮询数据:

    source.repeatWhen(new Func1<Observable<? extends Void>, Observable<?>>() {
                  @Override public Observable<?> call(Observable<? extends Void> completed) {
    
                    return completed.delay(5, TimeUnit.SECONDS);
                  }
                })
    

    直到notificationHandler发送onNext()才会重订阅到source。因为在发送onNext()之前delay了一段时间,所以优雅的实现了延迟重订阅,从而避免了不间断的数据轮询。

    非此即彼,使用.flatMap() + .timer()实现延迟重订阅:
    (译者注:在RxJava 1.0.0及其之后的版本,官方已不再提倡使用.timer()操作符,因为.interval()具有同样的功能)

    source.retryWhen(new Func1<Observable<? extends Throwable>, Observable<?>>() {
                  @Override public Observable<?> call(Observable<? extends Throwable> errors) {
    
                    return errors.flatMap(new Func1<Throwable, Observable<?>>() {
                      @Override public Observable<?> call(Throwable error) {
    
                        return Observable.timer(5, TimeUnit.SECONDS);
                      }
                    });
                  }
                })
    

    当需要与其他逻辑协同的时候,这种替代方案就变得非常有用了,比如。。。

    使用.zip() + .range()实现有限次数的重订阅

    source.retryWhen(new Func1<Observable<? extends Throwable>, Observable<?>>() {
                  @Override public Observable<?> call(Observable<? extends Throwable> errors) {
    
                    return errors.zipWith(Observable.range(1, 3), new Func2<Throwable, Integer, Integer>() {
                      @Override public Integer call(Throwable throwable, Integer i) {
    
                        return i;
                      }
                    });
                  }
                })
    
    

    最后的结果就是每个error都与range中一个输出配对出现,就像这样:

    zip(error1, 1) -> onNext(1)  <-- Resubscribe  
    zip(error2, 2) -> onNext(2)  <-- Resubscribe  
    zip(error3, 3) -> onNext(3)  <-- Resubscribe  
    onCompleted()                <-- No resubscription  
    

    因为当第四次error出现的时候,range(1,3)中的数字已经耗尽了,所以它隐式调用了onCompleted(),从而导致整个zip的结束。防止了进一步的重试。

    将可变延迟策略与次数限制的重试机制结合起来

    source.retryWhen(new Func1<Observable<? extends Throwable>, Observable<?>>() {
                  @Override public Observable<?> call(Observable<? extends Throwable> errors) {
    
                    return errors.zipWith(Observable.range(1, 3), new Func2<Throwable, Integer, Integer>() {
                      @Override public Integer call(Throwable throwable, Integer i) {
    
                        return i;
                      }
                    }).flatMap(new Func1<Integer, Observable<? extends Long>>() {
                      @Override public Observable<? extends Long> call(Integer retryCount) {
                        
                        return Observable.timer((long) Math.pow(5, retryCount), TimeUnit.SECONDS);
                      }
                    });
                  }
                })
    

    在这种用例的比较上,我认为.flatMap()+.timer()的组合比单纯使用.delay()更可取,因为我们可以通过重试次数来修改延迟时间。重试三次,并且每一次的重试时间都是5 ^ retryCount,仅仅通过一些操作符的组合就帮助我们实现了指数退避算法(译者注:可参考二进制指数退避算法)。

    相关文章

      网友评论

      • leilifengxingmw:very nice:smile:
      • IT枫:mark
      • LuckyTerry:在2.1.8中,Observable中retryWhen的方法文档已更新,不再使用range,而是使用takeWhile,我也是刚刚发现。
        小鄧子:感谢你的指出
      • LuckyTerry:楼主,不建议使用range操作符,某些情况下会出错,可见
        https://github.com/ReactiveX/RxJava/issues/5772
      • Dinos:博主,有一点没有看懂,希望能解释一下,就是,errors.zipWith(Observable.range(1, 3),..)使用zipWith的时候,第一个Obserable和第二Obserable的数量不是需要一样的吗?那么怎么确保异常的内容的数量和range是一样的呢?谢谢了
        LuckyTerry:接着上面回答,if (wip.getAndIncrement() == 0) 和if (isDisposed()) 都存在condition race,所以,如果重试不加延时(如timer、delay)并且使用range操作符,那么重试次数不会达到预期。range操作符发送完指定count的emissions时立刻就complete,会导致上一次(或多次)的retry检测isDisposed()时直接就返回了,达不到预期。也就是说尽量使用延迟complete的操作符来控制重试次数(或者自己定义suppressing逻辑)如takewhile等。
        错误 .retryWhen(throwableObservable ->
        throwableObservable.zipWith(Observable.range(1, 1), (throwable, integer) -> integer))

        可行 .retryWhen(throwableObservable -> {
        AtomicInteger counter = new AtomicInteger();
        return throwableObservable.takeWhile(e -> counter.getAndIncrement() < 1);
        })
        LuckyTerry:上一次回答错了,必须使用zipWith(),这样重试的发起时间才是收到异常后才开始。
        源码中有这么一段
        void subscribeNext() {
        if (wip.getAndIncrement() == 0) {

        do {
        if (isDisposed()) {
        return;
        }

        if (!active) {
        active = true;
        source.subscribe(this);
        }
        } while (wip.decrementAndGet() != 0);
        }
        }
        必须等收到错误后重试(即上面的active值为false)才能再次订阅。
        LuckyTerry:不需要一样,zipWith检测到其中一个onComplete就执行下游的onComplete了。
        不使用zip, 直接下面即可
        return Observable.range(1, 3).flatMap(retryCount ->
        Observable.timer((long) Math.pow(5, retryCount), TimeUnit.SECONDS)
        );

        return Observable.range(1, 3).delay(retryCount ->
        Observable.timer((long) Math.pow(5, retryCount), TimeUnit.SECONDS)
        );
      • 24K纯帅豆:LZ你好,我想问一下2.0版本中使用retryWhen如何判断错误的类型:
        /**
        * Returns an Observable that emits the same values as the source ObservableSource with the exception of an
        * {@code onComplete}. An {@code onComplete} notification from the source will result in the emission of
        * a {@code void} item to the ObservableSource provided as an argument to the {@code notificationHandler}
        * function. If that ObservableSource calls {@code onComplete} or {@code onError} then {@code repeatWhen} will
        * call {@code onComplete} or {@code onError} on the child subscription. Otherwise, this ObservableSource will
        * resubscribe to the source ObservableSource.
        * <p>
        * <img width="640" height="430" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/repeatWhen.f.png&quot; alt="">
        * <dl>
        * <dt><b>Scheduler:</b></dt>
        * <dd>{@code repeatWhen} does not operate by default on a particular {@link Scheduler}.</dd>
        * </dl>
        *
        * @param handler
        * receives an ObservableSource of notifications with which a user can complete or error, aborting the repeat.
        * @Return the source ObservableSource modified with repeat logic
        * @see <a href="http://reactivex.io/documentation/operators/repeat.html&quot;>ReactiveX operators documentation: Repeat</a>
        */
        @CheckReturnValue
        @SchedulerSupport(SchedulerSupport.NONE)
        public final Observable<T> repeatWhen(final Function<? super Observable<Object>, ? extends ObservableSource<?>> handler) {
        ObjectHelper.requireNonNull(handler, "handler is null");
        return RxJavaPlugins.onAssembly(new ObservableRepeatWhen<T>(this, handler));
        }
      • 5b7d74b3c1ba:博主的这句话 ---> 当.repeat()接收到.onCompleted()事件后触发重订阅, 意思是说 每重复submit 1 次都会走onCompleted()方法。 但是实际情况却是
        onNext-->2
        onNext-->3
        onNext-->4

        onNext-->2
        onNext-->3
        onNext-->4

        onNext-->2
        onNext-->3
        onNext-->4

        onCompleted-->

        Process finished with exit code 0
        5b7d74b3c1ba:还是我理解错了?
        :flushed:
      • xyzsyy0102:好东西,找了好久,谢谢博主
      • newstrong:看代码 感觉不对呢 代码逻辑是只要收到错误就重订阅三次 哪怕中间正确了
      • 叨码:首先谢谢你的译文。想请教一个问题:retryWhen()的时候,能修改默认请求超时时间的设置么?比如正常请求是10秒,retryWhen的时候是3秒。如果能修改怎么修改,如果不能,有什么好的方法么。谢谢。
      • 7faa5b5c2ee5: /**
        * 统一返回结果处理
        *
        * @param <T>
        * @Return
        */
        public static <T extends MyHttpResponse> Observable.Transformer<T, T> applyRetry(final RetrofitHelper apis) {
        return new Observable.Transformer<T, T>() {
        @Override
        public Observable<T> call(Observable<T> tObservable) {

        return tObservable.retryWhen(new Func1<Observable<? extends Throwable>, Observable<?>>() {

        @Override
        public Observable<?> call( Observable<? extends Throwable> errors) {
        return errors.zipWith(Observable.range(1, 3), new Func2<Throwable, Integer, Integer>() {
        @Override public Integer call(Throwable throwable, Integer i) {
        @@@@@** :flushed: 这里不执行怎么回事* :flushed: *@@@@@

        LogUtil.d("请求出现错误!重试code=" );

        if (throwable instanceof ApiException){
        if (((ApiException) throwable).isSessionExpried()){


        apis.getApi().login(model.getLoginParams(host, account, password))
        .doOnNext(new Action1<LoginBean>() {
        @Override
        public void call(LoginBean loginBean) {
        LogUtil.d("重新登录成功!");
        App.setLoginBean(loginBean);
        }
        7faa5b5c2ee5:@永恒的星 errors.zipWith(Observable.range(1, 3), new Func2<Throwable, Integer, Integer>() 这个地方不执行怎么回事????谁知道原因
      • a1dbebe3b27c:小橙子,问下如果repeatWhen+delay使用轮询想要中途暂停的话,有什么好的建议没?
      • SoloHo:如果用interval替换了timer,上面效果无法实现的。实测
      • Jackoder:博主的文章写得很不错,实践之后发现一点可以改进的地方。就是在重试耗尽之后,提供失败的回调。代码如下:
        return errors.zipWith(Observable.range(1, 3+1), new Func2<Throwable, Integer, Integer>() {
        @Override public Integer call(Throwable throwable, Integer i) {
        return i;
        }
        }).flatMap(new Func1<Integer, Observable<? extends Long>>() {
        @Override public Observable<? extends Long> call(Integer retryCount) {
        if (retryCount == 3+1) { //重试耗尽,提示失败
        Observable.error(null);
        } else {
        return Observable.timer((long) Math.pow(5, retryCount), TimeUnit.SECONDS);
        }
        }
        });
      • 遇见67:请教一下是否可以根据onNext的参数来决定是否重复。举个例子就是使用retrofit + rxjava从网络获取数据列表,接口规定每次最多请求十条,但是我想一次将数据都请求过来,就需要判断服务器返回数量是否是10条,如果是10条,就继续请求,如果少于10条,说明已经请求完毕,这种情况可以使用repeatwhen吗?但是我看repeatwhen是接收到onCompleted事件才触发的
        李泽1988:@遇见67 谢谢!我用retryWhen操作符解决了.
        遇见67:@4cbee389ca38 我使用了repeat操作符和takeWhile操作符组合来解决的。
        李泽1988:@遇见67 你找到解决方案了吗?
      • be5d965156e4:问下问什么设置的重试次数无效
        小鄧子:@TryCatch 能看下你的实现逻辑吗
      • __Berial___:经验之谈里的第一条关于repeatWhen()和retryWhen()貌似说反了
        冰冰的冻结:文章里 现在是正确的吗? 有点不解啊
        __Berial___:@小鄧子 :smirk:
        小鄧子:@__Berial___ 呀吼,这可是要命的错误,这都怪我,非常感谢你能指出。Thanks
      • 皮球二二:你好,那个指数的地方我没看明白,这样不是每次重连等待时间变得超长了?
      • 耗子点灯:写得漂亮,看2次终于看懂了😁
        小鄧子:@Hideeee 能看懂就好:kissing_heart:
        6db2cd5e23ce:@耗子点灯 隔了几天回头再看这篇文章,果然能看懂了。。
      • 6db2cd5e23ce:辛苦了。前面还能看懂,后面有点懵了…… RxJava不自己实践只看文章果然不知道在说什么😂
        30035123f1bc:@Hideeee 不知道你自己实践的话怎么实践?不知道有啥好的RxJava练手项目吗?

      本文标题:【译】对RxJava中.repeatWhen()和.retryW

      本文链接:https://www.haomeiwen.com/subject/jgvekttx.html