Rx-错误处理-Operators

作者: CokeNello | 来源:发表于2016-12-19 17:07 被阅读113次

    0 .概述

    感谢:对RxJava中.repeatWhen()和.retryWhen()操作符的思考

    本文章讲解的是RxJava1.0版本中的错误处理的操作符,
    其操作符主要有两个:
    Catch:从onError通知中恢复发射数据
    Retry:原始Observable遇到错误,重新订阅它期望它能正常终止


    1 .Retry

    Retry,顾名思义,是重试。当原始调用链抛出了onError事件,则,就会
    触发Retry。
    在RxJava中,关于Retry,有几个版本:

    1. retry() 没有传入参数,若有onError事件,则一直重试。
    2. retry(long) 传入重试的次数。
    3. retry(Func) 这个函数的两个参数是:重试次数和导
      致发射 onError 通知的 Throwable 。这个函数返回一个布尔值,如果返回 true , retry 应该
      再次订阅和镜像原始的Observable,如果返回 false , retry 会将最新的一个 onError 通知
      传递给它的观察者。

    我们来看具体的例子:

    • retry():没有传入参数,若有onError事件,则一直重试。
    Observable.just(1)
            .map(integer -> {
                LogUtils.e(OpenLog,TAG,"getInt:"+integer);
                int a = 0;
                int b = 1;
                int c = b/a;
                return integer + 10;
            })
            .retry()
            .subscribe(
                    integer -> {
                        LogUtils.e(OpenLog,TAG,"onNext:"+integer);
                    },
                    throwable -> {
                        LogUtils.e(OpenLog,TAG,"error:"+(throwable.getMessage()!=null?throwable.getMessage():"null"));
                    },
                    () -> {
                        LogUtils.e(OpenLog,TAG,"onComplete");
                    });
    

    这里,我们发生了一个整数1,在Map里面故意制造了一个onError事件,
    如果,没有onError,retry()不会起效果,把接收到的数据继续往下传。
    在这里,出现了onError,因为是retry(),所以会不断地重试。

    • retry(long):传入重试的次数。
    Observable.just(1)
            .map(integer -> {
                LogUtils.e(OpenLog,TAG,"getInt:"+integer);
                int a = 0;
                int b = 1;
                int c = b/a;
                return integer + 10;
            })
            .retry(3)
            .subscribe(
                    integer -> {
                        LogUtils.e(OpenLog,TAG,"onNext:"+integer);
                    },
                    throwable -> {
                        LogUtils.e(OpenLog,TAG,"error:"+(throwable.getMessage()!=null?throwable.getMessage():"null"));
                    },
                    () -> {
                        LogUtils.e(OpenLog,TAG,"onComplete");
                    });
    

    这里的输出是:

    12-18 14:38:05.434 11348-11348/testmodules.chestnut E/MainActivity: getInt:1
    12-18 14:38:05.437 11348-11348/testmodules.chestnut E/MainActivity: getInt:1
    12-18 14:38:05.437 11348-11348/testmodules.chestnut E/MainActivity: getInt:1
    12-18 14:38:05.438 11348-11348/testmodules.chestnut E/MainActivity: getInt:1
    12-18 14:38:05.438 11348-11348/testmodules.chestnut E/MainActivity: error:divide by zero
    

    可以看到,重试了3次。

    • retry(Func): 这个函数的两个参数是:integer, throwable,当前重试的次数和错误体。
      返回值:true:进入重试,false:结束重试,并把最新一次的throwable传递。
    Observable.just(1)
            .map(integer -> {
                LogUtils.e(OpenLog,TAG,"getInt:"+integer);
                int a = 0;
                int b = 1;
                int c = b/a;
                return integer + 10;
            })
            .retry((integer, throwable) -> {
                LogUtils.e(OpenLog,TAG,"retry:"+integer);
                if (integer>=3)
                    return false;
                else
                    return true;
            })
            .subscribe(
                    integer -> {
                        LogUtils.e(OpenLog,TAG,"onNext:"+integer);
                    },
                    throwable -> {
                        LogUtils.e(OpenLog,TAG,"error:"+(throwable.getMessage()!=null?throwable.getMessage():"null"));
                    },
                    () -> {
                        LogUtils.e(OpenLog,TAG,"onComplete");
                    });
    

    在retry里面,我们自定义了一个规则,当重试的次数达到3次,我们就返回false,也就是停止重试了。
    输出:

    12-18 14:40:20.950 11348-11348/testmodules.chestnut E/MainActivity: getInt:1
    12-18 14:40:20.951 11348-11348/testmodules.chestnut E/MainActivity: retry:1
    12-18 14:40:20.951 11348-11348/testmodules.chestnut E/MainActivity: getInt:1
    12-18 14:40:20.952 11348-11348/testmodules.chestnut E/MainActivity: retry:2
    12-18 14:40:20.952 11348-11348/testmodules.chestnut E/MainActivity: getInt:1
    12-18 14:40:20.952 11348-11348/testmodules.chestnut E/MainActivity: retry:3
    12-18 14:40:20.953 11348-11348/testmodules.chestnut E/MainActivity: error:divide by zero
    
    • retryWhen(Func1)
      **retryWhen(Func1<? super Observable<? extends java.lang.Throwable>,? extends Observable<?>> notificationHandler)**
      我们先明确:

    1 .Func1像个工厂类,用来实现你自己的重试逻辑。
    2 .输入的是一个Observable<Throwable>。
    3 .输出的是一个Observable<?>。

    如果发送的是onCompleted或者onError事件,将不会触发重订阅。相对的,如果它发送onNext事件,则触发重订阅(不管onNext实际上是什么事件)。
    其实这个跟retry差不多,只是每次把throwable用Obserable包裹了起来,当收到onError事件的时候,你可以直接在里面使用链式去判断,是否重试。

    Observable.just(1)
            .map(integer -> {
                LogUtils.e(OpenLog,TAG,"getInt:"+integer);
                int a = 1/0;
                return integer + 10;
            })
            .retryWhen(observable -> {
                return Observable.error(new Throwable("终止重试条件"));   //终止重试条件:发射onComplete或者error
            })
            .subscribe(
                    integer -> {
                        LogUtils.e(OpenLog,TAG,"onNext:"+integer);
                    },
                    throwable -> {
                        LogUtils.e(OpenLog,TAG,"error:"+(throwable.getMessage()!=null?throwable.getMessage():"null"));
                    },
                    () -> {
                        LogUtils.e(OpenLog,TAG,"onComplete");
                    });
    

    以上,在遇到onError后,直接是把Throwable再度抛出结束。

    Observable.just(1)
            .map(integer -> {
                LogUtils.e(OpenLog,TAG,"getInt:"+integer);
                int a = 1/0;
                return integer + 10;
            })
            .retryWhen(new Func1<Observable<? extends Throwable>, Observable<String>>() {
                @Override
                public Observable<String> call(Observable<? extends Throwable> observable) {
                    return observable.map(o -> {
                        LogUtils.e(OpenLog,TAG,"retryWhen:"+(o.getMessage()!=null?o.getMessage():"null"));
                        return "retryWhen,error";
                    });
                }
            })
            .subscribe(
                    integer -> {
                        LogUtils.e(OpenLog,TAG,"onNext:"+integer);
                    },
                    throwable -> {
                        LogUtils.e(OpenLog,TAG,"error:"+(throwable.getMessage()!=null?throwable.getMessage():"null"));
                    },
                    () -> {
                        LogUtils.e(OpenLog,TAG,"onComplete");
                    });
    

    上面这个,直接对错误链,用map操作符输出了一下,然后重试,其效果跟无参数的retry()差不多。

    • retryWhen(Func1,Scheduler)
      这个版本,可以制定其运行的调度器。默认的是:trampoline

    2 .Catch

    catch和程序中的catch是一样的,都是捕抓异常。
    在链式的传递中,当其发生错误时onError的时候,我们希望不终止链式的传递,而是希望根据
    异常去恢复链式。在Java中,Catch被实现成:onErrorReturn.

    • onErrorReturn
      **Observable<T> onErrorReturn(Func1<? super Throwable, ? extends T> resumeFunction)**
    • 由方法的定义可以看到,其传入异常体:throwable,然后返回一个异常前的对象体:? extends T
    • onErrorReturn 方法返回一个镜像原有Observable行为的新Observable,后者会忽略前者
      的 onError 调用,不会将错误传递给观察者,作为替代,它会发发射一个特殊的项并调用观
      察者的 onCompleted 方法。

    例子:

    Observable.just(1)
            .map(integer -> {
                LogUtils.e(OpenLog,TAG,"map:"+integer);
                int a = 1;
                int b = 0;
                int c = a/b;
                return integer;
            })
            .onErrorReturn(new Func1<Throwable, Integer>() {
                @Override
                public Integer call(Throwable throwable) {
                    LogUtils.e(OpenLog,TAG,"onErrorReturn:"+throwable.getMessage());
                    return 12321;
                }
            })
            .subscribe(
                    integer -> {
                        LogUtils.e(OpenLog,TAG,"onNext:"+integer);
                    },
                    throwable -> {
                        LogUtils.e(OpenLog,TAG,"onError:"+throwable.getMessage());
                    },
                    () -> {
                        LogUtils.e(OpenLog,TAG,"onCompleted:");
                    });
    

    Log输出如下:

    12-19 16:47:10.686 8522-8522/testmodules.chestnut E/MainActivity: map:1
    12-19 16:47:10.690 8522-8522/testmodules.chestnut E/MainActivity: onErrorReturn:divide by zero
    12-19 16:47:10.691 8522-8522/testmodules.chestnut E/MainActivity: onNext:12321
    12-19 16:47:10.691 8522-8522/testmodules.chestnut E/MainActivity: onCompleted:
    
    • onErrorResumeNext
      和onErrorReturn一样,只不过,它返回的是一个新的Observable。
    Observable.just(1)
            .map(integer -> {
                LogUtils.e(OpenLog,TAG,"map:"+integer);
                int a = 1;
                int b = 0;
                int c = a/b;
                return integer;
            })
            .onErrorResumeNext(new Func1<Throwable, Observable<? extends Integer>>() {
                @Override
                public Observable<? extends Integer> call(Throwable throwable) {
                    return Observable.just(123);
                }
            })
            .subscribe(
                    integer -> {
                        LogUtils.e(OpenLog,TAG,"onNext:"+integer);
                    },
                    throwable -> {
                        LogUtils.e(OpenLog,TAG,"onError:"+throwable.getMessage());
                    },
                    () -> {
                        LogUtils.e(OpenLog,TAG,"onCompleted:");
                    });
    
    • onExceptionResumeNext
      和 onErrorResumeNext 类似, onExceptionResumeNext 方法返回一个镜像原有Observable行为
      的新Observable,也使用一个备用的Observable,不同的是,如果 onError 收到
      的 Throwable 不是一个 Exception ,它会将错误传递给观察者的 onError 方法,不会使用备用
      的Observable。

    相关文章

      网友评论

        本文标题:Rx-错误处理-Operators

        本文链接:https://www.haomeiwen.com/subject/giwamttx.html