RxJava Primer (5): Utility Operators

Author: tmyzh | Published: 2018-02-23 15:52

    Happy New Year, and here's to a good start back at work.

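    filter: emits only the items that satisfy the predicate
    // RxJava 2 types used here: io.reactivex.Observable, io.reactivex.Observer,
    // io.reactivex.disposables.Disposable, io.reactivex.functions.Predicate
    Observable.just(1, 2, 3, 4, 5)
            .filter(new Predicate<Integer>() {
                @Override
                public boolean test(Integer integer) throws Exception {
                    Log.e("yzh", "filter");
                    // keep only values greater than 3
                    return integer > 3;
                }
            })
            .subscribe(new Observer<Integer>() {
                @Override
                public void onSubscribe(Disposable d) {
                }

                @Override
                public void onNext(Integer integer) {
                    Log.e("yzh", "onNext--" + integer);
                }

                @Override
                public void onError(Throwable e) {
                }

                @Override
                public void onComplete() {
                }
            });

    Output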

           02-07 15:35:28.549 22207-22207/com.example.issuser.rxtest E/yzh: filter
           02-07 15:35:28.549 22207-22207/com.example.issuser.rxtest E/yzh: filter
           02-07 15:35:28.549 22207-22207/com.example.issuser.rxtest E/yzh: filter
           02-07 15:35:28.549 22207-22207/com.example.issuser.rxtest E/yzh: filter
           02-07 15:35:28.549 22207-22207/com.example.issuser.rxtest E/yzh: onNext--4
           02-07 15:35:28.549 22207-22207/com.example.issuser.rxtest E/yzh: filter
           02-07 15:35:28.549 22207-22207/com.example.issuser.rxtest E/yzh: onNext--5
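
The log interleaves `filter` and `onNext` lines because each item travels through the whole chain before the next one is emitted: the predicate runs once per source item (five times), and only the items that pass reach the observer. The plain-Java sketch below models that push order without RxJava. `FilterOrderModel` and `run` are hypothetical names chosen for illustration, not library API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

// Plain-Java model of the push order seen in the log (not RxJava's implementation).
class FilterOrderModel {

    // Pushes each item through the predicate and, if it passes, straight on to the "observer".
    static List<String> run(List<Integer> source, Predicate<Integer> predicate) {
        List<String> log = new ArrayList<>();
        for (Integer item : source) {
            log.add("filter");                  // the predicate is evaluated for every item
            if (predicate.test(item)) {
                log.add("onNext--" + item);     // only passing items reach the observer
            }
        }
        return log;
    }

    public static void main(String[] args) {
        for (String line : run(Arrays.asList(1, 2, 3, 4, 5), i -> i > 3)) {
            System.out.println(line);
        }
    }
}
```

Running `main` prints the same sequence of messages as the logcat output, minus the timestamps.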
    
    ofType: emits only the items that are instances of the given class
    Observable.just("1", 2, "abv", 5, "qqq")
            .ofType(Integer.class)
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    Log.e("yzh", "accept--" + integer);
                }
            });

    Output

     02-07 15:40:03.320 22735-22735/com.example.issuser.rxtest E/yzh: accept--2
     02-07 15:40:03.320 22735-22735/com.example.issuser.rxtest E/yzh: accept--5
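
Conceptually, `ofType` is a type check followed by a cast. The sketch below shows that idea on a plain list; `OfTypeModel` is a hypothetical helper for illustration and makes no claim about how the library implements the operator.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java model of "keep only instances of a class, then cast".
class OfTypeModel {

    static <T> List<T> ofType(List<?> items, Class<T> type) {
        List<T> out = new ArrayList<>();
        for (Object item : items) {
            if (type.isInstance(item)) {   // the filtering step
                out.add(type.cast(item));  // the cast step, safe after the check
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Object> mixed = Arrays.<Object>asList("1", 2, "abv", 5, "qqq");
        System.out.println(ofType(mixed, Integer.class)); // prints [2, 5]
    }
}
```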
    
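    skip(n) drops the first n items; skipLast(n) drops the last n items (n = 1 in this example)
    // intervalRange(0, 5, 1, 1, SECONDS) emits 0..4, one per second after a 1-second initial delay
    Observable.intervalRange(0, 5, 1, 1, TimeUnit.SECONDS)
            .skip(1)
            .skipLast(1)
            .subscribe(new Consumer<Long>() {
                @Override
                public void accept(Long value) throws Exception {
                    Log.e("yzh", "accept--" + value);
                }
            });

    Output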

    02-07 15:59:26.198 24724-24765/com.example.issuser.rxtest E/yzh: accept--1
    02-07 15:59:27.198 24724-24765/com.example.issuser.rxtest E/yzh: accept--2
    02-07 15:59:28.198 24724-24765/com.example.issuser.rxtest E/yzh: accept--3
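
`skipLast(n)` cannot know which items are the last n until the source completes, so it has to hold items back. A common way to do that is a small buffer: an item is released only once n newer items have arrived, and whatever is still buffered at completion is discarded. The plain-Java sketch below models this under that assumption; `SkipModel` and its log strings are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

// Plain-Java model of skip(n) followed by skipLast(m), using a buffer of size m.
class SkipModel {

    static List<String> run(List<Long> source, int skip, int skipLast) {
        List<String> log = new ArrayList<>();
        Deque<Long> buffer = new ArrayDeque<>();
        int seen = 0;
        for (Long item : source) {
            log.add("source emits " + item);
            if (seen++ < skip) {
                continue;                       // skip(n): drop the first n items
            }
            buffer.offer(item);
            if (buffer.size() > skipLast) {
                // the oldest buffered item is now known not to be among the last m
                log.add("accept--" + buffer.poll());
            }
        }
        // on completion the buffer still holds the last m items; they are dropped
        return log;
    }

    public static void main(String[] args) {
        for (String line : run(Arrays.asList(0L, 1L, 2L, 3L, 4L), 1, 1)) {
            System.out.println(line);
        }
    }
}
```

In this model each accepted value appears one source emission later than it was produced, which is the price of dropping a trailing item from a stream whose end is not known in advance.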
    
    distinct: drops items that have already been emitted
    Observable.just(1, 2, 3, 2, 1)
            .distinct()
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    Log.e("yzh", "accept--" + integer);
                }
            });

    Output

    accept--1
    accept--2
    accept--3
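
Deduplication of this kind needs a record of everything seen so far. The sketch below models it with a `HashSet`, which means equality is decided by `equals`/`hashCode` and memory grows with the number of distinct items. `DistinctModel` is a hypothetical name, and the use of a hash set is an assumption made for the illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Plain-Java model of "emit an item only the first time it is seen".
class DistinctModel {

    static <T> List<T> distinct(List<T> source) {
        Set<T> seen = new HashSet<>();
        List<T> out = new ArrayList<>();
        for (T item : source) {
            if (seen.add(item)) {   // add() returns false when the item was already present
                out.add(item);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(distinct(Arrays.asList(1, 2, 3, 2, 1))); // prints [1, 2, 3]
    }
}
```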
    
    delay: shifts the Observable's emissions forward in time by the given delay
      Observable.just(1,2,3)
                    .delay(3, TimeUnit.SECONDS)
                    .subscribe(new Observer<Integer>() {
                        @Override
                        public void onSubscribe(Disposable d) {
                            Log.e("yzh","onSubscribe");
                        }
    
                        @Override
                        public void onNext(Integer integer) {
                            Log.e("yzh","onNext--"+integer);
                        }
    
                        @Override
                        public void onError(Throwable e) {
    
                        }
    
                        @Override
                        public void onComplete() {
    
                        }
                    });
    

    Output

    02-23 15:21:24.146 9436-9436/com.example.issuser.rxtest E/yzh: onSubscribe
    02-23 15:21:27.148 9436-9478/com.example.issuser.rxtest E/yzh: onNext--1
    02-23 15:21:27.148 9436-9478/com.example.issuser.rxtest E/yzh: onNext--2
    02-23 15:21:27.148 9436-9478/com.example.issuser.rxtest E/yzh: onNext--3
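
In the log, `onSubscribe` runs on thread 9436 (the main thread) while the `onNext` lines run on thread 9478. That is consistent with `delay` handing the items to a timer thread; in RxJava 2 the operator uses the computation scheduler unless another one is passed in. The plain-Java sketch below models the effect with a `ScheduledExecutorService`; `DelayThreadModel` and `deliverLater` are hypothetical names.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Plain-Java model: a delayed delivery runs on the timer's thread, not the caller's.
class DelayThreadModel {

    // Schedules a delivery after delayMillis and returns the name of the thread that ran it.
    static String deliverLater(long delayMillis) {
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        AtomicReference<String> receiver = new AtomicReference<>();
        CountDownLatch done = new CountDownLatch(1);
        try {
            timer.schedule(() -> {
                receiver.set(Thread.currentThread().getName());
                done.countDown();
            }, delayMillis, TimeUnit.MILLISECONDS);
            done.await();                        // block until the delayed delivery has happened
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ex);
        } finally {
            timer.shutdown();
        }
        return receiver.get();
    }

    public static void main(String[] args) {
        System.out.println("caller:   " + Thread.currentThread().getName());
        System.out.println("receiver: " + deliverLater(100));
    }
}
```

On Android this matters when `onNext` touches views: the usual remedy is to add `observeOn(AndroidSchedulers.mainThread())` (from RxAndroid) before `subscribe`.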
    
    doOnXxx operators: hooks invoked at specific points in the stream's lifecycle
    Observable.create(new ObservableOnSubscribe<Integer>() {
                @Override
                public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                    e.onNext(1);
                    e.onNext(2);
                    e.onNext(3);
                    // the message means "an error occurred"
                    e.onError(new Throwable("发生错误了"));
                }
            })
            // called for every notification (onNext, onError, onComplete);
            // getValue() is null for the terminal notifications
            .doOnEach(new Consumer<Notification<Integer>>() {
                @Override
                public void accept(Notification<Integer> integerNotification) throws Exception {
                    Log.e("yzh", "doOnEach---" + integerNotification.getValue());
                }
            })
            // called just before each item is passed downstream to onNext
            .doOnNext(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    Log.e("yzh", "doOnNext--" + integer);
                }
            })
            // called after the downstream onNext has returned
            .doAfterNext(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    Log.e("yzh", "doAfterNext--" + integer);
                }
            })
            // called when the Observable completes normally
            .doOnComplete(new Action() {
                @Override
                public void run() throws Exception {
                    Log.e("yzh", "doOnComplete");
                }
            })
            // called when the Observable signals an error
            .doOnError(new Consumer<Throwable>() {
                @Override
                public void accept(Throwable throwable) throws Exception {
                    Log.e("yzh", "doOnError--" + throwable.getMessage());
                }
            })
            // called when the observer subscribes
            .doOnSubscribe(new Consumer<Disposable>() {
                @Override
                public void accept(Disposable disposable) throws Exception {
                    Log.e("yzh", "donOnSubscribe");
                }
            })
            // called after the Observable terminates, whether it completed or failed
            .doAfterTerminate(new Action() {
                @Override
                public void run() throws Exception {
                    Log.e("yzh", "doAfterTerminate");
                }
            })
            // called once when the stream ends: after onComplete, after onError, or on dispose
            .doFinally(new Action() {
                @Override
                public void run() throws Exception {
                    Log.e("yzh", "doFinally");
                }
            })
            .subscribe(new Observer<Integer>() {
                @Override
                public void onSubscribe(Disposable d) {
                }

                @Override
                public void onNext(Integer integer) {
                    Log.e("yzh", "onNext--" + integer);
                }

                @Override
                public void onError(Throwable e) {
                }

                @Override
                public void onComplete() {
                }
            });
    

    Output

    02-23 11:27:09.946 31016-31016/com.example.issuser.rxtest E/yzh: donOnSubscribe
    02-23 11:27:09.947 31016-31016/com.example.issuser.rxtest E/yzh: doOnEach---1
    02-23 11:27:09.947 31016-31016/com.example.issuser.rxtest E/yzh: doOnNext--1
    02-23 11:27:09.947 31016-31016/com.example.issuser.rxtest E/yzh: onNext--1
    02-23 11:27:09.947 31016-31016/com.example.issuser.rxtest E/yzh: doAfterNext--1
    02-23 11:27:09.947 31016-31016/com.example.issuser.rxtest E/yzh: doOnEach---2
    02-23 11:27:09.947 31016-31016/com.example.issuser.rxtest E/yzh: doOnNext--2
    02-23 11:27:09.947 31016-31016/com.example.issuser.rxtest E/yzh: onNext--2
    02-23 11:27:09.947 31016-31016/com.example.issuser.rxtest E/yzh: doAfterNext--2
    02-23 11:27:09.947 31016-31016/com.example.issuser.rxtest E/yzh: doOnEach---3
    02-23 11:27:09.947 31016-31016/com.example.issuser.rxtest E/yzh: doOnNext--3
    02-23 11:27:09.947 31016-31016/com.example.issuser.rxtest E/yzh: onNext--3
    02-23 11:27:09.947 31016-31016/com.example.issuser.rxtest E/yzh: doAfterNext--3
    02-23 11:27:09.947 31016-31016/com.example.issuser.rxtest E/yzh: doOnEach---null
    02-23 11:27:09.947 31016-31016/com.example.issuser.rxtest E/yzh: doOnError--发生错误了
    02-23 11:27:09.947 31016-31016/com.example.issuser.rxtest E/yzh: doFinally
    02-23 11:27:09.947 31016-31016/com.example.issuser.rxtest E/yzh: doAfterTerminate
    
    onErrorReturn: on error, emits one substitute item and then completes normally
     Observable.create(new ObservableOnSubscribe<Integer>() {
                @Override
                public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                    e.onNext(1);
                    e.onNext(2);
                    e.onError(new Throwable("发生错误了"));
                }
            })
                    //on error, emit one fallback item and complete normally
                    .onErrorReturn(new Function<Throwable, Integer>() {
                        @Override
                        public Integer apply(Throwable throwable) throws Exception {
                            Log.e("yzh","onErrorReturn--"+throwable.getMessage());
    
                            return 123;
                        }
                    })
                    .subscribe(new Observer<Integer>() {
                        @Override
                        public void onSubscribe(Disposable d) {
    
                        }
    
                        @Override
                        public void onNext(Integer integer) {
                                Log.e("yzh","onNext--"+integer);
                        }
    
                        @Override
                        public void onError(Throwable e) {
                            Log.e("yzh","onError--"+e.toString());
                        }
    
                        @Override
                        public void onComplete() {
                            Log.e("yzh","onComplete");
                        }
                    });
    

    Output

    02-23 11:46:07.620 2761-2761/com.example.issuser.rxtest E/yzh: onNext--1
    02-23 11:46:07.620 2761-2761/com.example.issuser.rxtest E/yzh: onNext--2
    02-23 11:46:07.620 2761-2761/com.example.issuser.rxtest E/yzh: onErrorReturn--发生错误了
    02-23 11:46:07.620 2761-2761/com.example.issuser.rxtest E/yzh: onNext--123
    02-23 11:46:07.620 2761-2761/com.example.issuser.rxtest E/yzh: onComplete
    
    onErrorResumeNext: on error, switches to a new Observable returned by the function
    Observable.create(new ObservableOnSubscribe<Integer>() {
                @Override
                public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                    e.onNext(1);
                    e.onNext(2);
                    e.onError(new Exception("发生错误了"));
                }
            })
                    //on error, switch to a new Observable; applies to any Throwable, Exception or not
                    .onErrorResumeNext(new Function<Throwable, ObservableSource<? extends Integer>>() {
                        @Override
                        public ObservableSource<? extends Integer> apply(Throwable throwable) throws Exception {
                            Log.e("yzh","onErrorResumeNext--"+throwable.getMessage());
                            return Observable.just(3,4);
                        }
                    })
                    .subscribe(new Observer<Integer>() {
                        @Override
                        public void onSubscribe(Disposable d) {
    
                        }
    
                        @Override
                        public void onNext(Integer integer) {
                            Log.e("yzh","onNext--"+integer);
                        }
    
                        @Override
                        public void onError(Throwable e) {
                            Log.e("yzh","onError--"+e.getMessage());
                        }
    
                        @Override
                        public void onComplete() {
    
                        }
                    });
    

    Output

    Note: onErrorResumeNext handles whatever the Observable passes to onError; it does not distinguish between Throwable and Exception.
    02-23 11:59:30.439 5596-5596/? E/yzh: onNext--1
    02-23 11:59:30.439 5596-5596/? E/yzh: onNext--2
    02-23 11:59:30.439 5596-5596/? E/yzh: onErrorResumeNext--发生错误了
    02-23 11:59:30.439 5596-5596/? E/yzh: onNext--3
    02-23 11:59:30.439 5596-5596/? E/yzh: onNext--4
    
    onExceptionResumeNext: similar to onErrorResumeNext, but it resumes only when the error is an Exception; any other Throwable is passed on to onError
    Observable.create(new ObservableOnSubscribe<Integer>() {
                @Override
                public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                    e.onNext(1);
                    e.onNext(2);
                    e.onError(new Exception("发生错误了"));
                }
            })
                    .onExceptionResumeNext(new ObservableSource<Integer>() {
                        @Override
                        public void subscribe(Observer<? super Integer> observer) {
                            observer.onNext(11);
                            observer.onNext(22);
                            observer.onComplete();
                        }
                    })
                    .subscribe(new Observer<Integer>() {
                        @Override
                        public void onSubscribe(Disposable d) {
    
                        }
    
                        @Override
                        public void onNext(Integer integer) {
                            Log.e("yzh","onNext--"+integer);
                        }
    
                        @Override
                        public void onError(Throwable e) {
                            Log.e("yzh","onError--"+e.getMessage());
                        }
    
                        @Override
                        public void onComplete() {
                            Log.e("yzh","onComplete");
                        }
                    });
    

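    Output

    Note how the result depends on whether the error is a plain Throwable or an Exception.
    With e.onError(new Throwable("发生错误了")), the error is passed straight to onError:
    02-23 14:38:58.163 30725-30725/com.example.issuser.rxtest E/yzh: onNext--1
    02-23 14:38:58.163 30725-30725/com.example.issuser.rxtest E/yzh: onNext--2
    02-23 14:38:58.163 30725-30725/com.example.issuser.rxtest E/yzh: onError--发生错误了

    With e.onError(new Exception("发生错误了")), as in the code above, the fallback source takes over:
    onNext--1
    onNext--2
    onNext--11
    onNext--22
    onComplete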
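
The difference between the two resume operators comes down to a single type check on the error. The plain-Java sketch below models only that decision; `ResumeModel` and the strings `"fallback"` and `"onError"` are hypothetical labels standing for "switch to the fallback source" and "pass the error to the observer".

```java
// Plain-Java model of the decision each resume operator makes when an error arrives.
class ResumeModel {

    // onErrorResumeNext-style: every Throwable switches to the fallback source.
    static String onErrorResume(Throwable error) {
        return "fallback";
    }

    // onExceptionResumeNext-style: only Exception (and its subclasses) switches;
    // a bare Throwable or an Error goes to the observer's onError instead.
    static String onExceptionResume(Throwable error) {
        return (error instanceof Exception) ? "fallback" : "onError";
    }

    public static void main(String[] args) {
        System.out.println(onExceptionResume(new Throwable("t")));        // onError
        System.out.println(onExceptionResume(new Exception("e")));        // fallback
        System.out.println(onExceptionResume(new OutOfMemoryError("o"))); // onError
        System.out.println(onErrorResume(new Throwable("t")));            // fallback
    }
}
```

Since almost every failure in application code is an `Exception`, the two operators behave the same most of the time; they diverge for `Error` subclasses and for hand-built `Throwable` instances like the one used in this article.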
    
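    retry: when an error occurs, resubscribes to the Observable so that it emits again from the start; repeat is the counterpart that resubscribes after normal completion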
    Observable.create(new ObservableOnSubscribe<Integer>() {
                @Override
                public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                    e.onNext(1);
                    e.onNext(2);
                    e.onError(new Exception("发生错误了"));
                    e.onNext(3); // never delivered: events sent after onError are ignored
                }
            })
                    .retry()
                    .subscribe(new Observer<Integer>() {
                        @Override
                        public void onSubscribe(Disposable d) {
    
                        }
    
                        @Override
                        public void onNext(Integer integer) {
                            Log.e("yzh","onNext-"+integer);
                        }
    
                        @Override
                        public void onError(Throwable e) {
                            Log.e("yzh","onError--"+e.getMessage());
                        }
    
                        @Override
                        public void onComplete() {
                            Log.e("yzh","onComplete");
                        }
                    });
    

    Output

    02-23 14:45:31.130 31188-31188/com.example.issuser.rxtest E/yzh: onNext-1
    02-23 14:45:31.130 31188-31188/com.example.issuser.rxtest E/yzh: onNext-2
    02-23 14:45:31.130 31188-31188/com.example.issuser.rxtest E/yzh: onNext-1
    02-23 14:45:31.130 31188-31188/com.example.issuser.rxtest E/yzh: onNext-2
    ..... (repeats indefinitely)
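
The loop never ends because `retry()` with no argument resubscribes every time and this source always fails. RxJava 2 also has a `retry(long times)` overload that caps the number of resubscriptions and then delivers the last error. The plain-Java sketch below models that bounded behaviour for a source that emits 1 and 2 and then fails; `RetryModel` and the error message `"boom"` are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of a bounded retry over a source that always fails after two items.
class RetryModel {

    static List<String> run(int retries) {
        List<String> log = new ArrayList<>();
        int attempt = 0;
        while (true) {
            // each "subscription" replays the source from the beginning
            log.add("onNext-1");
            log.add("onNext-2");
            Exception error = new Exception("boom");
            if (attempt++ < retries) {
                continue;                              // resubscribe instead of reporting the error
            }
            log.add("onError--" + error.getMessage()); // retries exhausted: the error goes through
            return log;
        }
    }

    public static void main(String[] args) {
        for (String line : run(2)) {
            System.out.println(line);
        }
    }
}
```

With `retries = 2` the source runs three times in total (the first attempt plus two retries), so the observer sees the pair of items three times before `onError`.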
    

    Article link: https://www.haomeiwen.com/subject/hlwtxftx.html