美文网首页工作生活
RxJava学习总结-过滤类操作符

RxJava学习总结-过滤类操作符

作者: 取了个很好听的名字 | 来源:发表于2019-07-02 11:32 被阅读0次

    前言

    过滤类操作符就是过滤掉特殊的上游发射事件。

    distinct

    distinct:去除重复事件
    代码如下:

     Observable.just(1,1,2,2,3,3,4,4)
                            .distinct()
                            .subscribe(new Consumer<Integer>() {
                                @Override
                                public void accept(Integer integer) throws Exception {
                                    Log.e("accept",integer+"");
                                }
                            });
    

    测试结果如下:

    7-02 10:36:09.637 29251-29251/com.zhqy.myrxjava E/accept: 1
    07-02 10:36:09.637 29251-29251/com.zhqy.myrxjava E/accept: 2
    07-02 10:36:09.637 29251-29251/com.zhqy.myrxjava E/accept: 3
    07-02 10:36:09.637 29251-29251/com.zhqy.myrxjava E/accept: 4
    

    filter

    过滤掉不符合条件的

     Observable.just(1,2,3,4,5)
                            .filter(new Predicate<Integer>() {
                                @Override
                                public boolean test(Integer integer) throws Exception {
                                    return integer>2;
                                }
                            }).subscribe(new Consumer<Integer>() {
                        @Override
                        public void accept(Integer integer) throws Exception {
                                Log.e("accept",integer+"");
                        }
                    });
    

    测试结果如下:

    07-02 10:41:30.310 29802-29802/com.zhqy.myrxjava E/accept: 3
    07-02 10:41:30.310 29802-29802/com.zhqy.myrxjava E/accept: 4
    07-02 10:41:30.310 29802-29802/com.zhqy.myrxjava E/accept: 5
    

    elementAt

    只发射特定下标的事件
    代码如下:

      //找下标为1的事件,没有则发送-1
                    Observable.just(1,2,3,4,5)
                            .elementAt(100,-1)
                            .subscribe(new Consumer<Integer>() {
                                @Override
                                public void accept(Integer integer) throws Exception {
                                    Log.e("accept",integer+"");
                                }
                            });
    

    测试结果

    07-02 10:58:30.704 31487-31487/com.zhqy.myrxjava E/accept: -1
    

    skip

    忽视上游发射的前几项事件
    代码如下:

    Observable.just(1,2,3,4,5)
                              .skip(3)
                              .subscribe(new Consumer<Integer>() {
                                  @Override
                                  public void accept(Integer integer) throws Exception {
                                      Log.e("accept",integer+"");
                                  }
                              });
    

    测试结果如下:

    07-02 11:05:57.808 4543-4543/com.zhqy.myrxjava E/accept: 4
    07-02 11:05:57.809 4543-4543/com.zhqy.myrxjava E/accept: 5
    

    丢弃Observable开始的那段时间发射 的数据
    代码如下:

       Observable.interval(500,TimeUnit.MILLISECONDS)
                              .skip(3,TimeUnit.SECONDS)
                              .subscribe(new Consumer<Long>() {
                                  @Override
                                  public void accept(Long aLong) throws Exception {
                                      Log.e("accept",aLong+"");
                                  }
                              });
    

    测试结果

    07-02 11:24:26.240 11070-11115/com.zhqy.myrxjava E/accept: 5
    07-02 11:24:26.740 11070-11115/com.zhqy.myrxjava E/accept: 6
    07-02 11:24:27.240 11070-11115/com.zhqy.myrxjava E/accept: 7
    07-02 11:24:27.740 11070-11115/com.zhqy.myrxjava E/accept: 8
    07-02 11:24:28.240 11070-11115/com.zhqy.myrxjava E/accept: 9
    07-02 11:24:28.740 11070-11115/com.zhqy.myrxjava E/accept: 10
    ....
    

    take

    只接收上游发送的n个事件
    代码如下:

      Observable.range(0,100)
                            .take(10)
                            .subscribe(new Consumer<Integer>() {
                                @Override
                                public void accept(Integer integer) throws Exception {
                                    Log.e("accept",integer+"");
                                }
                            });
    

    测试结果如下:

    07-02 11:29:27.824 11880-11880/com.zhqy.myrxjava E/accept: 0
    07-02 11:29:27.824 11880-11880/com.zhqy.myrxjava E/accept: 1
    07-02 11:29:27.824 11880-11880/com.zhqy.myrxjava E/accept: 2
    07-02 11:29:27.824 11880-11880/com.zhqy.myrxjava E/accept: 3
    07-02 11:29:27.824 11880-11880/com.zhqy.myrxjava E/accept: 4
    07-02 11:29:27.824 11880-11880/com.zhqy.myrxjava E/accept: 5
    07-02 11:29:27.824 11880-11880/com.zhqy.myrxjava E/accept: 6
    07-02 11:29:27.824 11880-11880/com.zhqy.myrxjava E/accept: 7
    07-02 11:29:27.824 11880-11880/com.zhqy.myrxjava E/accept: 8
    07-02 11:29:27.824 11880-11880/com.zhqy.myrxjava E/accept: 9
    
    

    takeLast

    只接收最后n个上游发送的事件
    代码如下:

       Observable.range(0,100)
                            .takeLast(10)
                            .subscribe(new Consumer<Integer>() {
                                @Override
                                public void accept(Integer integer) throws Exception {
                                    Log.e("accept",integer+"");
                                }
                            });
    

    测试结果:

    07-02 11:31:57.949 12377-12377/com.zhqy.myrxjava E/accept: 90
    07-02 11:31:57.949 12377-12377/com.zhqy.myrxjava E/accept: 91
    07-02 11:31:57.949 12377-12377/com.zhqy.myrxjava E/accept: 92
    07-02 11:31:57.949 12377-12377/com.zhqy.myrxjava E/accept: 93
    07-02 11:31:57.949 12377-12377/com.zhqy.myrxjava E/accept: 94
    07-02 11:31:57.949 12377-12377/com.zhqy.myrxjava E/accept: 95
    07-02 11:31:57.949 12377-12377/com.zhqy.myrxjava E/accept: 96
    07-02 11:31:57.949 12377-12377/com.zhqy.myrxjava E/accept: 97
    07-02 11:31:57.949 12377-12377/com.zhqy.myrxjava E/accept: 98
    07-02 11:31:57.949 12377-12377/com.zhqy.myrxjava E/accept: 99
    

    ofType

    对发送的事件筛选出指定的数据类型发送
    代码如下:

      Observable.just(1,2,3,"4","5")
                              .ofType(String.class)
                              .subscribe(new Consumer<String>() {
                                  @Override
                                  public void accept(String string) throws Exception {
                                        Log.e("accept",string);
                                  }
                              });
    

    测试结果如下

    07-02 14:07:03.169 19405-19405/com.zhqy.myrxjava E/accept: 4
    07-02 14:07:03.170 19405-19405/com.zhqy.myrxjava E/accept: 5
    

    debounce

    根据发送的事件时间间隔做出筛选,如果两次发送事件的间隔小于设定的timeout,则会取消前一个事件的发送
    代码如下:

     Observable.create(new ObservableOnSubscribe<Integer>() {
                        @Override
                        public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                            emitter.onNext(1);
                            emitter.onNext(2);
                            SystemClock.sleep(1000);
                            emitter.onNext(3);
                            emitter.onNext(4);
                            SystemClock.sleep(1000);
                            emitter.onNext(5);
                        }
                    }).debounce(500,TimeUnit.MILLISECONDS)
                      .subscribe(new Consumer<Integer>() {
                          @Override
                          public void accept(Integer integer) throws Exception {
                              Log.e("accept",integer+"");
                          }
                      });
    

    测试结果:

    07-02 14:15:40.029 19952-20086/com.zhqy.myrxjava E/accept: 2
    07-02 14:15:41.029 19952-20086/com.zhqy.myrxjava E/accept: 4
    07-02 14:15:42.029 19952-20086/com.zhqy.myrxjava E/accept: 5
    

    firstElement& lastElement

    获取上游发送的第一个和最后一个事件
    代码如下:

      Observable.just(1,2,3,4,5)
                              .firstElement()
                              .subscribe(new Consumer<Integer>() {
                                  @Override
                                  public void accept(Integer integer) throws Exception {
                                      Log.e("accept",integer+"");
                                  }
                              });
    
    Observable.just(1,2,3,4,5)
                              .lastElement()
                              .subscribe(new Consumer<Integer>() {
                                  @Override
                                  public void accept(Integer integer) throws Exception {
                                      Log.e("accept",integer+"");
                                  }
                              });
    

    测试结果

    07-02 14:22:25.757 20430-20430/com.zhqy.myrxjava E/accept: 1
    
    07-02 14:23:55.439 20621-20621/com.zhqy.myrxjava E/accept: 5
    

    相关文章

      网友评论

        本文标题:RxJava学习总结-过滤类操作符

        本文链接:https://www.haomeiwen.com/subject/daaacctx.html