美文网首页
过滤操作符

过滤操作符

作者: rkua | 来源:发表于2016-11-22 17:13 被阅读0次
    debounce操作符

    debounce操作符对源Observable每产生一个结果后,如果在规定的间隔时间内没有别的结果产生,则把这个结果提交给订阅者处理,否则忽略该结果。
    值得注意的是,如果源Observable产生的最后一个结果后在规定的时间间隔内调用了onCompleted,那么通过debounce操作符也会把这个结果提交给订阅者。

    Observable.create(new Observable.OnSubscribe<Integer>() {
                @Override
                public void call(Subscriber<? super Integer> subscriber) {
                    if(subscriber.isUnsubscribed()) return;
                    try {
                        //产生结果的间隔时间分别为100、200、300...900毫秒
                        for (int i = 1; i < 10; i++) {
                            subscriber.onNext(i);
                            Thread.sleep(i * 100);
                        }
                        subscriber.onCompleted();
                    }catch(Exception e){
                        subscriber.onError(e);
                    }
                }
            }).subscribeOn(Schedulers.newThread())
              .debounce(400, TimeUnit.MILLISECONDS)  //超时时间为400毫秒
              .subscribe(
                    new Action1<Integer>() {
                        @Override
                        public void call(Integer integer) {
                            System.out.println("Next:" + integer);
                        }
                    }, new Action1<Throwable>() {
                        @Override
                        public void call(Throwable throwable) {
                            System.out.println("Error:" + throwable.getMessage());
                        }
                    }, new Action0() {
                        @Override
                        public void call() {
                            System.out.println("completed!");
                        }
                    });
    

    运行结果如下:
    Next:4
    Next:5
    Next:6
    Next:7
    Next:8
    Next:9
    completed!


    distinct操作符

    distinct操作符对源Observable产生的结果进行过滤,把重复的结果过滤掉,只输出不重复的结果给订阅者,非常类似于SQL里的distinct关键字。

    
    Observable.just(1, 2, 1, 1, 2, 3)
              .distinct()
              .subscribe(new Subscriber<Integer>() {
            @Override
            public void onNext(Integer item) {
                System.out.println("Next: " + item);
            }
    
            @Override
            public void onError(Throwable error) {
                System.err.println("Error: " + error.getMessage());
            }
    
            @Override
            public void onCompleted() {
                System.out.println("Sequence complete.");
            }
        });
    

    运行结果如下:
    Next: 1
    Next: 2
    Next: 3
    Sequence complete.


    elementAt操作符

    elementAt操作符在源Observable产生的结果中,仅仅把指定索引的结果提交给订阅者,索引是从0开始的。

    Observable.just(1,2,3,4,5,6).elementAt(2)
              .subscribe(
                    new Action1<Integer>() {
                        @Override
                        public void call(Integer integer) {
                            System.out.println("Next:" + integer);
                        }
                    }, new Action1<Throwable>() {
                        @Override
                        public void call(Throwable throwable) {
                            System.out.println("Error:" + throwable.getMessage());
                        }
                    }, new Action0() {
                        @Override
                        public void call() {
                            System.out.println("completed!");
                        }
                    });
    

    运行结果如下:
    Next:3
    completed!


    filter操作符

    filter操作符是对源Observable产生的结果按照指定条件进行过滤,只有满足条件的结果才会提交给订阅者

    Observable.just(1, 2, 3, 4, 5)
              .filter(new Func1<Integer, Boolean>() {
                  @Override
                  public Boolean call(Integer item) {
                    return( item < 4 );
                  }
              }).subscribe(new Subscriber<Integer>() {
            @Override
            public void onNext(Integer item) {
                System.out.println("Next: " + item);
            }
    
            @Override
            public void onError(Throwable error) {
                System.err.println("Error: " + error.getMessage());
            }
    
            @Override
            public void onCompleted() {
                System.out.println("Sequence complete.");
            }
        });
    

    运行结果如下:
    Next: 1
    Next: 2
    Next: 3
    Sequence complete.


    ofType操作符

    ofType操作符类似于filter操作符,区别在于ofType操作符是按照类型对结果进行过滤

    
    Observable.just(1, "hello world", true, 200L, 0.23f)
              .ofType(Float.class)
              .subscribe(new Subscriber<Object>() {
                  @Override
                  public void onNext(Object item) {
                      System.out.println("Next: " + item);
                  }
    
                  @Override
                  public void onError(Throwable error) {
                      System.err.println("Error: " + error.getMessage());
                  }
    
                  @Override
                  public void onCompleted() {
                      System.out.println("Sequence complete.");
                  }
              });
    

    运行结果如下:
    Next: 0.23
    Sequence complete.


    first操作符

    first操作符是把源Observable产生的结果的第一个提交给订阅者,first操作符可以使用elementAt(0)和take(1)替代。

    
            Observable.just(1,2,3,4,5,6,7,8)
              .first()
              .subscribe(new Subscriber<Integer>() {
                  @Override
                  public void onNext(Integer item) {
                      System.out.println("Next: " + item);
                  }
    
                  @Override
                  public void onError(Throwable error) {
                      System.err.println("Error: " + error.getMessage());
                  }
    
                  @Override
                  public void onCompleted() {
                      System.out.println("Sequence complete.");
                  }
              });
    

    运行结果如下:
    Next: 1
    Sequence complete.


    single操作符

    single操作符是对源Observable的结果进行判断,如果产生的结果满足指定条件的数量不为1,则抛出异常,否则把满足条件的结果提交给订阅者

    Observable.just(1,2,3,4,5,6,7,8)
              .single(new Func1<Integer, Boolean>() {
                  @Override
                  public Boolean call(Integer integer) {
                      //取大于10的第一个数字
                      return integer>10;
                  }
              })
              .subscribe(new Subscriber<Integer>() {
                  @Override
                  public void onNext(Integer item) {
                      System.out.println("Next: " + item);
                  }
    
                  @Override
                  public void onError(Throwable error) {
                      System.err.println("Error: " + error.getMessage());
                  }
    
                  @Override
                  public void onCompleted() {
                      System.out.println("Sequence complete.");
                  }
              });
    

    运行结果如下:
    Error: Sequence contains no elements


    last操作符

    last操作符把源Observable产生的结果的最后一个提交给订阅者,last操作符可以使用takeLast(1)替代。

    Observable.just(1, 2, 3)
              .last()
              .subscribe(new Subscriber<Integer>() {
            @Override
            public void onNext(Integer item) {
                System.out.println("Next: " + item);
            }
    
            @Override
            public void onError(Throwable error) {
                System.err.println("Error: " + error.getMessage());
            }
    
            @Override
            public void onCompleted() {
                System.out.println("Sequence complete.");
            }
        });
    

    运行结果如下:
    Next: 3
    Sequence complete.


    ignoreElements操作符

    ignoreElements操作符忽略所有源Observable产生的结果,只把Observable的onCompleted和onError事件通知给订阅者。ignoreElements操作符适用于不太关心Observable产生的结果,只是在Observable结束时(onCompleted)或者出现错误时能够收到通知。

            Observable.just(1,2,3,4,5,6,7,8).ignoreElements()
              .subscribe(new Subscriber<Integer>() {
                  @Override
                  public void onNext(Integer item) {
                      System.out.println("Next: " + item);
                  }
    
                  @Override
                  public void onError(Throwable error) {
                      System.err.println("Error: " + error.getMessage());
                  }
    
                  @Override
                  public void onCompleted() {
                      System.out.println("Sequence complete.");
                  }
              });
    

    运行结果如下:
    Sequence complete.


    sample操作符

    sample操作符定期扫描源Observable产生的结果,在指定的时间间隔范围内对源Observable产生的结果进行采样

    
    Observable.create(new Observable.OnSubscribe<Integer>() {
                @Override
                public void call(Subscriber<? super Integer> subscriber) {
                    if(subscriber.isUnsubscribed()) return;
                    try {
                        //前8个数字产生的时间间隔为1秒,后一个间隔为3秒
                        for (int i = 1; i < 9; i++) {
                            subscriber.onNext(i);
                            Thread.sleep(1000);
                        }
                        Thread.sleep(2000);
                        subscriber.onNext(9);
                        subscriber.onCompleted();
                    } catch(Exception e){
                        subscriber.onError(e);
                    }
                }
            }).subscribeOn(Schedulers.newThread())
              .sample(2200, TimeUnit.MILLISECONDS)  //采样间隔时间为2200毫秒
              .subscribe(new Subscriber<Integer>() {
                  @Override
                  public void onNext(Integer item) {
                      System.out.println("Next: " + item);
                  }
    
                  @Override
                  public void onError(Throwable error) {
                      System.err.println("Error: " + error.getMessage());
                  }
    
                  @Override
                  public void onCompleted() {
                      System.out.println("Sequence complete.");
                  }
              });
    

    运行结果如下:
    Next: 3
    Next: 5
    Next: 7
    Next: 8
    Sequence complete.


    skip操作符

    skip操作符针对源Observable产生的结果,跳过前面n个不进行处理,而把后面的结果提交给订阅者处理

    
    Observable.just(1,2,3,4,5,6,7).skip(3)
              .subscribe(new Subscriber<Integer>() {
                  @Override
                  public void onNext(Integer item) {
                      System.out.println("Next: " + item);
                  }
    
                  @Override
                  public void onError(Throwable error) {
                      System.err.println("Error: " + error.getMessage());
                  }
    
                  @Override
                  public void onCompleted() {
                      System.out.println("Sequence complete.");
                  }
              });
    

    运行结果如下:
    Next: 4
    Next: 5
    Next: 6
    Next: 7
    Sequence complete.


    skipLast操作符

    skipLast操作符针对源Observable产生的结果,忽略Observable最后产生的n个结果,而把前面产生的结果提交给订阅者处理
    值得注意的是,skipLast操作符提交满足条件的结果给订阅者是存在延迟效果的

    
    可以看到skipLast操作符把最后的天蓝色球、蓝色球、紫色球忽略掉了,但是前面的红色球等并不是源Observable一产生就直接提交给订阅者,这里有一个延迟的效果。
    Observable.just(1,2,3,4,5,6,7).skipLast(3)
              .subscribe(new Subscriber<Integer>() {
                  @Override
                  public void onNext(Integer item) {
                      System.out.println("Next: " + item);
                  }
    
                  @Override
                  public void onError(Throwable error) {
                      System.err.println("Error: " + error.getMessage());
                  }
    
                  @Override
                  public void onCompleted() {
                      System.out.println("Sequence complete.");
                  }
              });
    

    运行结果如下:
    Next: 1
    Next: 2
    Next: 3
    Next: 4
    Sequence complete.


    take操作符

    take操作符是把源Observable产生的结果,提取前面的n个提交给订阅者,而忽略后面的结果
    调用例子如下:

    Observable.just(1, 2, 3, 4, 5, 6, 7, 8)
              .take(4)
              .subscribe(new Subscriber<Integer>() {
            @Override
            public void onNext(Integer item) {
                System.out.println("Next: " + item);
            }
    
            @Override
            public void onError(Throwable error) {
                System.err.println("Error: " + error.getMessage());
            }
    
            @Override
            public void onCompleted() {
                System.out.println("Sequence complete.");
            }
        });
    
    

    运行结果如下:
    Next: 1
    Next: 2
    Next: 3
    Next: 4
    Sequence complete.


    takeFirst操作符

    takeFirst操作符类似于take操作符,同时也类似于first操作符,都是获取源Observable产生的结果列表中符合指定条件的前一个或多个,与first操作符不同的是,first操作符如果获取不到数据,则会抛出NoSuchElementException异常,而takeFirst则会返回一个空的Observable,该Observable只有onCompleted通知而没有onNext通知。

    
    Observable.just(1,2,3,4,5,6,7).takeFirst(new Func1<Integer, Boolean>() {
                @Override
                public Boolean call(Integer integer) {
                    //获取数值大于3的数据
                    return integer>3;
                }
            })
              .subscribe(new Subscriber<Integer>() {
                  @Override
                  public void onNext(Integer item) {
                      System.out.println("Next: " + item);
                  }
    
                  @Override
                  public void onError(Throwable error) {
                      System.err.println("Error: " + error.getMessage());
                  }
    
                  @Override
                  public void onCompleted() {
                      System.out.println("Sequence complete.");
                  }
              });
    

    运行结果如下:
    Next: 4
    Sequence complete.


    takeLast操作符

    takeLast操作符是把源Observable产生的结果的后n项提交给订阅者,提交时机是Observable发布onCompleted通知之时

    
    Observable.just(1,2,3,4,5,6,7).takeLast(2)
              .subscribe(new Subscriber<Integer>() {
                  @Override
                  public void onNext(Integer item) {
                      System.out.println("Next: " + item);
                  }
    
                  @Override
                  public void onError(Throwable error) {
                      System.err.println("Error: " + error.getMessage());
                  }
    
                  @Override
                  public void onCompleted() {
                      System.out.println("Sequence complete.");
                  }
              });
    

    运行结果如下:
    Next: 6
    Next: 7
    Sequence complete.


    相关文章

      网友评论

          本文标题:过滤操作符

          本文链接:https://www.haomeiwen.com/subject/ncxmpttx.html