美文网首页
10. Rxjava2 : 重试

10. Rxjava2 : 重试

作者: 0青衣小褂0 | 来源:发表于2019-02-13 14:45 被阅读111次

    1. RxJava2 : 什么是观察者模式
    2. RxJava2 : 创建操作符(无关时间)
    3. Rxjava2 : 创建操作符(有关时间)
    4. Rxjava2 : 变换操作符
    5. Rxjava2 : 判断操作符
    6. Rxjava2 : 筛选操作符
    7. Rxjava2 : 合并操作符
    8. Rxjava2 : do操作符
    9. Rxjava2 : error处理
    10. Rxjava2 : 重试
    11. Rxjava2 : 线程切换

    api use
    retry {{retry}}
    retryUntil {{retryUntil}}
    retryWhen {{retryWhen}}
    repeat {{repeat}}
    repeatUntil {{repeatUntil}}
    repeatWhen {{repeatWhen}}

    retry

    • retry
      1.触发retry最重要的条件是看时候会发射throwable(与repeat对比)
      2.没有任何参数指定的时候,会无限次重试{{demo0}}
      3.指定重试的次数{{demo1}}
      4.指定终止条件{{demo2}}
      5.指定重试次数以及终止条件,则两条有一条满足即终止,其余均与3,4同

    • demo0 :
      无限重试

    Observable.create((ObservableOnSubscribe<Integer>) e -> {
                e.onNext(1);
                e.onNext(2);
                e.onNext(3);
                e.onError(new NullPointerException());
                e.onNext(4);
            }).retry()
                    .subscribe(new Observer<Integer>() {
                        @Override
                        public void onSubscribe(Disposable d) {
    
                        }
    
                        @Override
                        public void onNext(Integer integer) {
                            Log.d(TAG, "integer:" + integer);
                        }
    
                        @Override
                        public void onError(Throwable e) {
                            Log.d(TAG, "onError");
                        }
    
                        @Override
                        public void onComplete() {
                            Log.d(TAG, "onComplete");
                        }
                    });
    

    log

    02-13 13:39:06.431 23097-23097/... D/SplashActivity: integer:1
    02-13 13:39:06.431 23097-23097/... D/SplashActivity: integer:2
    02-13 13:39:06.431 23097-23097/... D/SplashActivity: integer:3
    02-13 13:39:06.431 23097-23097/... D/SplashActivity: integer:1
    02-13 13:39:06.431 23097-23097/... D/SplashActivity: integer:2
    02-13 13:39:06.431 23097-23097/... D/SplashActivity: integer:3
    02-13 13:39:06.431 23097-23097/... D/SplashActivity: integer:1
    02-13 13:39:06.431 23097-23097/... D/SplashActivity: integer:2
    02-13 13:39:06.431 23097-23097/... D/SplashActivity: integer:3
    
    • demo1 :
      自身发送一次
      重试两次
      在第三次的时候碰到error,以onError作为终止标识
    Observable.create((ObservableOnSubscribe<Integer>) e -> {
                e.onNext(1);
                e.onNext(2);
                e.onNext(3);
                e.onError(new NullPointerException());
                e.onNext(4);
            }).retry(2)
                    .subscribe(new Observer<Integer>() {
                        @Override
                        public void onSubscribe(Disposable d) {
    
                        }
    
                        @Override
                        public void onNext(Integer integer) {
                            Log.d(TAG, "integer:" + integer);
                        }
    
                        @Override
                        public void onError(Throwable e) {
                            Log.d(TAG, "onError");
                        }
    
                        @Override
                        public void onComplete() {
                            Log.d(TAG, "onComplete");
                        }
                    });
    

    log

    02-13 13:42:41.231 24025-24025/... D/SplashActivity: integer:1
    02-13 13:42:41.231 24025-24025/... D/SplashActivity: integer:2
    02-13 13:42:41.231 24025-24025/... D/SplashActivity: integer:3
    02-13 13:42:41.231 24025-24025/... D/SplashActivity: integer:1
    02-13 13:42:41.231 24025-24025/... D/SplashActivity: integer:2
    02-13 13:42:41.231 24025-24025/... D/SplashActivity: integer:3
    02-13 13:42:41.231 24025-24025/... D/SplashActivity: integer:1
    02-13 13:42:41.231 24025-24025/... D/SplashActivity: integer:2
    02-13 13:42:41.231 24025-24025/... D/SplashActivity: integer:3
    02-13 13:42:41.231 24025-24025/... D/SplashActivity: onError
    
    • demo2 :
      指定终止条件
      以onError作为终止标识
      需要注意的是:
      参数1是从1开始记,并不是从0
      第三次检测是否需要重试,条件并不满足,所以是重试了2次
    Observable.create((ObservableOnSubscribe<Integer>) e -> {
                e.onNext(1);
                e.onNext(2);
                e.onNext(3);
                e.onError(new NullPointerException());
                e.onNext(4);
            }).retry(new BiPredicate<Integer, Throwable>() {
                @Override
                public boolean test(Integer integer, Throwable throwable) throws Exception {
                    Log.d(TAG, "检测是否需要重试:" + integer);
                    return integer < 3;
                }
            })
                    .subscribe(new Observer<Integer>() {
                        @Override
                        public void onSubscribe(Disposable d) {
    
                        }
    
                        @Override
                        public void onNext(Integer integer) {
                            Log.d(TAG, "integer:" + integer);
                        }
    
                        @Override
                        public void onError(Throwable e) {
                            Log.d(TAG, "onError");
                        }
    
                        @Override
                        public void onComplete() {
                            Log.d(TAG, "onComplete");
                        }
                    });
    

    log

    02-13 13:58:33.771 26179-26179/... D/SplashActivity: integer:1
    02-13 13:58:33.771 26179-26179/... D/SplashActivity: integer:2
    02-13 13:58:33.771 26179-26179/... D/SplashActivity: integer:3
    02-13 13:58:33.771 26179-26179/... D/SplashActivity: 检测是否需要重试:1
    02-13 13:58:33.771 26179-26179/... D/SplashActivity: integer:1
    02-13 13:58:33.771 26179-26179/... D/SplashActivity: integer:2
    02-13 13:58:33.771 26179-26179/... D/SplashActivity: integer:3
    02-13 13:58:33.771 26179-26179/... D/SplashActivity: 检测是否需要重试:2
    02-13 13:58:33.771 26179-26179/... D/SplashActivity: integer:1
    02-13 13:58:33.771 26179-26179/... D/SplashActivity: integer:2
    02-13 13:58:33.771 26179-26179/... D/SplashActivity: integer:3
    02-13 13:58:33.771 26179-26179/... D/SplashActivity: 检测是否需要重试:3
    02-13 13:58:33.771 26179-26179/... D/SplashActivity: onError
    

    retryUntil

    • retryUntil
      1.需注意判定条件: (与retry相反)
      true:不再重试
      false:继续重试
      2.一般用于外界指定停止条件
    Observable.create((ObservableOnSubscribe<Integer>) e -> {
                e.onNext(1);
                e.onNext(2);
                e.onNext(3);
                e.onError(new NullPointerException());
                e.onNext(4);
            }).retryUntil(new BooleanSupplier() {
                @Override
                public boolean getAsBoolean() throws Exception {
                    //true 则不继续重试
                    //false 一直重试
                    i++;
                    Log.d(TAG, "进入检测了,i = " + i);
                    return i > 2;
                }
            }).subscribe(new Observer<Integer>() {
                @Override
                public void onSubscribe(Disposable d) {
    
                }
    
                @Override
                public void onNext(Integer integer) {
                    Log.d(TAG, "integer:" + integer);
                }
    
                @Override
                public void onError(Throwable e) {
                    Log.d(TAG, "onError");
                }
    
                @Override
                public void onComplete() {
                    Log.d(TAG, "onComplete");
                }
            });
    

    log

    02-13 14:15:09.471 27881-27881/... D/SplashActivity: integer:1
    02-13 14:15:09.471 27881-27881/... D/SplashActivity: integer:2
    02-13 14:15:09.471 27881-27881/... D/SplashActivity: integer:3
    02-13 14:15:09.471 27881-27881/... D/SplashActivity: 进入检测了,i = 1
    02-13 14:15:09.471 27881-27881/... D/SplashActivity: integer:1
    02-13 14:15:09.471 27881-27881/... D/SplashActivity: integer:2
    02-13 14:15:09.471 27881-27881/... D/SplashActivity: integer:3
    02-13 14:15:09.471 27881-27881/... D/SplashActivity: 进入检测了,i = 2
    02-13 14:15:09.471 27881-27881/... D/SplashActivity: integer:1
    02-13 14:15:09.471 27881-27881/... D/SplashActivity: integer:2
    02-13 14:15:09.471 27881-27881/... D/SplashActivity: integer:3
    02-13 14:15:09.471 27881-27881/... D/SplashActivity: 进入检测了,i = 3
    02-13 14:15:09.471 27881-27881/... D/SplashActivity: onError
    

    retryWhen

    • retryWhen
      返回一个Observable
      1.这个返回的Observable当中可以正常返回的数目,代表的是原Observable发送的次数(重试次数-1)
      2.如果再次返回的Observable并不是以onError作为结束标识的,则整体会以onComplete作为结束标识
    //返回了5,6两个正常的元素,则会重试1次
    Observable.create((ObservableOnSubscribe<Integer>) e -> {
                e.onNext(1);
                e.onNext(2);
                e.onNext(3);
                e.onError(new NullPointerException());
                e.onNext(4);
            }).retryWhen(new Function<Observable<Throwable>, ObservableSource<?>>() {
                @Override
                public ObservableSource<?> apply(Observable<Throwable> throwableObservable) throws Exception {
                    return Observable.create(new ObservableOnSubscribe<Integer>() {
                        @Override
                        public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                            e.onNext(5);
                            e.onNext(6);
                            e.onError(new NullPointerException());
                        }
                    });
                }
            }).subscribe(new Observer<Integer>() {
                @Override
                public void onSubscribe(Disposable d) {
    
                }
    
                @Override
                public void onNext(Integer integer) {
                    Log.d(TAG, "integer:" + integer);
                }
    
                @Override
                public void onError(Throwable e) {
                    Log.d(TAG, "onError");
                }
    
                @Override
                public void onComplete() {
                    Log.d(TAG, "onComplete");
                }
            });
    

    log

    02-13 14:29:50.331 28805-28805/com.showdemo D/SplashActivity: integer:1
    02-13 14:29:50.331 28805-28805/com.showdemo D/SplashActivity: integer:2
    02-13 14:29:50.331 28805-28805/com.showdemo D/SplashActivity: integer:3
    02-13 14:29:50.331 28805-28805/com.showdemo D/SplashActivity: integer:1
    02-13 14:29:50.331 28805-28805/com.showdemo D/SplashActivity: integer:2
    02-13 14:29:50.331 28805-28805/com.showdemo D/SplashActivity: integer:3
    02-13 14:29:50.331 28805-28805/com.showdemo D/SplashActivity: onError
    
    //如果直接返回error,则原Observable发送的也不会接收
    Observable.create((ObservableOnSubscribe<Integer>) e -> {
                e.onNext(1);
                e.onNext(2);
                e.onNext(3);
                e.onError(new NullPointerException());
                e.onNext(4);
            }).retryWhen(new Function<Observable<Throwable>, ObservableSource<?>>() {
                @Override
                public ObservableSource<?> apply(Observable<Throwable> throwableObservable) throws Exception {
                    return Observable.create(new ObservableOnSubscribe<Integer>() {
                        @Override
                        public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                            e.onError(new NullPointerException());
                        }
                    });
                }
            }).subscribe(new Observer<Integer>() {
                @Override
                public void onSubscribe(Disposable d) {
    
                }
    
                @Override
                public void onNext(Integer integer) {
                    Log.d(TAG, "integer:" + integer);
                }
    
                @Override
                public void onError(Throwable e) {
                    Log.d(TAG, "onError");
                }
    
                @Override
                public void onComplete() {
                    Log.d(TAG, "onComplete");
                }
            });
    

    log

    02-13 14:33:00.351 29108-29108/... D/SplashActivity: onError
    
    //返回的Observable并不是以onError作为结束标识
     Observable.create((ObservableOnSubscribe<Integer>) e -> {
                e.onNext(1);
                e.onNext(2);
                e.onNext(3);
                e.onError(new NullPointerException());
            })
                    .retryWhen(new Function<Observable<Throwable>, ObservableSource<?>>() {
                        @Override
                        public ObservableSource<?> apply(Observable<Throwable> throwableObservable) throws Exception {
                            return Observable.just(4,5);
                        }
                    }).subscribe(new Observer<Integer>() {
                @Override
                public void onSubscribe(Disposable d) {
    
                }
    
                @Override
                public void onNext(Integer integer) {
                    Log.d(TAG, "integer:" + integer);
                }
    
                @Override
                public void onError(Throwable e) {
                    Log.d(TAG, "onError");
                }
    
                @Override
                public void onComplete() {
                    Log.d(TAG, "onComplete");
                }
            });
    

    log

    02-13 14:43:54.841 30166-30166/... D/SplashActivity: integer:1
    02-13 14:43:54.841 30166-30166/... D/SplashActivity: integer:2
    02-13 14:43:54.841 30166-30166/... D/SplashActivity: integer:3
    02-13 14:43:54.841 30166-30166/... D/SplashActivity: integer:1
    02-13 14:43:54.841 30166-30166/... D/SplashActivity: integer:2
    02-13 14:43:54.841 30166-30166/... D/SplashActivity: integer:3
    02-13 14:43:54.841 30166-30166/... D/SplashActivity: onComplete
    

    repeat

    • repeat
      repeat与retry功能类似,只是retry需要error来进行触发,而repeat则并不需要
    Observable.just(1,2,3)
                    .repeatWhen(new Function<Observable<Object>, ObservableSource<?>>() {
                        @Override
                        public ObservableSource<?> apply(Observable<Object> objectObservable) throws Exception {
                            return Observable.create(new ObservableOnSubscribe<Integer>() {
                                @Override
                                public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                                    e.onNext(4);
                                    e.onNext(5);
                                    e.onError(new NullPointerException());
                                }
                            });
                        }
                    }).subscribe(new Observer<Integer>() {
                @Override
                public void onSubscribe(Disposable d) {
    
                }
    
                @Override
                public void onNext(Integer integer) {
                    Log.d(TAG, "integer:" + integer);
                }
    
                @Override
                public void onError(Throwable e) {
                    Log.d(TAG, "onError");
                }
    
                @Override
                public void onComplete() {
                    Log.d(TAG, "onComplete");
                }
            });
    

    log

    02-13 14:38:51.091 29551-29551/... D/SplashActivity: integer:1
    02-13 14:38:51.091 29551-29551/... D/SplashActivity: integer:2
    02-13 14:38:51.091 29551-29551/... D/SplashActivity: integer:3
    02-13 14:38:51.091 29551-29551/... D/SplashActivity: integer:1
    02-13 14:38:51.091 29551-29551/... D/SplashActivity: integer:2
    02-13 14:38:51.091 29551-29551/... D/SplashActivity: integer:3
    02-13 14:38:51.091 29551-29551/... D/SplashActivity: onError
    

    相关文章

      网友评论

          本文标题:10. Rxjava2 : 重试

          本文链接:https://www.haomeiwen.com/subject/bnbreqtx.html