- 原文链接: RxJava's repeatWhen and retryWhen, explained
- 原文作者: Daniel Lew
- 译文出自: 小鄧子的简书
- 译者: 小鄧子
- 状态: 完成
- 译者注:为了方便因Lambda(译文)还不够了解的同学进行阅读,本篇译文替换了原文中全部Lambda表达式。
第一次见到.repeatWhen()和.retryWhen()这两个操作符的时候就非常困惑了。不得不说,它们绝对是“最令人困惑弹珠图”的有力角逐者。
然而它们都是非常有用的操作符:允许你有条件的重新订阅已经结束的Observable
。我最近研究了它们的工作原理,现在我希望尝试着去解释它们(因为,我也是耗费了一些精力才参透它们)。
Repeat与Retry的对比
首先,来了解一下.repeat()和.retry()之间最直观的区别是什么?这个问题并不难:区别就在于什么样的终止事件会触发重订阅。
当.repeat()接收到.onCompleted()事件后触发重订阅。
当.retry()接收到.onError()事件后触发重订阅。
然而,这种简单的叙述尚不能令人满意。试想如果你要实现一个延迟数秒的重订阅该如何去做?或者想通过观察错误来决定是否应该重订阅呢?这种情况下就需要.repeatWhen()
和.retryWhen()
的介入了,因为它们允许你为重试提供自定义逻辑。
Notification Handler
你可以通过一个叫做notificationHandler
的函数来实现重试逻辑。这是.retryWhen()
的方法签名(译者注:方法签名,指方法名称、参数类型和参数数量等):
retryWhen(Func1<? super Observable<? extends java.lang.Throwable>,? extends Observable<?>> notificationHandler)
签名很长,甚至不能一口气读完。我发现它很难理解的原因是因为存在一大堆的泛型约定。
简化后,它包括三个部分:
-
Func1
像个工厂类,用来实现你自己的重试逻辑。 - 输入的是一个
Observable<Throwable>
。 - 输出的是一个
Observable<?>
。
首先,让我们来看一下最后一部分。被返回的Observable<?>
所要发送的事件决定了重订阅是否会发生。如果发送的是onCompleted
或者onError
事件,将不会触发重订阅。相对的,如果它发送onNext
事件,则触发重订阅(不管onNext
实际上是什么事件)。这就是为什么使用了通配符作为泛型类型:这仅仅是个通知(next, error或者completed),一个很重要的通知而已。
source每次一调用onError(Throwable)
,Observable<Throwable>
都会被作为输入传入方法中。换句话说就是,它的每一次调用你都需要决定是否需要重订阅。
当订阅发生的时候,工厂Func1
被调用,从而准备重试逻辑。那样的话,当onError
被调用后,你已经定义的重试逻辑就能够处理它了。
这里有个例子展示了我们应该在哪些场景下订阅source,比如,只有在Throwable
是IOException
的情况下请求重订阅,否则不(重订阅)。
source.retryWhen(new Func1<Observable<? extends Throwable>, Observable<?>>() {
@Override public Observable<?> call(Observable<? extends Throwable> errors) {
return errors.flatMap(new Func1<Throwable, Observable<?>>() {
@Override public Observable<?> call(Throwable error) {
// For IOExceptions, we retry
if (error instanceof IOException) {
return Observable.just(null);
}
// For anything else, don't retry
return Observable.error(error);
}
});
}
})
由于每一个error都被flatmap过,因此我们不能通过直接调用.onNext(null)
触发重订阅或者.onError(error)
来避免重订阅。
经验之谈
这里有一些关于.repeatWhen()
和.retryWhen()
的要点,我们应该牢记于心。
-
.repeatWhen()
与.retryWhen()
非常相似,只不过不再响应onError
作为重试条件,而是onCompleted
。因为onCompleted
没有类型,所有输入变为Observable<Void>
。 -
每一次事件流的订阅
notificationHandler
(也就是Func1
)只会调用一次。这也是讲得通的,因为你有一个可观测的Observable<Throwable>
,它能够发送任意数量的error。 -
输入的
Observable
必须作为输出Observable
的源。你必须对Observable<Throwable>
做出反应,然后基于它发送事件;你不能只返回一个通用泛型流。
换言之就是,你不能做类似的操作:
.retryWhen(new Func1<Observable<? extends Throwable>, Observable<?>>() {
@Override public Observable<?> call(Observable<? extends Throwable> errors) {
return Observable.just(null);}
})
因为它不仅不能奏效,而且还会打断你的链式结构。你应该做的是,而且至少应该做的是,把输入作为结果返回,就像这样:
.retryWhen(new Func1<Observable<? extends Throwable>, Observable<?>>() {
@Override public Observable<?> call(Observable<? extends Throwable> errors) {
return errors;
}
})
(顺便提一下,这在逻辑上与单纯使用.retry()
操作符的效果是一样哒)
- 输入
Observable
只在终止事件发生的时候才会触发(对于.repeatWhen()
来说是onCompleted
,而对于.retryWhen()
来说是onError
)。它不会从源中接收到任何onNext
的通知,所以你不能通过观察被发送的事件来决定重订阅。如果你真的需要这样做,你应该添加像.takeUntil()
这样的操作符,来拦截事件流。
使用方式
现在,假设你已大概了解了.repeatWhen()
和.retryWhen()
,那么你能将一些什么样的精简逻辑放入到notificationHandler
中呢?
使用.repeatWhen()
+ .delay()
定期轮询数据:
source.repeatWhen(new Func1<Observable<? extends Void>, Observable<?>>() {
@Override public Observable<?> call(Observable<? extends Void> completed) {
return completed.delay(5, TimeUnit.SECONDS);
}
})
直到notificationHandler
发送onNext()
才会重订阅到source。因为在发送onNext()
之前delay
了一段时间,所以优雅的实现了延迟重订阅,从而避免了不间断的数据轮询。
非此即彼,使用.flatMap()
+ .timer()
实现延迟重订阅:
(译者注:在RxJava 1.0.0及其之后的版本,官方已不再提倡使用.timer()
操作符,因为.interval()
具有同样的功能)
source.retryWhen(new Func1<Observable<? extends Throwable>, Observable<?>>() {
@Override public Observable<?> call(Observable<? extends Throwable> errors) {
return errors.flatMap(new Func1<Throwable, Observable<?>>() {
@Override public Observable<?> call(Throwable error) {
return Observable.timer(5, TimeUnit.SECONDS);
}
});
}
})
当需要与其他逻辑协同的时候,这种替代方案就变得非常有用了,比如。。。
使用.zip()
+ .range()
实现有限次数的重订阅
source.retryWhen(new Func1<Observable<? extends Throwable>, Observable<?>>() {
@Override public Observable<?> call(Observable<? extends Throwable> errors) {
return errors.zipWith(Observable.range(1, 3), new Func2<Throwable, Integer, Integer>() {
@Override public Integer call(Throwable throwable, Integer i) {
return i;
}
});
}
})
最后的结果就是每个error都与range
中一个输出配对出现,就像这样:
zip(error1, 1) -> onNext(1) <-- Resubscribe
zip(error2, 2) -> onNext(2) <-- Resubscribe
zip(error3, 3) -> onNext(3) <-- Resubscribe
onCompleted() <-- No resubscription
因为当第四次error出现的时候,range(1,3)
中的数字已经耗尽了,所以它隐式调用了onCompleted()
,从而导致整个zip
的结束。防止了进一步的重试。
将可变延迟策略与次数限制的重试机制结合起来
source.retryWhen(new Func1<Observable<? extends Throwable>, Observable<?>>() {
@Override public Observable<?> call(Observable<? extends Throwable> errors) {
return errors.zipWith(Observable.range(1, 3), new Func2<Throwable, Integer, Integer>() {
@Override public Integer call(Throwable throwable, Integer i) {
return i;
}
}).flatMap(new Func1<Integer, Observable<? extends Long>>() {
@Override public Observable<? extends Long> call(Integer retryCount) {
return Observable.timer((long) Math.pow(5, retryCount), TimeUnit.SECONDS);
}
});
}
})
在这种用例的比较上,我认为.flatMap()
+.timer()
的组合比单纯使用.delay()
更可取,因为我们可以通过重试次数来修改延迟时间。重试三次,并且每一次的重试时间都是5 ^ retryCount
,仅仅通过一些操作符的组合就帮助我们实现了指数退避算法(译者注:可参考二进制指数退避算法)。
网友评论
https://github.com/ReactiveX/RxJava/issues/5772。
错误 .retryWhen(throwableObservable ->
throwableObservable.zipWith(Observable.range(1, 1), (throwable, integer) -> integer))
和
可行 .retryWhen(throwableObservable -> {
AtomicInteger counter = new AtomicInteger();
return throwableObservable.takeWhile(e -> counter.getAndIncrement() < 1);
})
源码中有这么一段
void subscribeNext() {
if (wip.getAndIncrement() == 0) {
do {
if (isDisposed()) {
return;
}
if (!active) {
active = true;
source.subscribe(this);
}
} while (wip.decrementAndGet() != 0);
}
}
必须等收到错误后重试(即上面的active值为false)才能再次订阅。
不使用zip, 直接下面即可
return Observable.range(1, 3).flatMap(retryCount ->
Observable.timer((long) Math.pow(5, retryCount), TimeUnit.SECONDS)
);
或
return Observable.range(1, 3).delay(retryCount ->
Observable.timer((long) Math.pow(5, retryCount), TimeUnit.SECONDS)
);
/**
* Returns an Observable that emits the same values as the source ObservableSource with the exception of an
* {@code onComplete}. An {@code onComplete} notification from the source will result in the emission of
* a {@code void} item to the ObservableSource provided as an argument to the {@code notificationHandler}
* function. If that ObservableSource calls {@code onComplete} or {@code onError} then {@code repeatWhen} will
* call {@code onComplete} or {@code onError} on the child subscription. Otherwise, this ObservableSource will
* resubscribe to the source ObservableSource.
* <p>
* <img width="640" height="430" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/repeatWhen.f.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code repeatWhen} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param handler
* receives an ObservableSource of notifications with which a user can complete or error, aborting the repeat.
* @Return the source ObservableSource modified with repeat logic
* @see <a href="http://reactivex.io/documentation/operators/repeat.html">ReactiveX operators documentation: Repeat</a>
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final Observable<T> repeatWhen(final Function<? super Observable<Object>, ? extends ObservableSource<?>> handler) {
ObjectHelper.requireNonNull(handler, "handler is null");
return RxJavaPlugins.onAssembly(new ObservableRepeatWhen<T>(this, handler));
}
onNext-->2
onNext-->3
onNext-->4
onNext-->2
onNext-->3
onNext-->4
onNext-->2
onNext-->3
onNext-->4
onCompleted-->
Process finished with exit code 0
* 统一返回结果处理
*
* @param <T>
* @Return
*/
public static <T extends MyHttpResponse> Observable.Transformer<T, T> applyRetry(final RetrofitHelper apis) {
return new Observable.Transformer<T, T>() {
@Override
public Observable<T> call(Observable<T> tObservable) {
return tObservable.retryWhen(new Func1<Observable<? extends Throwable>, Observable<?>>() {
@Override
public Observable<?> call( Observable<? extends Throwable> errors) {
return errors.zipWith(Observable.range(1, 3), new Func2<Throwable, Integer, Integer>() {
@Override public Integer call(Throwable throwable, Integer i) {
@@@@@** 这里不执行怎么回事* *@@@@@
LogUtil.d("请求出现错误!重试code=" );
if (throwable instanceof ApiException){
if (((ApiException) throwable).isSessionExpried()){
apis.getApi().login(model.getLoginParams(host, account, password))
.doOnNext(new Action1<LoginBean>() {
@Override
public void call(LoginBean loginBean) {
LogUtil.d("重新登录成功!");
App.setLoginBean(loginBean);
}
return errors.zipWith(Observable.range(1, 3+1), new Func2<Throwable, Integer, Integer>() {
@Override public Integer call(Throwable throwable, Integer i) {
return i;
}
}).flatMap(new Func1<Integer, Observable<? extends Long>>() {
@Override public Observable<? extends Long> call(Integer retryCount) {
if (retryCount == 3+1) { //重试耗尽,提示失败
Observable.error(null);
} else {
return Observable.timer((long) Math.pow(5, retryCount), TimeUnit.SECONDS);
}
}
});