这篇文章算是对【译】对RxJava中.repeatWhen()和.retryWhen()操作符的思考的一个简单的补充,建议没看过的先看看上面这篇。
前言
才学RxJava的时候还是挺困惑的,感觉有特别多的对『时间』的操作符,比如timer()
、interval()
、delay()
、defer()
等等……
总之功能太强大,直接吓跑了一群初学者。身边有朋友这样跟我说:
RxJava为了省点代码,把逻辑弄这么复杂,看着都晕了,我宁愿多写点 if-else
我只能回复:用RxJava逻辑肯定是变简单了,一旦用上手了,再也离不开了。现在让我写个Thread + Handler
我都觉得麻烦。
正题
先看timer()
、interval()
、delay()
的小球图吧
interval
timer
timer()
这里说的是新版本的timer()
,而老版本的timer()
已经跟interval()
合并了。
timer()
:创建一个Observable
,它在一个给定的延迟后发射一个特殊的值
这里需要注意,定义里面说的是『一个』,所以有别于之前用的TimerTask
。timer()
只是用来创建一个Observable
,并延迟发送一次的操作符,timer()
并不会按周期执行。
interval()
interval()
:创建一个按固定时间间隔发射整数序列的Observable
这个比较好理解,interval()
也是用来创建Observable
的,并且也可以延迟发送。但interval()
是按周期执行的,所以可以这么认为:interval()
是一个可以指定线程的TimerTask
(威力加强版……)
delay()
delay()
:延迟一段指定的时间再发送来自Observable
的发送结果
语文没学好肯定读不懂这一段,我才看到这句话的时候也懵了……
其实delay()
的常规使用跟timer()
一致,那区别在哪呢?delay()
是用于流中的操作,跟map()
、flatMap()
的级别是一样的。而timer()
是用于创建Observable
,跟just()
、from()
的级别是一样的。
总结
timer()
:用于创建Observable
,延迟发送一次。
interval()
:用于创建Observable
,跟TimerTask
类似,用于周期性发送。
delay()
:用于事件流中,可以延迟发送事件流中的某一次发送。
网络连接失败的处理
看过最前面那篇文章的应该很清楚retryWhen()
是什么了。
我再来总结一下,retryWhen()
的直面意思就是:发生错误了,接下来该做什么。
retryWhen()
是RxJava
的一种错误处理机制,当遇到错误时,将错误传递给另一个Observable来决定是否要重新给订阅这个Observable
延迟重试
来想象一个场景:用户用的2G网络或者WiFi信号不稳定,导致网络经常连接失败,其实这个时候只要多努力一下就可以连接成功了,如果此时弹出错误提示,体验肯定不好,所以这里就要用到重试机制
.retryWhen(new Func1<Observable<? extends Throwable>, Observable<?>>() {
@Override
public Observable<?> call(Observable<? extends Throwable> observable) {
return observable.flatMap(new Func1<Throwable, Observable<?>>() {
@Override
public Observable<?> call(Throwable throwable) {
return Observable.timer(5, TimeUnit.SECONDS);
}
});
}
这里就是一个最简单的错误重试机制(别看字母多就复杂,其实逻辑只有两句,用lambda
表达式的知道我不是乱说…),如果网络连接失败那么会每隔5秒进行一次重试,直到连接成功为止。
刚刚我说了timer()
跟delay()
很像,只是用的时机不同,所以上面的代码最后的return
部分还可以这样写:
return Observable.just(throwable).delay(5, TimeUnit.SECONDS);
判断错误类型
再模拟一个场景:用户不是手机信号不好,而是根本就没打开网络,此时还傻傻的重试只是浪费电量而已,所以我们可以加一个判断,打开了网络才重试,没有打开网络就继续发送失败的消息。
.retryWhen(new Func1<Observable<? extends Throwable>, Observable<?>>() {
@Override
public Observable<?> call(Observable<? extends Throwable> observable) {
return observable.flatMap(new Func1<Throwable, Observable<?>>() {
@Override
public Observable<?> call(Throwable throwable) {
if (throwable instanceof UnknownHostException) {
return Observable.error(throwable);
}
return Observable.timer(5, TimeUnit.SECONDS);
}
});
加入重试超时
继续想,重试也不可能永远进行,一般都会设置一个重试超时的机制。这里我借用了上面那篇文章的代码:
.retryWhen(new Func1<Observable<? extends Throwable>, Observable<?>>() {
@Override
public Observable<?> call(Observable<? extends Throwable> observable) {
return observable.flatMap(new Func1<Throwable, Observable<?>>() {
@Override
public Observable<?> call(Throwable throwable) {
if (throwable instanceof UnknownHostException) {
return Observable.error(throwable);
}
return Observable.just(throwable).zipWith(Observable.range(1, 5), new Func2<Throwable, Integer, Integer>() {
@Override
public Integer call(Throwable throwable, Integer i) {
return i;
}
}).flatMap(new Func1<Integer, Observable<? extends Long>>() {
@Override
public Observable<? extends Long> call(Integer retryCount) {
return Observable.timer((long) Math.pow(5, retryCount), TimeUnit.SECONDS);
}
});
}
});
}
代码变得越来越多和难读了……试一下lambda
表达式看看:
.retryWhen(observable -> observable.flatMap((Throwable throwable) -> {
if (throwable instanceof UnknownHostException) {
return Observable.error(throwable);
}
return Observable.just(throwable)
.zipWith(Observable.range(1, 5), (throwable1, i) -> i)
.flatMap(retryCount -> Observable
.timer((long) Math.pow(5, retryCount), TimeUnit.SECONDS));
}));
这样清爽多了,不过还是先用上面那种吧,毕竟更多人还没用到lambda
。
重试的复用
想一下,没有哪个APP只有一个接口地址吧 - -#,如果你用的Retrofit
那么每一个接口返回的Observable
都要手动加上上面的重试代码,如果是我,我肯定报警了……所以我们必须把刚刚写的重试代码封装成一个类:
public class RetryWhenProcess implements Func1<Observable<? extends Throwable>, Observable<?>> {
private long mInterval;
public RetryWhenProcess(long interval) {
mInterval = interval;
}
@Override
public Observable<?> call(Observable<? extends Throwable> observable) {
return observable.flatMap(new Func1<Throwable, Observable<?>>() {
@Override
public Observable<?> call(Throwable throwable) {
return observable.flatMap(new Func1<Throwable, Observable<?>>() {
@Override
public Observable<?> call(Throwable throwable) {
if (throwable instanceof UnknownHostException) {
return Observable.error(throwable);
}
return Observable.just(throwable).zipWith(Observable.range(1, 5), new Func2<Throwable, Integer, Integer>() {
@Override
public Integer call(Throwable throwable, Integer i) {
return i;
}
}).flatMap(new Func1<Integer, Observable<? extends Long>>() {
@Override
public Observable<? extends Long> call(Integer retryCount) {
return Observable.timer((long) Math.pow(mInterval, retryCount), TimeUnit.SECONDS);
}
});
}
});
}
});
}
}
使用时,只需要加上:
.retryWhen(new RetryWhenProcess(5))
即可。
终极网络处理方案
最后,终极的处理方案肯定是这样的:能监听网络连接的广播自动重试,对网络无连接的情况不进行重试,并且重试有超时机制与重试间隔。
用RxJava
可以比较方便的做到,请看这篇文章Improving UX with RxJava
网友评论
.retryWhen(new Function<Observable<Throwable>, ObservableSource<Long>>() {
@Override
public ObservableSource<Long> apply(@NonNull Observable<Throwable> throwableObservable) throws Exception {
return throwableObservable
.zipWith(Observable.range(1, 10)
, new BiFunction<Throwable, Integer, Integer>() {
@Override
public Integer apply(@NonNull Throwable throwable, @nonnull Integer integer) throws Exception {
Log.d(TAG, "apply: integer" + integer + " time:" + DateUtil.getDateString());
return integer;
}
}).flatMap(new Function<Integer, ObservableSource<Long>>() {
@Override
public ObservableSource<Long> apply(@NonNull Integer retryCount) throws Exception {
Log.d(TAG, "apply: retryCount" + retryCount + " time:" + DateUtil.getDateString());
return Observable.timer(50, TimeUnit.MILLISECONDS);//不能使用 //Math.pow(mInterval, retryCount)延时太久了,指数幂太恐怖
}
});
}
})
RetryWhenProcess.class
你会发现retryCount永远都等于1,这样的话他就会无限轮询重试,直至程序ANR,不知道你是怎么解决这个问题的
然后if(++retryCount>=3){
Observable.error(throwable);
}else{
轮询
}
你所说的改为3是像下面这么改的?
range(1, 3)