美文网首页
RxJava的错误处理一-retry

RxJava的错误处理一-retry

作者: Juude | 来源:发表于2017-10-25 16:06 被阅读712次

    RxJava的错误处理主要分为两类,retry系列以及onErrorRetrun系列。retry系列是当错误的时候,重新subscribe。onErrorReturn系列则是当出错了返回数据到onNext中。本文介绍下retry系列相关用法。

    retry

    retry系列的操作符主要有retry()
    , retry(long)
    retry(Func2)
    retry(n)当发生onError的时候会重试n次,例如如下代码:

    @Test
    public void testRetry() {
        final AtomicInteger atomicInteger = new AtomicInteger(0);
        Observable.create(new Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> subscriber) {
                subscriber.onNext(String.valueOf(System.currentTimeMillis()));
                subscriber.onError(new Error("error"));
            }
        })
        .doOnSubscribe(new Action0() {
            @Override
            public void call() {
                atomicInteger.incrementAndGet();
            }
        })
        .retry(2)
        .toBlocking()
        .subscribe(new TestSubscriber<String>());
        Assert.assertTrue(atomicInteger.intValue() == 3);
    }
    

    初始化atomicInteger为0,在doOnSubscribe加一,重试次数为2次,所以最终相当于onSubscribe执行了3次。

    retryWhen

    另外一个方法retryWhen的方法是根据得到的Throwable生成新的Observerable, 示例代码如下:

    @Test
    public void testRetryWhen() {
        final AtomicInteger atomicInteger = new AtomicInteger(3);
        Observable.create(new Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> subscriber) {
                subscriber.onNext(String.valueOf(System.currentTimeMillis()));
                subscriber.onError(new Error(String.valueOf(atomicInteger.decrementAndGet())));
            }
        })
        .retryWhen(new Func1<Observable<? extends Throwable>, Observable<?>>() {
            @Override
            public Observable<?> call(Observable<? extends Throwable> observable) {
                return observable.takeWhile(new Func1<Throwable, Boolean>() {
                    @Override
                    public Boolean call(Throwable throwable) {
                        return Integer.parseInt(throwable.getMessage()) > 0;
                    }
                })
                .flatMap(new Func1<Throwable, Observable<?>>() {
                    @Override
                    public Observable<?> call(Throwable throwable) {
                        return Observable.timer(1, TimeUnit.SECONDS);
                    }
                });
            }
        })
        .toBlocking()
        .subscribe(new TestSubscriber<String>());
        Assert.assertEquals(atomicInteger.intValue(), 0);
    }
    

    这里接受到throwableObserverable后,用takeWhile来判断thrwoable的属性,这里用一个AtomicInteger,设置最大重试次数为3,每次减1,当等于0则不再重试,再现实生活中,也可以判断Exception的类型等方式判断是否需要重试。接着用flatMap返回Observerable.timer来延迟重试到1秒以后。

    参考

    相关文章

      网友评论

          本文标题:RxJava的错误处理一-retry

          本文链接:https://www.haomeiwen.com/subject/cgwkettx.html