美文网首页NetMobDevGroupAndroid开发收藏夹
All RxJava - 为Retrofit添加重试

All RxJava - 为Retrofit添加重试

作者: 小鄧子 | 来源:发表于2017-06-04 14:55 被阅读8538次

    在我们的日常开发中离不开I/O操作,尤其是网络请求,但并不是所有的请求都是可信赖的,因此我们必须为APP添加请求重试功能。

    对于一个网络请求重试而言,我认为它至少应该做到以下两点:

    • 可配置次数的重试
      因为并不是所有的网络请求都需要频繁地重试,比如说一个重要的表单提交,它应该尽可能多失败重连,相反地,埋点上报等统计功能,它可能最多只需要重试一次就足够了。因此针对不同的场景,我们需要不同的重试次数。

    • 退避策略
      我们应该为请求重试加入一个合理的退避算法,而不是一旦遭遇了失败就立即无脑般的再次发起请求,这样做没有一点好处,不但降低了用户体验,甚至还在浪费网络资源。一个合理的重试策略应该是:遇到网络异常时应该等待一段时间后再重试,若遇到的异常次数越多,等待(退避)的时间就应该越长。

    我一直使用SquareretrofitReactiveXRxJava,接下来我就来分享一下我是如何使用这两个库来实现一个可配置次数的退避重试策略的。

    Repeat? Retry!

    RxJava中有两个操作符能够触发重订阅,分别是:

    从上面的弹珠图中,我们可以了解到,这两个操作符的区别仅仅是针对不同的“终止事件”来会触发重订阅:.repeat()接收到onCompleted后触发重订阅;而.retry()则是接收到OnError后触发重订阅。

    需要注意的是,千万不要使用这两个操作符无限地重订阅源Observable,一定要在恰当的时候通过取消订阅的方式来停止它们,避免陷入无限循环,从而导致系统崩溃。除此之外还可以使用它们的重载函数.repeat(n).retry(n),来设置一个合适的重订阅次数n

    ps : 写这篇博客的时候我参照了RxJava-1.2.10的源码,.repeat().retry()的内部实现几乎是一模一样的,一点细微不同是:除了取消订阅能够同时终止它俩的重订阅之外,.repeat()还能被OnError终止,相对的.retry()能被onCompleted终止。

    回到本篇文章的主题上,我们需要的是在遭遇I/O异常时,发起重试,而不是请求成功时,很明显的.retry()胜出!

    Retry?RetryWhen!

    首先,我们需要认清的事实是:所有的网络异常都属于I/O异常

    我们的重点是,只有遭遇了IOException时才重试网络请求,也就是说那些IllegalStateExceptionNullPointerException或者当你使用gson来解析json时还可能出现的JsonParseException等非I/O异常均不在重试的范围内。

    因此.retry()以及它的重载函数已经不能满足我们的需求了,好在RxJava为我们提供了另一个非常有用的操作符.retryWhen(),我们可以通过判断异常类型,来决定是否发起重试(重订阅)。

    .retryWhen()的函数签名如下:

    public final Observable<T> retryWhen(Func1<? super Observable<? extends java.lang.Throwable>,? extends Observable<?>> notificationHandler)
    

    其中notificationHandler是我们需要实现的函数,它有两个概念必须弄清:

    • 参数Observable<Throwable>,其中的泛型意指上游操作符抛出的异常,我们可以通过这个条件来判断异常的类型。

    • 返回值Observable<?>,通配符(泛型)表示我们可以返回任意类型的Observable,它的作用是:一旦这个Observable通过onNext()发送事件,则重订阅(重试)发生一次,如果这个Observable调用了onComplete或者onError那么将跳过重订阅,最终这些终止事件将会向下传递,从此这个操作符的重订阅功能也就失效了。

    RX-CODE!

    下面这段代码是我使用的notificationHandler的实现类RetryWhenHandler,它基本满足了我的重试要求。

    final class RetryWhenHandler implements Func1<Observable<? extends Throwable>, Observable<Long>> {
    
      private static final int INITIAL = 1;
      private int maxConnectCount = 1;
    
      RetryWhenHandler(int retryCount) {
        this.maxConnectCount += retryCount;
      }
    
      @Override public Observable<Long> call(Observable<? extends Throwable> errorObservable) {
        return errorObservable.zipWith(Observable.range(INITIAL, maxConnectCount),
            new Func2<Throwable, Integer, ThrowableWrapper>() {
              @Override public ThrowableWrapper call(Throwable throwable, Integer i) {
    
                //①
                if (throwable instanceof IOException) return new ThrowableWrapper(throwable, i);
    
                return new ThrowableWrapper(throwable, maxConnectCount);
              }
            }).concatMap(new Func1<ThrowableWrapper, Observable<Long>>() {
          @Override public Observable<Long> call(ThrowableWrapper throwableWrapper) {
    
            final int retryCount = throwableWrapper.getRetryCount();
    
            //②
            if (maxConnectCount == retryCount) {
              return Observable.error(throwableWrapper.getSourceThrowable());
            }
    
            //③
            return Observable.timer((long) Math.pow(2, retryCount), TimeUnit.SECONDS,
                Schedulers.immediate());
          }
        });
      }
    
      private static final class ThrowableWrapper {
    
        private Throwable sourceThrowable;
        private Integer retryCount;
    
        ThrowableWrapper(Throwable sourceThrowable, Integer retryCount) {
          this.sourceThrowable = sourceThrowable;
          this.retryCount = retryCount;
        }
    
        Throwable getSourceThrowable() {
          return sourceThrowable;
        }
    
        Integer getRetryCount() {
          return retryCount;
        }
      }
    }
    

    有三点地方需要注意:

    ① 只在IOException的情况下记录本次请求在最大请求次数中的位置,否则视为最后一次请求,避免多余的请求重试。

    ②如果最后一次网络请求依然遭遇了异常,则将此异常继续向下传递,以便在最后的onError()函数中处理。

    ③使用.timer()操作符实现一个简单的二进制指数退避算法,需要注意的是.timer()操作符默认执行在Schedulers.computation(),我们并不希望它切换到别的线程去执行重试逻辑,因此使用了它的重载函数,并指定在当前线程立即执行。

    @Retry

    由于retrofit的请求参数是基于函数描述的,因此我们创建一个注解Retry用来描述重试次数。代码如下:

    @Documented
    @Retention(RUNTIME)
    @Target(METHOD)
    public @interface Retry {
      //retry times when an IOException is encountered
      int count() default 0;
    }
    

    值得一提的是,我们只希望这个注解能够被声明在方法上,而且必须是RuntimeVisibleAnnotations,否则我们无法在运行时拿到。

    假设你已经阅读过了retrofit的源码,至少知道如何使用CallAdapter.Factory来定义一个CallAdapter。如果对它不了解,则只需要记住,在CallAdapter.Factory中我们必须实现的抽象方法,其中第二个参数annotations包含了我们定义在方法上的所有RUNTIME注解。:

     public abstract @Nullable CallAdapter<?, ?> get(Type returnType, Annotation[] annotations,
     Retrofit retrofit);
    

    接下来,稍微改造一下RxJavaCallAdapter的构造函数,添加一个重试变量,并在Observable调用链中添加我们之前已经写好的RetryWhenHandler:

    final class RxJavaCallAdapter<R> implements CallAdapter<R, Object> {
      private final Type responseType;
      private final @Nullable Scheduler scheduler;
      private final int retryCount;
      private final boolean isAsync;
      private final boolean isResult;
      private final boolean isBody;
      private final boolean isSingle;
      private final boolean isCompletable;
    
      RxJavaCallAdapter(Type responseType, @Nullable Scheduler scheduler, int retryCount,
          boolean isAsync, boolean isResult, boolean isBody, boolean isSingle, boolean isCompletable) {
        this.responseType = responseType;
        this.scheduler = scheduler;
        this.retryCount = retryCount
        this.isAsync = isAsync;
        this.isResult = isResult;
        this.isBody = isBody;
        this.isSingle = isSingle;
        this.isCompletable = isCompletable;
      }
    
      @Override public Type responseType() {
        return responseType;
      }
    
      @Override public Object adapt(Call<R> call) {
        OnSubscribe<Response<R>> callFunc = isAsync
            ? new CallEnqueueOnSubscribe<>(call)
            : new CallExecuteOnSubscribe<>(call);
    
        OnSubscribe<?> func;
        if (isResult) {
          func = new ResultOnSubscribe<>(callFunc);
        } else if (isBody) {
          func = new BodyOnSubscribe<>(callFunc);
        } else {
          func = callFunc;
        }
        Observable<?> observable = Observable.create(func).retryWhen(new RetryWhenHandler(retryCount));
    
        if (scheduler != null) {
          observable = observable.subscribeOn(scheduler);
        }
    
        if (isSingle) {
          return observable.toSingle();
        }
        if (isCompletable) {
          return observable.toCompletable();
        }
        return observable;
      }
    }
    

    解析@Retry注解的操作需要放在RxJavaCallAdapterFactory#Line104中:

    int count;
    for (Annotation annotation : annotations) {
      if (!Retry.class.isAssignableFrom(annotation.getClass())) continue;
      count = Retry.class.cast(annotation).count();
      if (count<0) throw new IllegalArgumentException(
          "The count in the \'@Retry\' is less than zero");
    }
    

    总结

    至此,我们基本完成了通过RxJava为retrofit添加重试的功能,它利用retrofit本身的“基于方法描述的特性”,因此足够灵活,而且扩展性也很高 : )

    当然,不局限于此,如果你使用了okhttp,还可以通过自定义Interceptor的方式,为你的网络请求添加失败重试功能。

    这篇文章只是提供一个简单的思路,对于健壮应用程序,我们仍然需要不断的尝试与探索,如果你有更好的经验,欢迎分享,如果你喜欢这篇文章,请点个赞。

    文中所有代码,都可以从github上获取Forked from retrofit,希望这篇文章能够对你所有帮助。Happy coding, enjoy it.

    参考

    【译】对RxJava中.repeatWhen()和.retryWhen()操作符的思考 - 邓伟

    相关文章

      网友评论

      • hewking:请求的url 做过加密也就是 服务器同一个url 只能请求一次, 重试如何动态更改url 再重新发起请求
      • 81c17c76d562:hi,我想问一个问题,当使用RxJavaCallAdapter时,将Call转换为了observable,同时也丢失了call的其他信息,例如call里面的request的全部信息。当我在使用observable进行数据流操作的时候,如何再去获得request的原始信息,例如url?
        小鄧子:最简单也是最直接的办法就是将接口的返回值定义成Observable<Response<Foo>>,这样在onNext里面就能使用Response<Foo>了,既能拿到实体类又能获取请求信息。
      • 下位子:厉害了

      本文标题:All RxJava - 为Retrofit添加重试

      本文链接:https://www.haomeiwen.com/subject/sjmefxtx.html