RxJava Error Handling Operators

作者: lluo2010 | 来源:发表于2016-10-04 15:53 被阅读273次

    RxJava Error Handling Operators

    简介

    RxJava提供了一些操作函数去处理错误相关的情景,使用这些函数可以:

    • 吞掉这个错误,切换到一个备用的Observable继续发射数据
    • 吞掉这个错误然后发射默认值
    • 吞掉这个错误并立即尝试重启这个Observable
    • 吞掉这个错误,在等待一段时间后重启这个Observable

    相关处理函数

    处理错误相关的操作函数包括:

    onErrorReturn

        public final Observable<T> onErrorReturn(Func1<java.lang.Throwable,? extends T> resumeFunction)
    

    Instructs an Observable to emit an item (returned by a specified function) rather than invoking onError if it encounters an error.

    一般出错时,会调用onError,然后结束.使用onErrorReturn,onError不会调用,而是发送onErrorReturn里面提供的值,然后onCompleted会被调用. 注意,紧随其后onCompleted会调用.

    例子:

        Observable<String> observable = Observable.create(new OnSubscribe<String>(){
                @Override
                public void call(Subscriber<? super String> arg0) {
                    arg0.onNext("a1");
                    arg0.onNext("a2");
                    arg0.onError(new Exception("testErrorReturn mock error"));
                }
    
            });
    
            observable.onErrorReturn(new Func1<Throwable, String>() {
    
                @Override
                public String call(Throwable arg0) {
                    Utils.println("onErrorReturn call");
                    return "aError";
                }
            })
            .subscribe(new Subscriber<String>() {
    
                @Override
                public void onCompleted() {
                    Utils.println("onCompleted");
                }
    
                @Override
                public void onError(Throwable arg0) {
                    Utils.println("onError");
                }
    
                @Override
                public void onNext(String arg0) {
                    Utils.println("onResult:"+arg0);
                }
            });
    输出:
        onResult:a1
        onResult:a2
        onErrorReturn call
        onResult:aError
        onCompleted
    

    onErrorResumeNext

    声明:

        public final Observable<T> onErrorResumeNext(final Observable<? extends T> resumeSequence) {
        public final Observable<T> onErrorResumeNext(final Func1<Throwable, ? extends Observable<? extends T>> resumeFunction);
    

    如果原Observable发生错误时,不调用onError,而是使用一个新的Observable,调用它的call函数(其实就是订阅它),里面onNext的值回传到subscriber,如果有调onCompleted,则subscriber会收到,如果没有则不会收到. 即后续的行为由新的Observable决定了.

    上面两个函数的区别是第一个始终提供一个Observable,而第二个可以根据不同的Throwable提供不同的Observable.

    例子:

            Observable<String> observable = Observable.create(new OnSubscribe<String>() {
    
                @Override
                public void call(Subscriber<? super String> arg0) {
                    arg0.onNext("value1");
                    arg0.onNext("value2");
                    arg0.onNext("value3");
                    arg0.onError(new Exception("mock testErrorResumeNext exception"));
                    arg0.onNext("value4");
                }
            });
            Observable<String> resumeObservable = Observable.create(new OnSubscribe<String>() {
    
                @Override
                public void call(Subscriber<? super String> arg0) {
                    arg0.onNext("valueMock1");
                    arg0.onNext("valueMock2");
                }
            });
    
            observable.onErrorResumeNext(resumeObservable).subscribe(getSubscriber());
    输出:
        onResult:value1
        onResult:value2
        onResult:value3
        onResult:valueMock1
        onResult:valueMock2
    

    如果resumeObservable调用onCompleted,则onCompleted会被调用.

    onExceptionResumeNext

        public final Observable<T> onExceptionResumeNext(final Observable<? extends T> resumeSequence);
    

    Instructs an Observable to pass control to another Observable rather than invoking Observer#onError if it encounters an Exception.

    *** 和onErrorResumeNext类似,区别是onErrorResumeNext是针对onError的throwable,而onExceptionResumeNext仅仅是针对onError里的类型是Exception. ***

    retry

    if a source Observable emits an error, resubscribe to it in the hopes that it will complete without error.

    如果Observable.onError调用,即发射错误,会重新订阅原来的Obserable,即重新触发Observable,也就是从新调它的call函数.这种方式数据会重复.

    1. public final Observable<T> retry():
      无限重试,直到成功.
    2. public final Observable<T> retry(final long count):
      指定次数重试,超过指定次数还没成功,则subscriber.onError会被调用.
    3. public final Observable<T> retry(Func2<Integer, Throwable, Boolean> predicate):
      传入一个Fun2,里面有两个入参,一个表示目前参数的次数,另一个就是Observable.onError里的类型,返回值true表示继续重试,false表示不重试,Subscriber.onError会被调用.
      也就是说只要Observable.onError被调用后,会调用Func2去判断是否要重试,Integer表示询问重试的次数,从1开始.

    retry(count)例子:

        Observable<String> observable = Observable.create(new OnSubscribe<String>() {
    
                @Override
                public void call(Subscriber<? super String> arg0) {
                    arg0.onNext("value1");
                    arg0.onNext("value2");
                    arg0.onNext("value3");
                    arg0.onError(new Exception("mock testRetryCount exception"));
                    arg0.onNext("value4");
                }
            });
    
            observable.retry(1).subscribe(getSubscriber());
    输出:
        onResult:value1
        onResult:value2
        onResult:value3
        onResult:value1
        onResult:value2
        onResult:value3
        onError
    

    从上面可以看出,重试了一次,后面不重试,Subscriber.onError被调用.另外由于重试,数据重复了.

    retry(Func2)例子:

        Observable<String> observable = Observable.create(new OnSubscribe<String>() {
                @Override
                public void call(Subscriber<? super String> arg0) {
                    arg0.onNext("value1");
                    arg0.onNext("value2");
                    arg0.onNext("value3");
                    arg0.onError(new Exception("mock testRetryFunc2 exception"));
                    arg0.onNext("value4");
                }
            });
    
            observable.retry(new Func2<Integer, Throwable,     Boolean>() {
                @Override
                public Boolean call(Integer arg0, Throwable arg1) {
                    Utils.println("testRetryFunc2 func2::call time:"+arg0.intValue());
                    return (arg0.intValue()>1)?false:true;
                }
            })
            .subscribe(getSubscriber());
    输出:
        onResult:value1
        onResult:value2
        onResult:value3
        testRetryFunc2 func2::call time:1
        onResult:value1
        onResult:value2
        onResult:value3
        testRetryFunc2 func2::call time:2
        onError
    

    Integer从1开始,第一次重试,第二次返回false所以就不重试了,直接onError了.

    retryWhen

        public final Observable<T> retryWhen(final Func1<? super Observable<? extends Throwable>, ? extends Observable<?>> notificationHandler);
        public final Observable<T> retryWhen(final Func1<? super Observable<? extends Throwable>, ? extends Observable<?>> notificationHandler, Scheduler scheduler);
    

    if a source Observable emits an error, pass that error to another Observable to determine whether to resubscribe to the source

    retryWhen的输入参数是Func1,接受一个输入Observable<Throwable>, 返回输出参数Observable<?>。

    返回的Observable<?>所要发送的事件决定了重试是否会发生:

    • 如果发送的是onCompleted或者onError事件,将不会触发重订阅。
    • 如果它发送onNext事件,则触发重订阅(不管onNext实际上是什么事件)。

    这就是为什么使用了通配符作为泛型类型:这仅仅是个通知(next, error或者completed),一个很重要的通知而已。

    *** 可以理解为如果原Observable.onError发生, 如果Func1返回的Observable<?>的onNext被调用,则重新订阅原来的Observable重试,否则不重试.***

    输入的Observable(即Func1里Observable<?extends Throwable>)必须作为输出Observable(即Func1的返回值)的源。我们必须对Observable<Throwable>做出反应,然后基于它发送事件;不能只返回一个通用泛型流。

    原Observable每次一调用onError(Throwable),Observable<Throwable>都会被作为输入传入方法中。换句话说就是,它的每一次调用我们都需要决定是否需要重订阅。

    下面的例子会重试:

    source.retryWhen(new Func1<Observable<? extends Throwable>, Observable<?>>() {
                  @Override public Observable<?> call(Observable<? extends Throwable> errors) {
                    return errors.flatMap(new Func1<Throwable, Observable<?>>() {
                      @Override public Observable<?> call(Throwable error) {
                        return Observable.timer(5, TimeUnit.SECONDS);
                      }
                    });
                  }
                })
    

    总结:

    1. onErrorReturn,吞掉错误,提供一个默认的返回值.
    2. onErrorResumeNext,吞掉错误,提供一个默认的Observable继续发送数据.
    3. onExceptionResumeNext,和onErrorResumeNext类似,区别是onErrorResumeNext是针对onError的throwable,而onExceptionResumeNext仅仅是针对onError里的类型是Exception.
    4. retry,吞掉错误,按条件重新订阅重试
    5. retryWhen,吞掉错误,由另外一个提供的Observable的行为决定是否重试,如果onNext被调用,则重新订阅重试,onError或者onComplete被调用,则不重试.

    RxJava特有的异常

    • CompositeException:
      表示发生了多个异常。你可以使用异常的 getExceptions() 方法获取单独的异常。

    • MissingBackpressureException:
      表示一个订阅者或者操作符试图对一个不支持反压操作的Observable应用该操作。可以参考 [[Backpressure]] 查找针对没有实现反压操作的Observable的解决办法。

    • OnErrorFailedException:
      表示Observable尝试调用观察者的 onError() 方法,但是那个方法自己抛出了异常。

    • OnErrorNotImplementedException:
      表示一个Observable尝试调用它的观察者的onError() 方法,但是那个方法不存在。有多种方法可以消除这个错误:可以调整Observable使它不会到达这个错误条件,也可以在观察者中实现一个onError 处理器, 或者使用其它的操作符在错误到达之前拦截这个 onError 通知。

    • OnErrorThrowable:
      观察者们可以用这种形式传递异常给它们的观察者们的 onError() 方法。相比标准的Throwable,这种Throwable包含更多的信息:错误本身和在错误发生时Observable的内部状态。

    ...

    ...

    Reference

    1. Error Handling Operators
    2. RxJava错误处理
    3. RxJava操作符(五)Error Handling
    4. RxJava 教程第三部分:驯服数据流之 高级错误处理

    相关文章

      网友评论

      本文标题:RxJava Error Handling Operators

      本文链接:https://www.haomeiwen.com/subject/lytsyttx.html