6. RxJava2: Filtering Operators

Author: 0青衣小褂0 | Published: 2019-02-12 17:31

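    1. RxJava2: What Is the Observer Pattern
    2. RxJava2: Creation Operators (Time-Independent)
    3. RxJava2: Creation Operators (Time-Based)
    4. RxJava2: Transformation Operators
    5. RxJava2: Conditional Operators
    6. RxJava2: Filtering Operators
    7. RxJava2: Combining Operators
    8. RxJava2: do Operators
    9. RxJava2: Error Handling
    10. RxJava2: Retry
    11. RxJava2: Thread Switching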

    API
    filter
    ofType
    elementAt
    firstElement / lastElement
    distinct
    distinctUntilChanged
    take / takeLast
    skip / skipLast
    throttleFirst / throttleLast
    throttleWithTimeout / debounce

    filter

    • Emits only the items for which the predicate returns true
    Observable.just(1, 2, 3, 4, 5, 6).filter(new Predicate<Integer>() {
                @Override
                public boolean test(Integer integer) throws Exception {
                    return integer != 4;
                }
            })
                    .subscribe(new Observer<Integer>() {
                        @Override
                        public void onSubscribe(Disposable d) {
    
                        }
    
                        @Override
                        public void onNext(Integer integer) {
                            Log.d(TAG, "integer:" + integer);
                        }
    
                        @Override
                        public void onError(Throwable e) {
                            Log.d(TAG, "onError");
                        }
    
                        @Override
                        public void onComplete() {
                            Log.d(TAG, "onComplete");
                        }
                    });
    

    log

    02-11 17:46:46.206 17513-17513/... D/SplashActivity: integer:1
    02-11 17:46:46.206 17513-17513/... D/SplashActivity: integer:2
    02-11 17:46:46.206 17513-17513/... D/SplashActivity: integer:3
    02-11 17:46:46.206 17513-17513/... D/SplashActivity: integer:5
    02-11 17:46:46.206 17513-17513/... D/SplashActivity: integer:6
    02-11 17:46:46.206 17513-17513/... D/SplashActivity: onComplete
    

    ofType

    • Emits only the items that are of the specified type
    Observable.just(1, "one", 2, "two", 3, "three")
                    .ofType(Integer.class)
                    .subscribe(new Observer<Integer>() {
                        @Override
                        public void onSubscribe(Disposable d) {
    
                        }
    
                        @Override
                        public void onNext(Integer integer) {
                            Log.d(TAG, "integer:" + integer);
                        }
    
                        @Override
                        public void onError(Throwable e) {
                            Log.d(TAG, "onError");
                        }
    
                        @Override
                        public void onComplete() {
                            Log.d(TAG, "onComplete");
                        }
                    });
    

    log

    02-11 17:54:15.436 19570-19570/... D/SplashActivity: integer:1
    02-11 17:54:15.436 19570-19570/... D/SplashActivity: integer:2
    02-11 17:54:15.436 19570-19570/... D/SplashActivity: integer:3
    02-11 17:54:15.436 19570-19570/... D/SplashActivity: onComplete
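Conceptually, ofType is a type check followed by a cast. The sketch below models that on a plain list using only the JDK; it is not RxJava code, and the class and method names (OfTypeSketch, ofType) are made up for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class OfTypeSketch {
    // Hypothetical helper: keeps only the elements that are instances of
    // `type`, cast to that type. Models the observable behavior of ofType.
    public static <T> List<T> ofType(List<?> items, Class<T> type) {
        return items.stream()
                .filter(type::isInstance) // drop everything of another type
                .map(type::cast)          // safe: the filter already checked the type
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Object> mixed = Arrays.asList(1, "one", 2, "two", 3, "three");
        System.out.println(ofType(mixed, Integer.class)); // [1, 2, 3]
        System.out.println(ofType(mixed, String.class));  // [one, two, three]
    }
}
```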
    

    elementAt

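    • Selects a single element
      1. Emits the element at the given zero-based index.
      2. An out-of-range index is allowed; in that case a default value can be supplied and is emitted instead.
    // The out-of-range case: index 5 does not exist, so the default value 10 is emitted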
     Disposable subscribe = Observable.just(1, 2, 3, 4)
                    .elementAt(5,10)
                    .subscribe(integer ->
                            Log.d(TAG, "integer:" + integer));
    

    log

    02-12 09:37:34.093 13153-13153/... D/SplashActivity: integer:10
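RxJava 2 offers three flavors of this operator: elementAt(index) returns a Maybe that is empty when the index is out of range, elementAt(index, defaultItem) returns a Single that falls back to the default, and elementAtOrError(index) signals NoSuchElementException. The following is a minimal plain-Java model of those three outcomes, with Optional standing in for Maybe; the class ElementAtSketch is hypothetical and is not part of RxJava.

```java
import java.util.Arrays;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Optional;

public class ElementAtSketch {
    // Models elementAt(index): empty when the index is past the end.
    public static <T> Optional<T> elementAt(List<T> items, int index) {
        if (index < 0) {
            // RxJava rejects negative indexes up front as well.
            throw new IndexOutOfBoundsException("index >= 0 required but it was " + index);
        }
        return index < items.size() ? Optional.of(items.get(index)) : Optional.empty();
    }

    // Models elementAt(index, defaultItem): falls back to the default.
    public static <T> T elementAt(List<T> items, int index, T defaultItem) {
        return elementAt(items, index).orElse(defaultItem);
    }

    // Models elementAtOrError(index): an out-of-range index is an error.
    public static <T> T elementAtOrError(List<T> items, int index) {
        return elementAt(items, index).orElseThrow(NoSuchElementException::new);
    }

    public static void main(String[] args) {
        List<Integer> source = Arrays.asList(1, 2, 3, 4);
        System.out.println(elementAt(source, 2));     // Optional[3]
        System.out.println(elementAt(source, 5));     // Optional.empty
        System.out.println(elementAt(source, 5, 10)); // 10
    }
}
```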
    

    firstElement / lastElement

    • firstElement
      Emits only the first item
    • lastElement
      Emits only the last item
    Disposable subscribe = Observable.just(1, 2, 3, 4)
    .firstElement()
    .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    Log.d(TAG, "integer:" + integer);
                }
            });
    

    log

    02-12 09:28:26.753 10462-10462/... D/SplashActivity: integer:1
    

    distinct

    • Drops duplicates: an item that has already been emitted is never emitted again
    Observable.just(1, 2, 3, 1, 3)
                    .distinct()
                    .subscribe(new Observer<Integer>() {
                        @Override
                        public void onSubscribe(Disposable d) {
    
                        }
    
                        @Override
                        public void onNext(Integer integer) {
                            Log.d(TAG, "integer:" + integer);
                        }
    
                        @Override
                        public void onError(Throwable e) {
                            Log.d(TAG, "onError");
                        }
    
                        @Override
                        public void onComplete() {
                            Log.d(TAG, "onComplete");
                        }
                    });
    

    log

    02-11 18:00:08.836 20056-20056/... D/SplashActivity: integer:1
    02-11 18:00:08.836 20056-20056/... D/SplashActivity: integer:2
    02-11 18:00:08.836 20056-20056/... D/SplashActivity: integer:3
    02-11 18:00:08.836 20056-20056/... D/SplashActivity: onComplete
    

    distinctUntilChanged

    • Drops duplicates, but only when they are consecutive repeats of the previous item
    Observable.just(1, 2, 3, 1, 3, 3, 4, 4)
                    .distinctUntilChanged()
                    .subscribe(new Observer<Integer>() {
                        @Override
                        public void onSubscribe(Disposable d) {
    
                        }
    
                        @Override
                        public void onNext(Integer integer) {
                            Log.d(TAG, "integer:" + integer);
                        }
    
                        @Override
                        public void onError(Throwable e) {
                            Log.d(TAG, "onError");
                        }
    
                        @Override
                        public void onComplete() {
                            Log.d(TAG, "onComplete");
                        }
                    });
    

    log

    02-11 18:02:59.656 20334-20334/... D/SplashActivity: integer:1
    02-11 18:02:59.656 20334-20334/... D/SplashActivity: integer:2
    02-11 18:02:59.656 20334-20334/... D/SplashActivity: integer:3
    02-11 18:02:59.656 20334-20334/... D/SplashActivity: integer:1
    02-11 18:02:59.656 20334-20334/... D/SplashActivity: integer:3
    02-11 18:02:59.656 20334-20334/... D/SplashActivity: integer:4
    02-11 18:02:59.656 20334-20334/... D/SplashActivity: onComplete
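The difference between the two operators is easiest to see by running both on the same input. The sketch below is a plain-Java model rather than RxJava code, and DistinctSketch is a hypothetical name: distinct remembers every item it has seen, while distinctUntilChanged remembers only the previous one.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class DistinctSketch {
    // Models distinct(): an item passes only the first time it is seen.
    public static <T> List<T> distinct(List<T> items) {
        Set<T> seen = new HashSet<>(); // grows with every new distinct item
        List<T> out = new ArrayList<>();
        for (T item : items) {
            if (seen.add(item)) {
                out.add(item);
            }
        }
        return out;
    }

    // Models distinctUntilChanged(): an item passes unless it equals
    // the item immediately before it.
    public static <T> List<T> distinctUntilChanged(List<T> items) {
        List<T> out = new ArrayList<>();
        T previous = null;
        boolean hasPrevious = false;
        for (T item : items) {
            if (!hasPrevious || !item.equals(previous)) {
                out.add(item);
            }
            previous = item;
            hasPrevious = true;
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> source = Arrays.asList(1, 2, 3, 1, 3, 3, 4, 4);
        System.out.println(distinct(source));             // [1, 2, 3, 4]
        System.out.println(distinctUntilChanged(source)); // [1, 2, 3, 1, 3, 4]
    }
}
```

Because distinct has to keep every value it has seen, its memory use grows on long or unbounded streams; distinctUntilChanged holds only one previous item.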
    

    take / takeLast

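    • take
      Emits at most the first N items. If the source has fewer than N items, it emits all of them and completes normally; no error is raised.
    • takeLast
      Emits at most the last N items, once the source has completed. If the source has fewer than N items, it emits all of them and completes normally; no error is raised.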
    Observable.just(1, 2, 3, 4, 5, 6)
                    .take(2)
                    .subscribe(new Observer<Integer>() {
                        @Override
                        public void onSubscribe(Disposable d) {
    
                        }
    
                        @Override
                        public void onNext(Integer integer) {
                            Log.d(TAG, "integer:" + integer);
                        }
    
                        @Override
                        public void onError(Throwable e) {
                            Log.d(TAG, "onError");
                        }
    
                        @Override
                        public void onComplete() {
                            Log.d(TAG, "onComplete");
                        }
                    });
    

    log

    02-11 18:35:23.196 21766-21766/... D/SplashActivity: integer:1
    02-11 18:35:23.196 21766-21766/... D/SplashActivity: integer:2
    02-11 18:35:23.196 21766-21766/... D/SplashActivity: onComplete
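In RxJava 2, asking take or takeLast for more items than the source holds is not an error: the stream emits what it has and then completes. A minimal plain-Java model of that behavior on a list, assuming a non-negative count (TakeSketch is a hypothetical name, not an RxJava class):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class TakeSketch {
    // Models take(count): the first `count` items, or all of them if fewer exist.
    public static <T> List<T> take(List<T> items, int count) {
        int end = Math.min(count, items.size());
        return new ArrayList<>(items.subList(0, end));
    }

    // Models takeLast(count): the last `count` items, or all of them if fewer exist.
    public static <T> List<T> takeLast(List<T> items, int count) {
        int start = Math.max(0, items.size() - count);
        return new ArrayList<>(items.subList(start, items.size()));
    }

    public static void main(String[] args) {
        List<Integer> source = Arrays.asList(1, 2, 3, 4, 5, 6);
        System.out.println(take(source, 2));      // [1, 2]
        System.out.println(takeLast(source, 2));  // [5, 6]
        System.out.println(take(source, 10));     // [1, 2, 3, 4, 5, 6]
        System.out.println(takeLast(source, 10)); // [1, 2, 3, 4, 5, 6]
    }
}
```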
    

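    skip / skipLast

    • skip
      Skips the first `count` items and emits the rest. If the source has fewer than `count` items, nothing is emitted and the stream completes normally; onError is not called.
    • skipLast
      Skips the last `count` items and emits the rest. If the source has fewer than `count` items, nothing is emitted and the stream completes normally; onError is not called.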
    Observable.just(1, 2, 3, 4).skip(2).subscribe(new Observer<Integer>() {
                @Override
                public void onSubscribe(Disposable d) {
    
                }
    
                @Override
                public void onNext(Integer integer) {
                    Log.d(TAG, "integer:" + integer);
                }
    
                @Override
                public void onError(Throwable e) {
                    Log.d(TAG, "onError");
                }
    
                @Override
                public void onComplete() {
                    Log.d(TAG, "onComplete");
                }
            });
    

    log

    02-12 09:19:47.243 9691-9691/... D/SplashActivity: integer:3
    02-12 09:19:47.243 9691-9691/... D/SplashActivity: integer:4
    02-12 09:19:47.243 9691-9691/... D/SplashActivity: onComplete
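As with take, a count larger than the source is not an error in RxJava 2: skip and skipLast then emit nothing and complete. A minimal plain-Java model on a list, assuming a non-negative count (SkipSketch is a hypothetical name, not an RxJava class):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class SkipSketch {
    // Models skip(count): everything after the first `count` items.
    public static <T> List<T> skip(List<T> items, int count) {
        int start = Math.min(count, items.size());
        return new ArrayList<>(items.subList(start, items.size()));
    }

    // Models skipLast(count): everything before the last `count` items.
    public static <T> List<T> skipLast(List<T> items, int count) {
        int end = Math.max(0, items.size() - count);
        return new ArrayList<>(items.subList(0, end));
    }

    public static void main(String[] args) {
        List<Integer> source = Arrays.asList(1, 2, 3, 4);
        System.out.println(skip(source, 2));     // [3, 4]
        System.out.println(skipLast(source, 1)); // [1, 2, 3]
        System.out.println(skip(source, 10));    // []
    }
}
```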
    

    throttleFirst / throttleLast

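    • throttleFirst
      Emits the first item of each time window: timing starts when an item is emitted, later items are dropped until the specified time has elapsed, and the next item after that is accepted.
    • throttleLast
      Emits the last item received in each time window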
    Observable.create(new ObservableOnSubscribe<Integer>() {
                @Override
                public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                    e.onNext(1);
    
                    Thread.sleep(500);
                    e.onNext(2);
    
                    Thread.sleep(1600);
                    e.onNext(3);
    
                    Thread.sleep(2100);
                    e.onNext(4);
                    e.onComplete();
                }
            }).throttleFirst(2, TimeUnit.SECONDS)
                    .subscribe(new Observer<Integer>() {
                        @Override
                        public void onSubscribe(Disposable d) {
    
                        }
    
                        @Override
                        public void onNext(Integer integer) {
                            Log.d(TAG, "integer:" + integer);
                        }
    
                        @Override
                        public void onError(Throwable e) {
                            Log.d(TAG, "onError");
                        }
    
                        @Override
                        public void onComplete() {
                            Log.d(TAG, "complete");
                        }
                    });
    

    log

    02-12 10:01:16.243 14629-14629/... D/SplashActivity: integer:1
    02-12 10:01:18.343 14629-14629/... D/SplashActivity: integer:3
    02-12 10:01:20.443 14629-14629/... D/SplashActivity: integer:4
    02-12 10:01:20.443 14629-14629/... D/SplashActivity: complete
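The timing rules can be modeled without a scheduler by attaching an arrival time to each item. The sketch below replays the example above (items arriving at 0, 500, 2100 and 4200 ms) through a plain-Java model; the class ThrottleSketch and its Timed type are hypothetical. The throttleLast model assumes that ticks fire at fixed multiples of the window after subscription and that an item still pending when the source completes is dropped, which is my understanding of the default behavior and is not shown in the article.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ThrottleSketch {
    /** One emission: a value plus its arrival time in ms since subscription. */
    public static final class Timed {
        final long timeMs;
        final int value;

        public Timed(long timeMs, int value) {
            this.timeMs = timeMs;
            this.value = value;
        }
    }

    // Models throttleFirst: emit an item, then ignore everything that
    // arrives before windowMs has elapsed since that emission.
    public static List<Integer> throttleFirst(List<Timed> events, long windowMs) {
        List<Integer> out = new ArrayList<>();
        long gateOpensAt = Long.MIN_VALUE;
        for (Timed e : events) {
            if (e.timeMs >= gateOpensAt) {
                out.add(e.value);
                gateOpensAt = e.timeMs + windowMs;
            }
        }
        return out;
    }

    // Models throttleLast: at each tick (windowMs, 2 * windowMs, ...) emit the
    // most recent item received since the previous tick, if there was one.
    // Assumption: an item still pending at completion is not emitted.
    public static List<Integer> throttleLast(List<Timed> events, long windowMs, long completeAtMs) {
        if (windowMs <= 0) {
            throw new IllegalArgumentException("windowMs must be positive");
        }
        List<Integer> out = new ArrayList<>();
        int i = 0;
        for (long tick = windowMs; tick <= completeAtMs; tick += windowMs) {
            Integer latest = null;
            while (i < events.size() && events.get(i).timeMs < tick) {
                latest = events.get(i).value;
                i++;
            }
            if (latest != null) {
                out.add(latest);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Timed> events = Arrays.asList(
                new Timed(0, 1), new Timed(500, 2), new Timed(2100, 3), new Timed(4200, 4));
        System.out.println(throttleFirst(events, 2000));      // [1, 3, 4]
        System.out.println(throttleLast(events, 2000, 4200)); // [2, 3]
    }
}
```

The throttleFirst result matches the log above: 2 arrives 500 ms after 1 and is dropped, while 3 and 4 each arrive more than 2 seconds after the previous emission.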
    

    throttleWithTimeout / debounce

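    • throttleWithTimeout
      Emits an item only if no newer item arrives within the specified interval after it. Items are judged in pairs: if the next item arrives less than the interval after the current one, the current one is dropped; the next item then becomes the current one, and so on. When the source completes, the last pending item is emitted immediately.
    • debounce
      Behaves identically; in RxJava 2, throttleWithTimeout(timeout, unit) simply delegates to debounce(timeout, unit).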
    Observable.create(new ObservableOnSubscribe<Integer>() {
                @Override
                public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                    e.onNext(1);
    
                    Thread.sleep(500);
                    e.onNext(2);
    
                    Thread.sleep(500);
                    e.onNext(3);
    
                    Thread.sleep(500);
                    e.onNext(4);
    
                    Thread.sleep(500);
                    e.onNext(5);
    
                    Thread.sleep(500);
                    e.onNext(6);
                    e.onComplete();
                }
            }).throttleWithTimeout(1, TimeUnit.SECONDS)
                    .subscribe(new Observer<Integer>() {
                        @Override
                        public void onSubscribe(Disposable d) {
    
                        }
    
                        @Override
                        public void onNext(Integer integer) {
                            Log.d(TAG, "integer:" + integer);
                        }
    
                        @Override
                        public void onError(Throwable e) {
                            Log.d(TAG, "onError");
                        }
    
                        @Override
                        public void onComplete() {
                            Log.d(TAG, "onComplete");
                        }
                    });
    
    

    log

    02-12 10:15:56.573 16357-16357/... D/SplashActivity: integer:6
    02-12 10:15:56.573 16357-16357/... D/SplashActivity: onComplete
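The pairwise rule can be checked with a small plain-Java model that works on arrival times instead of a real scheduler. It is a sketch, not RxJava's implementation: DebounceSketch is a hypothetical name, the source is assumed to complete normally, and a gap exactly equal to the timeout is treated as long enough (in a real run that boundary is a race).

```java
import java.util.ArrayList;
import java.util.List;

public class DebounceSketch {
    // Models debounce / throttleWithTimeout. timesMs[i] is the arrival time of
    // values[i]. An item is emitted if the quiet gap before the next item
    // reaches the timeout; the final item is always emitted, either by its
    // timer or by the flush that happens when the source completes.
    public static List<Integer> debounce(long[] timesMs, int[] values, long timeoutMs) {
        List<Integer> out = new ArrayList<>();
        for (int i = 0; i < values.length; i++) {
            boolean isLast = i == values.length - 1;
            if (isLast || timesMs[i + 1] - timesMs[i] >= timeoutMs) {
                out.add(values[i]);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // The example above: six items, 500 ms apart, timeout of 1 second.
        long[] times = {0, 500, 1000, 1500, 2000, 2500};
        int[] values = {1, 2, 3, 4, 5, 6};
        System.out.println(debounce(times, values, 1000)); // [6]

        // A 1500 ms pause after the first item lets it through.
        System.out.println(debounce(new long[] {0, 1500, 1600}, new int[] {1, 2, 3}, 1000)); // [1, 3]
    }
}
```

This is why the log above shows only 6: every earlier item is followed by another one within 500 ms, so each is replaced before its 1-second timer can fire.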
    
    

    Article link: https://www.haomeiwen.com/subject/mgvkeqtx.html