美文网首页RxJava相关
Rxjava3使用教程:操作符-过滤

Rxjava3使用教程:操作符-过滤

作者: Alsan_L3 | 来源:发表于2022-05-05 15:44 被阅读0次

    过滤操作符

    根据应用场景对过滤操作符进行分类,可分成以下4个类别:

    1. 根据"指定条件"过滤事件
    操作符 作用
    filter 过滤 特定条件的事件
    ofType 过滤 特定数据类型的数据
    skip/skipLast 跳过某个事件
    distinct/distinctUntilChanged 过滤事件序列中重复的事件 / 连续重复的事件
    • filter()

      Observable.just("hello", 1, "haha", 2).filter(new Predicate<Serializable>() {
          @Override
          public boolean test(Serializable serializable) throws Throwable {
              return serializable instanceof Integer;
          }
      }).subscribe(observer);
      

      结果:

      2022-05-05 14:59:17.674 1522-1522/com.alsan.rxjavademo D/RxFilterSymbolActivity: onSubscribe:
      2022-05-05 14:59:17.674 1522-1522/com.alsan.rxjavademo D/RxFilterSymbolActivity: onNext:1
      2022-05-05 14:59:17.675 1522-1522/com.alsan.rxjavademo D/RxFilterSymbolActivity: onNext:2
      2022-05-05 14:59:17.675 1522-1522/com.alsan.rxjavademo D/RxFilterSymbolActivity: onComplete
      
    • ofType()

      Observable.just(1,2,"hello").ofType(String.class).subscribe(observer);
      

      结果:

      2022-05-05 14:58:09.137 1522-1522/com.alsan.rxjavademo D/RxFilterSymbolActivity: onSubscribe:
      2022-05-05 14:58:09.137 1522-1522/com.alsan.rxjavademo D/RxFilterSymbolActivity: onNext:hello
      2022-05-05 14:58:09.137 1522-1522/com.alsan.rxjavademo D/RxFilterSymbolActivity: onComplete
      
    • skip()

      Observable.just(1,2,3,4,5).skip(2).subscribe(observer);
      

      结果:

      2022-05-05 15:00:10.151 1522-1522/com.alsan.rxjavademo D/RxFilterSymbolActivity: onSubscribe:
      2022-05-05 15:00:10.151 1522-1522/com.alsan.rxjavademo D/RxFilterSymbolActivity: onNext:3
      2022-05-05 15:00:10.151 1522-1522/com.alsan.rxjavademo D/RxFilterSymbolActivity: onNext:4
      2022-05-05 15:00:10.152 1522-1522/com.alsan.rxjavademo D/RxFilterSymbolActivity: onNext:5
      2022-05-05 15:00:10.152 1522-1522/com.alsan.rxjavademo D/RxFilterSymbolActivity: onComplete
      
    • skipLast()

      Observable.just(1,2,3,4,5).skipLast(3).subscribe(observer);
      

      结果:

      2022-05-05 15:01:03.328 1522-1522/com.alsan.rxjavademo D/RxFilterSymbolActivity: onSubscribe:
      2022-05-05 15:01:03.328 1522-1522/com.alsan.rxjavademo D/RxFilterSymbolActivity: onNext:1
      2022-05-05 15:01:03.328 1522-1522/com.alsan.rxjavademo D/RxFilterSymbolActivity: onNext:2
      2022-05-05 15:01:03.328 1522-1522/com.alsan.rxjavademo D/RxFilterSymbolActivity: onComplete
      
    • distinct()

      //过滤事件序列中重复的事件
      Observable.just(1,2,2,4,4,5,6, 1).distinct().subscribe(observer);
      

      结果:

      2022-05-05 15:01:42.336 1522-1522/com.alsan.rxjavademo D/RxFilterSymbolActivity: onSubscribe:
      2022-05-05 15:01:42.337 1522-1522/com.alsan.rxjavademo D/RxFilterSymbolActivity: onNext:1
      2022-05-05 15:01:42.337 1522-1522/com.alsan.rxjavademo D/RxFilterSymbolActivity: onNext:2
      2022-05-05 15:01:42.337 1522-1522/com.alsan.rxjavademo D/RxFilterSymbolActivity: onNext:4
      2022-05-05 15:01:42.337 1522-1522/com.alsan.rxjavademo D/RxFilterSymbolActivity: onNext:5
      2022-05-05 15:01:42.337 1522-1522/com.alsan.rxjavademo D/RxFilterSymbolActivity: onNext:6
      2022-05-05 15:01:42.337 1522-1522/com.alsan.rxjavademo D/RxFilterSymbolActivity: onComplete
      
    • distinctUntilChanged()

      //过滤事件序列中连续重复的事件
      Observable.just(1,2,2,4,4,5,6, 1).distinctUntilChanged().subscribe(observer);
      

      结果:

      2022-05-05 15:02:07.275 1522-1522/com.alsan.rxjavademo D/RxFilterSymbolActivity: onSubscribe:
      2022-05-05 15:02:07.276 1522-1522/com.alsan.rxjavademo D/RxFilterSymbolActivity: onNext:1
      2022-05-05 15:02:07.276 1522-1522/com.alsan.rxjavademo D/RxFilterSymbolActivity: onNext:2
      2022-05-05 15:02:07.276 1522-1522/com.alsan.rxjavademo D/RxFilterSymbolActivity: onNext:4
      2022-05-05 15:02:07.276 1522-1522/com.alsan.rxjavademo D/RxFilterSymbolActivity: onNext:5
      2022-05-05 15:02:07.276 1522-1522/com.alsan.rxjavademo D/RxFilterSymbolActivity: onNext:6
      2022-05-05 15:02:07.276 1522-1522/com.alsan.rxjavademo D/RxFilterSymbolActivity: onNext:1
      2022-05-05 15:02:07.276 1522-1522/com.alsan.rxjavademo D/RxFilterSymbolActivity: onComplete
      
    2. 根据"指定事件数量"过滤事件
    操作符 作用
    take 指定观察者最多能接收到的事件数量
    takeLast 指定观察者只能接收到被观察者发送的最后几个事件
    • take

      Observable.just(1,2,3,4,5).take(2).subscribe(observer);
      

      结果:

      2022-05-05 15:06:45.219 1522-1522/com.alsan.rxjavademo D/RxFilterSymbolActivity: onSubscribe:
      2022-05-05 15:06:45.219 1522-1522/com.alsan.rxjavademo D/RxFilterSymbolActivity: onNext:1
      2022-05-05 15:06:45.219 1522-1522/com.alsan.rxjavademo D/RxFilterSymbolActivity: onNext:2
      2022-05-05 15:06:45.219 1522-1522/com.alsan.rxjavademo D/RxFilterSymbolActivity: onComplete
      
    • takeLast

      Observable.just(1,2,3,4,5).takeLast(1).subscribe(observer);
      

      结果:

      2022-05-05 15:06:58.362 1522-1522/com.alsan.rxjavademo D/RxFilterSymbolActivity: onSubscribe:
      2022-05-05 15:06:58.363 1522-1522/com.alsan.rxjavademo D/RxFilterSymbolActivity: onNext:5
      2022-05-05 15:06:58.363 1522-1522/com.alsan.rxjavademo D/RxFilterSymbolActivity: onComplete
      
    3. 根据"指定时间"过滤事件
    操作符 作用
    throttleFirst 在某段时间内,只发送该段时间内第1次事件
    throttleLast/sample 在某段时间内,只发送该段时间内最后1次事件
    throttleWithTimeout / debounce 发送数据事件时,若2次发送事件的间隔<指定时间,就会丢弃前一次的数据,直到指定时间内都没有新数据发射时才会发送后一次的数据
    • throttleFirst

      Observable.create(new ObservableOnSubscribe<Integer>() {
          @Override
          public void subscribe(ObservableEmitter<Integer> e) throws Exception {
              // 隔段事件发送时间
              e.onNext(1);
              Thread.sleep(500);
      
              e.onNext(2);
              Thread.sleep(400);
      
              e.onNext(3);
              Thread.sleep(300);
      
              e.onNext(4);
              Thread.sleep(300);
      
              e.onNext(5);
              Thread.sleep(300);
      
              e.onNext(6);
              Thread.sleep(400);
      
              e.onNext(7);
              Thread.sleep(300);
              e.onNext(8);
      
              Thread.sleep(300);
              e.onNext(9);
      
              Thread.sleep(300);
              e.onComplete();
          }
      }).throttleFirst(1, TimeUnit.SECONDS)//每1秒中采用数据
              .subscribe(new Observer<Integer>() {
                  @Override
                  public void onSubscribe(Disposable d) {
                      Log.d(TAG, "开始采用subscribe连接");
                  }
      
                  @Override
                  public void onNext(Integer value) {
                      Log.d(TAG, "接收到了事件"+ value  );
                  }
      
                  @Override
                  public void onError(Throwable e) {
                      Log.d(TAG, "对Error事件作出响应");
                  }
      
                  @Override
                  public void onComplete() {
                      Log.d(TAG, "对Complete事件作出响应");
                  }
              });
      

      结果:

      2022-05-05 15:27:20.009 3744-3744/com.alsan.rxjavademo D/RxFilterSymbolActivity: 开始采用subscribe连接
      2022-05-05 15:27:20.009 3744-3744/com.alsan.rxjavademo D/RxFilterSymbolActivity: 接收到了事件1
      2022-05-05 15:27:21.214 3744-3744/com.alsan.rxjavademo D/RxFilterSymbolActivity: 接收到了事件4
      2022-05-05 15:27:22.218 3744-3744/com.alsan.rxjavademo D/RxFilterSymbolActivity: 接收到了事件7
      2022-05-05 15:27:23.121 3744-3744/com.alsan.rxjavademo D/RxFilterSymbolActivity: 对Complete事件作出响应
      
    • throttleLast/sample

      Observable.create(new ObservableOnSubscribe<Integer>() {
          @Override
          public void subscribe(ObservableEmitter<Integer> e) throws Exception {
              // 隔段事件发送时间
              e.onNext(1);
              Thread.sleep(500);
      
              e.onNext(2);
              Thread.sleep(400);
      
              e.onNext(3);
              Thread.sleep(300);
      
              e.onNext(4);
              Thread.sleep(300);
      
              e.onNext(5);
              Thread.sleep(300);
      
              e.onNext(6);
              Thread.sleep(400);
      
              e.onNext(7);
              Thread.sleep(300);
              e.onNext(8);
      
              Thread.sleep(300);
              e.onNext(9);
      
              Thread.sleep(300);
              e.onComplete();
          }
      }).throttleLast(1, TimeUnit.SECONDS)//每1秒中采用数据
              .subscribe(new Observer<Integer>() {
                  @Override
                  public void onSubscribe(Disposable d) {
                      Log.d(TAG, "开始采用subscribe连接");
                  }
      
                  @Override
                  public void onNext(Integer value) {
                      Log.d(TAG, "接收到了事件"+ value  );
                  }
      
                  @Override
                  public void onError(Throwable e) {
                      Log.d(TAG, "对Error事件作出响应");
                  }
      
                  @Override
                  public void onComplete() {
                      Log.d(TAG, "对Complete事件作出响应");
                  }
              });
      

      结果:

      2022-05-05 15:27:23.124 3744-3744/com.alsan.rxjavademo D/RxFilterSymbolActivity: 开始采用subscribe连接
      2022-05-05 15:27:24.129 3744-3924/com.alsan.rxjavademo D/RxFilterSymbolActivity: 接收到了事件3
      2022-05-05 15:27:25.129 3744-3924/com.alsan.rxjavademo D/RxFilterSymbolActivity: 接收到了事件6
      2022-05-05 15:27:26.129 3744-3924/com.alsan.rxjavademo D/RxFilterSymbolActivity: 接收到了事件9
      2022-05-05 15:27:26.234 3744-3744/com.alsan.rxjavademo D/RxFilterSymbolActivity: 对Complete事件作出响应
      
    • throttleWithTimeout/debounce

      Observable.create(new ObservableOnSubscribe<Integer>() {
                  @Override
                  public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                      // 隔段事件发送时间
                      e.onNext(1);
                      Thread.sleep(500);
                      e.onNext(2); // 1和2之间的间隔小于指定时间1s,所以前1次数据(1)会被抛弃,2会被保留
                      Thread.sleep(1500);  // 因为2和3之间的间隔大于指定时间1s,所以之前被保留的2事件将发出
                      e.onNext(3);
                      Thread.sleep(1500);  // 因为3和4之间的间隔大于指定时间1s,所以3事件将发出
                      e.onNext(4);
                      Thread.sleep(500); // 因为4和5之间的间隔小于指定时间1s,所以前1次数据(4)会被抛弃,5会被保留
                      e.onNext(5);
                      Thread.sleep(500); // 因为5和6之间的间隔小于指定时间1s,所以前1次数据(5)会被抛弃,6会被保留
                      e.onNext(6);
                      Thread.sleep(1500); // 因为6和Complete实践之间的间隔大于指定时间1s,所以之前被保留的6事件将发出
      
                      e.onComplete();
                  }
              }).throttleWithTimeout(1, TimeUnit.SECONDS)//每1秒中采用数据
                      .subscribe(new Observer<Integer>() {
                          @Override
                          public void onSubscribe(Disposable d) {
      
                          }
      
                          @Override
                          public void onNext(Integer value) {
                              Log.d(TAG, "接收到了事件"+ value  );
                          }
      
                          @Override
                          public void onError(Throwable e) {
                              Log.d(TAG, "对Error事件作出响应");
                          }
      
                          @Override
                          public void onComplete() {
                              Log.d(TAG, "对Complete事件作出响应");
                          }
                      });
      

      结果:

      2022-05-05 15:21:14.416 3428-3490/com.alsan.rxjavademo D/RxFilterSymbolActivity: 接收到了事件2
      2022-05-05 15:21:15.916 3428-3490/com.alsan.rxjavademo D/RxFilterSymbolActivity: 接收到了事件3
      2022-05-05 15:21:18.420 3428-3490/com.alsan.rxjavademo D/RxFilterSymbolActivity: 接收到了事件6
      2022-05-05 15:21:18.920 3428-3428/com.alsan.rxjavademo D/RxFilterSymbolActivity: 对Complete事件作出响应
      
    4. 根据"指定事件位置"过滤事件
    操作符 作用
    firstElement 仅选取第1个元素
    lastElement 仅选取最后一个元素
    elementAt 指定接收某个元素(通过 索引值 确定)
    elementAtOrError 在elementAt()的基础上,当出现越界情况(即获取的位置索引 > 发送事件序列长度)时,即抛出异常
    • firstElement

      Observable.just("hello", 1, "haha", 2).firstElement().subscribe((Consumer<Serializable>) serializable -> {
          Log.d(TAG, serializable.toString());
      });
      

      结果:

      2022-05-05 15:38:09.536 3744-3744/com.alsan.rxjavademo D/RxFilterSymbolActivity: hello
      
    • lastElement

      Observable.just("hello", 1, "haha", 2).lastElement().subscribe((Consumer<Serializable>) serializable -> {
        Log.d(TAG, serializable.toString());
      });
      

      结果:

      2022-05-05 15:38:22.242 3744-3744/com.alsan.rxjavademo D/RxFilterSymbolActivity: 2
      
    • elementAt

      Observable.just(1,2,2,4,4,5,6, 1).elementAt(1).subscribe(new Consumer<Integer>() {
          @Override
          public void accept(Integer integer) throws Throwable {
              Log.d(TAG, "accept:" + integer);
          }
      });
      

      结果:

      2022-05-05 15:39:14.759 3744-3744/com.alsan.rxjavademo D/RxFilterSymbolActivity: accept:2
      
    • elementAtOrError

      Observable.just(1,2,3,4,5).elementAtOrError(9).subscribe(new SingleObserver<Integer>() {
          @Override
          public void onSubscribe(@NonNull Disposable d) {
              Log.d(TAG, "onSubscribe...");
          }
      
          @Override
          public void onSuccess(@NonNull Integer integer) {
              Log.d(TAG, "onSuccess..." + integer);
          }
      
          @Override
          public void onError(@NonNull Throwable e) {
              Log.d(TAG, "onError:" + Log.getStackTraceString(e));
          }
      });
      

      结果:

      022-05-05 15:41:16.713 4285-4285/com.alsan.rxjavademo D/RxFilterSymbolActivity: onSubscribe...
      2022-05-05 15:41:16.715 4285-4285/com.alsan.rxjavademo D/RxFilterSymbolActivity: onError:java.util.NoSuchElementException
              at io.reactivex.rxjava3.internal.operators.observable.ObservableElementAtSingle$ElementAtObserver.onComplete(ObservableElementAtSingle.java:115)
              at io.reactivex.rxjava3.internal.operators.observable.ObservableFromArray$FromArrayDisposable.run(ObservableFromArray.java:112)
              at io.reactivex.rxjava3.internal.operators.observable.ObservableFromArray.subscribeActual(ObservableFromArray.java:38)
              at io.reactivex.rxjava3.core.Observable.subscribe(Observable.java:13095)
              at io.reactivex.rxjava3.internal.operators.observable.ObservableElementAtSingle.subscribeActual(ObservableElementAtSingle.java:37)
              at io.reactivex.rxjava3.core.Single.subscribe(Single.java:4813)
              at com.alsan.rxjavademo.RxFilterSymbolActivity.lambda$onCreate$17$RxFilterSymbolActivity(RxFilterSymbolActivity.java:311)
              at com.alsan.rxjavademo.-$$Lambda$RxFilterSymbolActivity$bbHK8cGk2DGgxkmEzj5endgLd8A.onClick(Unknown Source:2)
              at android.view.View.performClick(View.java:6291)
              at com.google.android.material.button.MaterialButton.performClick(MaterialButton.java:967)
              at android.view.View$PerformClick.run(View.java:24931)
              at android.os.Handler.handleCallback(Handler.java:808)
              at android.os.Handler.dispatchMessage(Handler.java:101)
              at android.os.Looper.loop(Looper.java:166)
              at android.app.ActivityThread.main(ActivityThread.java:7529)
              at java.lang.reflect.Method.invoke(Native Method)
              at com.android.internal.os.Zygote$MethodAndArgsCaller.run(Zygote.java:245)
              at com.android.internal.os.ZygoteInit.main(ZygoteInit.java:921)
      

    相关文章

      网友评论

        本文标题:Rxjava3使用教程:操作符-过滤

        本文链接:https://www.haomeiwen.com/subject/fpysyrtx.html