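Filtering Operators
Filtering operators can be grouped by use case into the following four categories:
1. Filtering events by a specified condition
Operator | Purpose |
---|---|
filter | Passes only the events that satisfy a given condition |
ofType | Passes only the events of a given data type |
skip/skipLast | Skips the first n events / the last n events |
distinct/distinctUntilChanged | Drops duplicate events anywhere in the sequence / only consecutive duplicate events |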
- filter()
```java
// `observer` is an Observer defined elsewhere in the activity; it logs each callback.
Observable.just("hello", 1, "haha", 2)
        .filter(new Predicate<Serializable>() {
            @Override
            public boolean test(Serializable serializable) throws Throwable {
                // Keep only the Integer items.
                return serializable instanceof Integer;
            }
        })
        .subscribe(observer);
```
Result:
```
2022-05-05 14:59:17.674 1522-1522/com.alsan.rxjavademo D/RxFilterSymbolActivity: onSubscribe:
2022-05-05 14:59:17.674 1522-1522/com.alsan.rxjavademo D/RxFilterSymbolActivity: onNext:1
2022-05-05 14:59:17.675 1522-1522/com.alsan.rxjavademo D/RxFilterSymbolActivity: onNext:2
2022-05-05 14:59:17.675 1522-1522/com.alsan.rxjavademo D/RxFilterSymbolActivity: onComplete
```
- ofType()
```java
// Keep only the String items.
Observable.just(1, 2, "hello")
        .ofType(String.class)
        .subscribe(observer);
```
Result:
```
2022-05-05 14:58:09.137 1522-1522/com.alsan.rxjavademo D/RxFilterSymbolActivity: onSubscribe:
2022-05-05 14:58:09.137 1522-1522/com.alsan.rxjavademo D/RxFilterSymbolActivity: onNext:hello
2022-05-05 14:58:09.137 1522-1522/com.alsan.rxjavademo D/RxFilterSymbolActivity: onComplete
```
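One way to read ofType is as a filter on the runtime type followed by a cast. The following is a minimal plain-Java sketch of that idea using `java.util.stream`, not RxJava's own implementation; the class and method names (`OfTypeSketch`, `ofType`) are made up for illustration.

```java
import java.util.List;
import java.util.stream.Collectors;

final class OfTypeSketch {
    // Keep only the elements that are instances of `type`, cast to that type.
    static <T> List<T> ofType(List<?> source, Class<T> type) {
        return source.stream()
                .filter(type::isInstance) // same check as `x instanceof T`
                .map(type::cast)          // safe: the filter already checked the type
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Object> events = List.of(1, 2, "hello");
        System.out.println(ofType(events, String.class));  // [hello]
        System.out.println(ofType(events, Integer.class)); // [1, 2]
    }
}
```

Seen this way, the filter() example above with `instanceof Integer` and an ofType(Integer.class) call select the same items; ofType additionally narrows the static type.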
- skip()
```java
// Skip the first 2 events.
Observable.just(1, 2, 3, 4, 5)
        .skip(2)
        .subscribe(observer);
```
Result:
```
2022-05-05 15:00:10.151 1522-1522/com.alsan.rxjavademo D/RxFilterSymbolActivity: onSubscribe:
2022-05-05 15:00:10.151 1522-1522/com.alsan.rxjavademo D/RxFilterSymbolActivity: onNext:3
2022-05-05 15:00:10.151 1522-1522/com.alsan.rxjavademo D/RxFilterSymbolActivity: onNext:4
2022-05-05 15:00:10.152 1522-1522/com.alsan.rxjavademo D/RxFilterSymbolActivity: onNext:5
2022-05-05 15:00:10.152 1522-1522/com.alsan.rxjavademo D/RxFilterSymbolActivity: onComplete
```
- skipLast()
```java
// Skip the last 3 events.
Observable.just(1, 2, 3, 4, 5)
        .skipLast(3)
        .subscribe(observer);
```
Result:
```
2022-05-05 15:01:03.328 1522-1522/com.alsan.rxjavademo D/RxFilterSymbolActivity: onSubscribe:
2022-05-05 15:01:03.328 1522-1522/com.alsan.rxjavademo D/RxFilterSymbolActivity: onNext:1
2022-05-05 15:01:03.328 1522-1522/com.alsan.rxjavademo D/RxFilterSymbolActivity: onNext:2
2022-05-05 15:01:03.328 1522-1522/com.alsan.rxjavademo D/RxFilterSymbolActivity: onComplete
```
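skip(n) only needs a counter, but skipLast(n) cannot know which events are the last ones until the source completes. One way to handle that is to hold back the newest n events and release an event only once n later events have arrived behind it. The sketch below models both on plain lists; it is an illustration under that assumption, not RxJava's code, and `SkipSketch` and its methods are hypothetical names.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

final class SkipSketch {
    // skip(n): ignore the first n events, pass the rest through.
    static <T> List<T> skip(List<T> source, int n) {
        List<T> out = new ArrayList<>();
        int seen = 0;
        for (T item : source) {
            if (seen++ >= n) {
                out.add(item);
            }
        }
        return out;
    }

    // skipLast(n): hold back the newest n events; an event is released
    // only after n later events have arrived behind it.
    static <T> List<T> skipLast(List<T> source, int n) {
        List<T> out = new ArrayList<>();
        Deque<T> held = new ArrayDeque<>();
        for (T item : source) {
            held.addLast(item);
            if (held.size() > n) {
                out.add(held.removeFirst());
            }
        }
        return out; // events still held when the source ends are dropped
    }

    public static void main(String[] args) {
        List<Integer> events = List.of(1, 2, 3, 4, 5);
        System.out.println(skip(events, 2));     // [3, 4, 5]
        System.out.println(skipLast(events, 3)); // [1, 2]
    }
}
```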
- distinct()
```java
// Drop events that have already appeared anywhere earlier in the sequence.
Observable.just(1, 2, 2, 4, 4, 5, 6, 1)
        .distinct()
        .subscribe(observer);
```
Result:
```
2022-05-05 15:01:42.336 1522-1522/com.alsan.rxjavademo D/RxFilterSymbolActivity: onSubscribe:
2022-05-05 15:01:42.337 1522-1522/com.alsan.rxjavademo D/RxFilterSymbolActivity: onNext:1
2022-05-05 15:01:42.337 1522-1522/com.alsan.rxjavademo D/RxFilterSymbolActivity: onNext:2
2022-05-05 15:01:42.337 1522-1522/com.alsan.rxjavademo D/RxFilterSymbolActivity: onNext:4
2022-05-05 15:01:42.337 1522-1522/com.alsan.rxjavademo D/RxFilterSymbolActivity: onNext:5
2022-05-05 15:01:42.337 1522-1522/com.alsan.rxjavademo D/RxFilterSymbolActivity: onNext:6
2022-05-05 15:01:42.337 1522-1522/com.alsan.rxjavademo D/RxFilterSymbolActivity: onComplete
```
-
distinctUntilChanged()
```java
// Drop an event only when it equals the event immediately before it.
Observable.just(1, 2, 2, 4, 4, 5, 6, 1)
        .distinctUntilChanged()
        .subscribe(observer);
```
Result:
```
2022-05-05 15:02:07.275 1522-1522/com.alsan.rxjavademo D/RxFilterSymbolActivity: onSubscribe:
2022-05-05 15:02:07.276 1522-1522/com.alsan.rxjavademo D/RxFilterSymbolActivity: onNext:1
2022-05-05 15:02:07.276 1522-1522/com.alsan.rxjavademo D/RxFilterSymbolActivity: onNext:2
2022-05-05 15:02:07.276 1522-1522/com.alsan.rxjavademo D/RxFilterSymbolActivity: onNext:4
2022-05-05 15:02:07.276 1522-1522/com.alsan.rxjavademo D/RxFilterSymbolActivity: onNext:5
2022-05-05 15:02:07.276 1522-1522/com.alsan.rxjavademo D/RxFilterSymbolActivity: onNext:6
2022-05-05 15:02:07.276 1522-1522/com.alsan.rxjavademo D/RxFilterSymbolActivity: onNext:1
2022-05-05 15:02:07.276 1522-1522/com.alsan.rxjavademo D/RxFilterSymbolActivity: onComplete
```
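The two results differ only in the trailing 1: distinct() has to remember every value seen so far, while distinctUntilChanged() only needs the previous value. A minimal plain-Java sketch of that difference follows; it is a model of the behavior rather than RxJava's implementation, and `DistinctSketch` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;

final class DistinctSketch {
    // distinct: remember every value seen so far and drop any repeat.
    static <T> List<T> distinct(List<T> source) {
        Set<T> seen = new HashSet<>();
        List<T> out = new ArrayList<>();
        for (T item : source) {
            if (seen.add(item)) { // add() returns false for a value already seen
                out.add(item);
            }
        }
        return out;
    }

    // distinctUntilChanged: remember only the previous value.
    static <T> List<T> distinctUntilChanged(List<T> source) {
        List<T> out = new ArrayList<>();
        boolean first = true;
        T previous = null;
        for (T item : source) {
            if (first || !Objects.equals(previous, item)) {
                out.add(item);
            }
            previous = item;
            first = false;
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> events = List.of(1, 2, 2, 4, 4, 5, 6, 1);
        System.out.println(distinct(events));             // [1, 2, 4, 5, 6]
        System.out.println(distinctUntilChanged(events)); // [1, 2, 4, 5, 6, 1]
    }
}
```

The memory cost follows directly from this: the first variant grows with the number of distinct values, the second stays constant.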
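2. Filtering events by a specified number of events
Operator | Purpose |
---|---|
take | Caps the number of events the observer can receive |
takeLast | Delivers only the last n events emitted by the observable |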
- take
```java
// Receive at most the first 2 events.
Observable.just(1, 2, 3, 4, 5)
        .take(2)
        .subscribe(observer);
```
Result:
```
2022-05-05 15:06:45.219 1522-1522/com.alsan.rxjavademo D/RxFilterSymbolActivity: onSubscribe:
2022-05-05 15:06:45.219 1522-1522/com.alsan.rxjavademo D/RxFilterSymbolActivity: onNext:1
2022-05-05 15:06:45.219 1522-1522/com.alsan.rxjavademo D/RxFilterSymbolActivity: onNext:2
2022-05-05 15:06:45.219 1522-1522/com.alsan.rxjavademo D/RxFilterSymbolActivity: onComplete
```
- takeLast
```java
// Receive only the last event.
Observable.just(1, 2, 3, 4, 5)
        .takeLast(1)
        .subscribe(observer);
```
Result:
```
2022-05-05 15:06:58.362 1522-1522/com.alsan.rxjavademo D/RxFilterSymbolActivity: onSubscribe:
2022-05-05 15:06:58.363 1522-1522/com.alsan.rxjavademo D/RxFilterSymbolActivity: onNext:5
2022-05-05 15:06:58.363 1522-1522/com.alsan.rxjavademo D/RxFilterSymbolActivity: onComplete
```
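take(n) can stop as soon as n events have been delivered, whereas takeLast(n) has to wait for the source to finish before it knows which events are last. A bounded buffer that keeps only the newest n events is one way to model that. The sketch below works on plain lists and is an assumption about the general technique, not RxJava's code; `TakeSketch` is a hypothetical name.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

final class TakeSketch {
    // take(n): deliver events until n have been delivered, then stop.
    static <T> List<T> take(List<T> source, int n) {
        List<T> out = new ArrayList<>();
        for (T item : source) {
            if (out.size() >= n) {
                break; // the real operator would complete early here
            }
            out.add(item);
        }
        return out;
    }

    // takeLast(n): keep only the newest n events while the source runs,
    // and deliver them once the source has finished.
    static <T> List<T> takeLast(List<T> source, int n) {
        Deque<T> newest = new ArrayDeque<>();
        for (T item : source) {
            newest.addLast(item);
            if (newest.size() > n) {
                newest.removeFirst(); // evict the oldest buffered event
            }
        }
        return new ArrayList<>(newest);
    }

    public static void main(String[] args) {
        List<Integer> events = List.of(1, 2, 3, 4, 5);
        System.out.println(take(events, 2));     // [1, 2]
        System.out.println(takeLast(events, 1)); // [5]
    }
}
```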
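3. Filtering events by a specified time
Operator | Purpose |
---|---|
throttleFirst | Within each time window, emits only the first event of that window |
throttleLast/sample | Within each time window, emits only the last event of that window |
throttleWithTimeout / debounce | If a new event arrives less than the specified time after the previous one, the previous event is dropped; an event is emitted only once the specified time has passed with no newer event |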
- throttleFirst
```java
Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(ObservableEmitter<Integer> e) throws Exception {
        // Emit events with a pause after each one
        e.onNext(1);
        Thread.sleep(500);
        e.onNext(2);
        Thread.sleep(400);
        e.onNext(3);
        Thread.sleep(300);
        e.onNext(4);
        Thread.sleep(300);
        e.onNext(5);
        Thread.sleep(300);
        e.onNext(6);
        Thread.sleep(400);
        e.onNext(7);
        Thread.sleep(300);
        e.onNext(8);
        Thread.sleep(300);
        e.onNext(9);
        Thread.sleep(300);
        e.onComplete();
    }
}).throttleFirst(1, TimeUnit.SECONDS) // keep only the first event of each 1-second window
        .subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, "Subscribed");
            }

            @Override
            public void onNext(Integer value) {
                Log.d(TAG, "Received event " + value);
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "Handling Error event");
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "Handling Complete event");
            }
        });
```
Result:
```
2022-05-05 15:27:20.009 3744-3744/com.alsan.rxjavademo D/RxFilterSymbolActivity: Subscribed
2022-05-05 15:27:20.009 3744-3744/com.alsan.rxjavademo D/RxFilterSymbolActivity: Received event 1
2022-05-05 15:27:21.214 3744-3744/com.alsan.rxjavademo D/RxFilterSymbolActivity: Received event 4
2022-05-05 15:27:22.218 3744-3744/com.alsan.rxjavademo D/RxFilterSymbolActivity: Received event 7
2022-05-05 15:27:23.121 3744-3744/com.alsan.rxjavademo D/RxFilterSymbolActivity: Handling Complete event
```
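To see why 1, 4 and 7 are the survivors, it helps to add up the sleeps: the nine events are emitted at roughly 0, 500, 900, 1200, 1500, 1800, 2200, 2500 and 2800 ms. The sketch below replays those nominal timestamps through a simple "open a 1-second window on each delivered event" rule. It is a plain-Java simulation, not RxJava's implementation, and `ThrottleFirstSketch` is a hypothetical name. Event 7 lands exactly on the nominal window boundary; the sketch assumes such an event passes, which matches the log above, where 7 arrives about 1004 ms after 4.

```java
import java.util.ArrayList;
import java.util.List;

final class ThrottleFirstSketch {
    // timesMs[i] is the nominal emission time of values[i], in ms since subscription.
    static List<Integer> throttleFirst(long[] timesMs, int[] values, long windowMs) {
        List<Integer> out = new ArrayList<>();
        long windowEnd = Long.MIN_VALUE; // no window is open yet
        for (int i = 0; i < values.length; i++) {
            if (timesMs[i] >= windowEnd) {
                out.add(values[i]);                // first event after the window closed
                windowEnd = timesMs[i] + windowMs; // open a new window from this event
            }
            // otherwise the event falls inside an open window and is dropped
        }
        return out;
    }

    public static void main(String[] args) {
        long[] timesMs = {0, 500, 900, 1200, 1500, 1800, 2200, 2500, 2800};
        int[] values = {1, 2, 3, 4, 5, 6, 7, 8, 9};
        System.out.println(throttleFirst(timesMs, values, 1000)); // [1, 4, 7]
    }
}
```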
- throttleLast/sample
```java
Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(ObservableEmitter<Integer> e) throws Exception {
        // Emit events with a pause after each one
        e.onNext(1);
        Thread.sleep(500);
        e.onNext(2);
        Thread.sleep(400);
        e.onNext(3);
        Thread.sleep(300);
        e.onNext(4);
        Thread.sleep(300);
        e.onNext(5);
        Thread.sleep(300);
        e.onNext(6);
        Thread.sleep(400);
        e.onNext(7);
        Thread.sleep(300);
        e.onNext(8);
        Thread.sleep(300);
        e.onNext(9);
        Thread.sleep(300);
        e.onComplete();
    }
}).throttleLast(1, TimeUnit.SECONDS) // keep only the last event of each 1-second window
        .subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, "Subscribed");
            }

            @Override
            public void onNext(Integer value) {
                Log.d(TAG, "Received event " + value);
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "Handling Error event");
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "Handling Complete event");
            }
        });
```
Result:
```
2022-05-05 15:27:23.124 3744-3744/com.alsan.rxjavademo D/RxFilterSymbolActivity: Subscribed
2022-05-05 15:27:24.129 3744-3924/com.alsan.rxjavademo D/RxFilterSymbolActivity: Received event 3
2022-05-05 15:27:25.129 3744-3924/com.alsan.rxjavademo D/RxFilterSymbolActivity: Received event 6
2022-05-05 15:27:26.129 3744-3924/com.alsan.rxjavademo D/RxFilterSymbolActivity: Received event 9
2022-05-05 15:27:26.234 3744-3744/com.alsan.rxjavademo D/RxFilterSymbolActivity: Handling Complete event
```
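The same nominal timestamps (0, 500, 900, 1200, 1500, 1800, 2200, 2500, 2800 ms, completion at about 3100 ms) explain this output too: a tick fires every second and delivers whatever arrived most recently since the previous tick. The following plain-Java simulation sketches that rule. It is not RxJava's implementation, `ThrottleLastSketch` is a hypothetical name, and it assumes that events arriving after the final tick are discarded.

```java
import java.util.ArrayList;
import java.util.List;

final class ThrottleLastSketch {
    // timesMs[i] is the nominal emission time of values[i]; completeMs is when the source ends.
    static List<Integer> throttleLast(long[] timesMs, int[] values, long periodMs, long completeMs) {
        List<Integer> out = new ArrayList<>();
        int next = 0; // index of the first event not yet consumed by a tick
        for (long tick = periodMs; tick <= completeMs; tick += periodMs) {
            Integer latest = null;
            while (next < values.length && timesMs[next] < tick) {
                latest = values[next]; // a newer event replaces the older one
                next++;
            }
            if (latest != null) {
                out.add(latest); // deliver the newest event seen in this period
            }
        }
        return out;
    }

    public static void main(String[] args) {
        long[] timesMs = {0, 500, 900, 1200, 1500, 1800, 2200, 2500, 2800};
        int[] values = {1, 2, 3, 4, 5, 6, 7, 8, 9};
        System.out.println(throttleLast(timesMs, values, 1000, 3100)); // [3, 6, 9]
    }
}
```

The ticks at 1000, 2000 and 3000 ms line up with the one-second spacing of the three "Received event" entries in the log.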
- throttleWithTimeout/debounce
```java
Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(ObservableEmitter<Integer> e) throws Exception {
        // Emit events with a pause after each one
        e.onNext(1);
        Thread.sleep(500);
        e.onNext(2); // 1 and 2 are less than the 1 s timeout apart, so 1 is dropped and 2 is held
        Thread.sleep(1500); // 2 and 3 are more than 1 s apart, so the held 2 is emitted
        e.onNext(3);
        Thread.sleep(1500); // 3 and 4 are more than 1 s apart, so 3 is emitted
        e.onNext(4);
        Thread.sleep(500); // 4 and 5 are less than 1 s apart, so 4 is dropped and 5 is held
        e.onNext(5);
        Thread.sleep(500); // 5 and 6 are less than 1 s apart, so 5 is dropped and 6 is held
        e.onNext(6);
        Thread.sleep(1500); // 6 and the Complete event are more than 1 s apart, so the held 6 is emitted
        e.onComplete();
    }
}).throttleWithTimeout(1, TimeUnit.SECONDS) // emit an event only after 1 second with no newer event
        .subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
            }

            @Override
            public void onNext(Integer value) {
                Log.d(TAG, "Received event " + value);
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "Handling Error event");
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "Handling Complete event");
            }
        });
```
Result:
```
2022-05-05 15:21:14.416 3428-3490/com.alsan.rxjavademo D/RxFilterSymbolActivity: Received event 2
2022-05-05 15:21:15.916 3428-3490/com.alsan.rxjavademo D/RxFilterSymbolActivity: Received event 3
2022-05-05 15:21:18.420 3428-3490/com.alsan.rxjavademo D/RxFilterSymbolActivity: Received event 6
2022-05-05 15:21:18.920 3428-3428/com.alsan.rxjavademo D/RxFilterSymbolActivity: Handling Complete event
```
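Adding up the sleeps, the six events are emitted at roughly 0, 500, 2000, 3500, 4000 and 4500 ms. An event survives debouncing only if the next event is at least the timeout away. The plain-Java simulation below applies that rule to the nominal timestamps. It is a sketch of the behavior rather than RxJava's code, `DebounceSketch` is a hypothetical name, and it assumes both that a gap exactly equal to the timeout lets the event through and that the event still pending when the source ends is delivered.

```java
import java.util.ArrayList;
import java.util.List;

final class DebounceSketch {
    // timesMs[i] is the nominal emission time of values[i], in ms since subscription.
    static List<Integer> debounce(long[] timesMs, int[] values, long timeoutMs) {
        List<Integer> out = new ArrayList<>();
        for (int i = 0; i < values.length; i++) {
            boolean isLast = (i + 1 == values.length);
            // An event survives if no newer event arrives within the timeout.
            // Assumption: the final pending event is always delivered.
            if (isLast || timesMs[i + 1] - timesMs[i] >= timeoutMs) {
                out.add(values[i]);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        long[] timesMs = {0, 500, 2000, 3500, 4000, 4500};
        int[] values = {1, 2, 3, 4, 5, 6};
        System.out.println(debounce(timesMs, values, 1000)); // [2, 3, 6]
    }
}
```

The delivery times in the log fit the same arithmetic: 2 is delivered 1 s after it was emitted (at about 1500 ms), 3 at about 3000 ms, and 6 at about 5500 ms, half a second before completion.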
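4. Filtering events by a specified position
Operator | Purpose |
---|---|
firstElement | Takes only the first element |
lastElement | Takes only the last element |
elementAt | Takes only the element at a given index (0-based) |
elementAtOrError | Like elementAt(), but when the index is out of range (index ≥ number of emitted events) it signals an error (NoSuchElementException) through onError |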
- firstElement
```java
// Take only the first element.
Observable.just("hello", 1, "haha", 2)
        .firstElement()
        .subscribe((Consumer<Serializable>) serializable -> {
            Log.d(TAG, serializable.toString());
        });
```
Result:
```
2022-05-05 15:38:09.536 3744-3744/com.alsan.rxjavademo D/RxFilterSymbolActivity: hello
```
- lastElement
```java
// Take only the last element.
Observable.just("hello", 1, "haha", 2)
        .lastElement()
        .subscribe((Consumer<Serializable>) serializable -> {
            Log.d(TAG, serializable.toString());
        });
```
Result:
```
2022-05-05 15:38:22.242 3744-3744/com.alsan.rxjavademo D/RxFilterSymbolActivity: 2
```
- elementAt
```java
// Take the element at index 1 (the second element).
Observable.just(1, 2, 2, 4, 4, 5, 6, 1)
        .elementAt(1)
        .subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Throwable {
                Log.d(TAG, "accept:" + integer);
            }
        });
```
Result:
```
2022-05-05 15:39:14.759 3744-3744/com.alsan.rxjavademo D/RxFilterSymbolActivity: accept:2
```
- elementAtOrError
```java
// Index 9 is out of range for a 5-element sequence, so onError is called.
Observable.just(1, 2, 3, 4, 5)
        .elementAtOrError(9)
        .subscribe(new SingleObserver<Integer>() {
            @Override
            public void onSubscribe(@NonNull Disposable d) {
                Log.d(TAG, "onSubscribe...");
            }

            @Override
            public void onSuccess(@NonNull Integer integer) {
                Log.d(TAG, "onSuccess..." + integer);
            }

            @Override
            public void onError(@NonNull Throwable e) {
                Log.d(TAG, "onError:" + Log.getStackTraceString(e));
            }
        });
```
Result:
```
2022-05-05 15:41:16.713 4285-4285/com.alsan.rxjavademo D/RxFilterSymbolActivity: onSubscribe...
2022-05-05 15:41:16.715 4285-4285/com.alsan.rxjavademo D/RxFilterSymbolActivity: onError:java.util.NoSuchElementException
    at io.reactivex.rxjava3.internal.operators.observable.ObservableElementAtSingle$ElementAtObserver.onComplete(ObservableElementAtSingle.java:115)
    at io.reactivex.rxjava3.internal.operators.observable.ObservableFromArray$FromArrayDisposable.run(ObservableFromArray.java:112)
    at io.reactivex.rxjava3.internal.operators.observable.ObservableFromArray.subscribeActual(ObservableFromArray.java:38)
    at io.reactivex.rxjava3.core.Observable.subscribe(Observable.java:13095)
    at io.reactivex.rxjava3.internal.operators.observable.ObservableElementAtSingle.subscribeActual(ObservableElementAtSingle.java:37)
    at io.reactivex.rxjava3.core.Single.subscribe(Single.java:4813)
    at com.alsan.rxjavademo.RxFilterSymbolActivity.lambda$onCreate$17$RxFilterSymbolActivity(RxFilterSymbolActivity.java:311)
    at com.alsan.rxjavademo.-$$Lambda$RxFilterSymbolActivity$bbHK8cGk2DGgxkmEzj5endgLd8A.onClick(Unknown Source:2)
    at android.view.View.performClick(View.java:6291)
    at com.google.android.material.button.MaterialButton.performClick(MaterialButton.java:967)
    at android.view.View$PerformClick.run(View.java:24931)
    at android.os.Handler.handleCallback(Handler.java:808)
    at android.os.Handler.dispatchMessage(Handler.java:101)
    at android.os.Looper.loop(Looper.java:166)
    at android.app.ActivityThread.main(ActivityThread.java:7529)
    at java.lang.reflect.Method.invoke(Native Method)
    at com.android.internal.os.Zygote$MethodAndArgsCaller.run(Zygote.java:245)
    at com.android.internal.os.ZygoteInit.main(ZygoteInit.java:921)
```
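The difference between the two operators comes down to what happens when the index is past the end: elementAt produces no element, while elementAtOrError fails. A rough plain-Java model of that contrast is sketched below, with `Optional` standing in for the "maybe one element" result. It is an illustration only, not RxJava's implementation; `ElementAtSketch` is a hypothetical name, and the sketch assumes a non-negative index.

```java
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Optional;

final class ElementAtSketch {
    // elementAt(index): an empty result when the index is past the end.
    // Assumes index >= 0.
    static <T> Optional<T> elementAt(List<T> source, int index) {
        return index < source.size()
                ? Optional.of(source.get(index))
                : Optional.empty();
    }

    // elementAtOrError(index): same lookup, but an out-of-range index
    // fails with NoSuchElementException instead of yielding nothing.
    static <T> T elementAtOrError(List<T> source, int index) {
        return elementAt(source, index).orElseThrow(NoSuchElementException::new);
    }

    public static void main(String[] args) {
        System.out.println(elementAt(List.of(1, 2, 2, 4, 4, 5, 6, 1), 1)); // Optional[2]
        System.out.println(elementAt(List.of(1, 2, 3, 4, 5), 9));          // Optional.empty
        try {
            elementAtOrError(List.of(1, 2, 3, 4, 5), 9);
        } catch (NoSuchElementException e) {
            System.out.println("out of range"); // reached: index 9 >= 5 elements
        }
    }
}
```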