RxJava2 入门详细笔记(2)

作者: 业志陈 | 来源:发表于2018-10-07 01:07 被阅读15次

    六、过滤操作符

    6.1、filter()

    通过一定逻辑来过滤被观察者发送的事件,如果返回 true 则会发送事件,否则不会发送

            Observable.just(1, 2, 3, 4).filter(new Predicate<Integer>() {
                @Override
                public boolean test(Integer integer) throws Exception {
                    return integer % 2 == 0;
                }
            }).subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    Log.e(TAG, "accept : " + integer);
                }
            });
    
    10-06 07:57:48.196 12753-12753/? E/MainActivity: accept : 2
    10-06 07:57:48.196 12753-12753/? E/MainActivity: accept : 4
    

    6.2、ofType()

    过滤不符合该类型的事件

            Observable.just(1, 2, "Hi", 3, 4, "Hello").ofType(Integer.class).subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    Log.e(TAG, "accept : " + integer);
                }
            });
    
    10-06 07:59:41.265 12857-12857/leavesc.hello.rxjavademo E/MainActivity: accept : 1
    10-06 07:59:41.265 12857-12857/leavesc.hello.rxjavademo E/MainActivity: accept : 2
    10-06 07:59:41.265 12857-12857/leavesc.hello.rxjavademo E/MainActivity: accept : 3
    10-06 07:59:41.265 12857-12857/leavesc.hello.rxjavademo E/MainActivity: accept : 4
    

    6.3、skip()

    以正序跳过指定数量的事件

            Observable.just(1, 2, 3, 4).skip(2).subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    Log.e(TAG, "accept : " + integer);
                }
            });
    
    10-06 08:01:09.183 12971-12971/leavesc.hello.rxjavademo E/MainActivity: accept : 3
    10-06 08:01:09.183 12971-12971/leavesc.hello.rxjavademo E/MainActivity: accept : 4
    

    6.4、skipLast()

    以反序跳过指定数量的事件

            Observable.just(1, 2, 3, 4).skipLast(2).subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    Log.e(TAG, "accept : " + integer);
                }
            });
    
    10-06 08:02:00.753 13079-13079/leavesc.hello.rxjavademo E/MainActivity: accept : 1
    10-06 08:02:00.753 13079-13079/leavesc.hello.rxjavademo E/MainActivity: accept : 2
    

    6.5、distinct()

    过滤事件序列中的重复事件

            Observable.just(1, 2, 1, 2, 3, 4, 3).distinct().subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    Log.e(TAG, "accept : " + integer);
                }
            });
    
    10-06 08:03:27.402 13189-13189/leavesc.hello.rxjavademo E/MainActivity: accept : 1
    10-06 08:03:27.402 13189-13189/leavesc.hello.rxjavademo E/MainActivity: accept : 2
    10-06 08:03:27.402 13189-13189/leavesc.hello.rxjavademo E/MainActivity: accept : 3
    10-06 08:03:27.402 13189-13189/leavesc.hello.rxjavademo E/MainActivity: accept : 4
    

    6.6、distinctUntilChanged()

    过滤掉连续重复的事件

            Observable.just(1, 2, 2, 1, 3, 4, 3, 3).distinctUntilChanged().subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    Log.e(TAG, "accept : " + integer);
                }
            });
    
    10-06 08:04:44.531 13294-13294/leavesc.hello.rxjavademo E/MainActivity: accept : 1
    10-06 08:04:44.541 13294-13294/leavesc.hello.rxjavademo E/MainActivity: accept : 2
    10-06 08:04:44.541 13294-13294/leavesc.hello.rxjavademo E/MainActivity: accept : 1
    10-06 08:04:44.541 13294-13294/leavesc.hello.rxjavademo E/MainActivity: accept : 3
    10-06 08:04:44.541 13294-13294/leavesc.hello.rxjavademo E/MainActivity: accept : 4
    10-06 08:04:44.541 13294-13294/leavesc.hello.rxjavademo E/MainActivity: accept : 3
    

    6.7、take()

    控制观察者接收事件的数量

            Observable.just(1, 2, 2, 1, 3, 4, 3, 3).take(3).subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    Log.e(TAG, "accept : " + integer);
                }
            });
    
    10-06 08:05:43.520 13397-13397/? E/MainActivity: accept : 1
    10-06 08:05:43.520 13397-13397/? E/MainActivity: accept : 2
    10-06 08:05:43.520 13397-13397/? E/MainActivity: accept : 2
    

    6.8、debounce()

    如果两个事件发送的时间间隔小于设定的时间间隔,则前一件事件不会发送给观察者

     Observable.create(new ObservableOnSubscribe<Integer>() {
                @Override
                public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                    emitter.onNext(1);
                    Thread.sleep(900);
                    emitter.onNext(2);
                }
            }).debounce(1, TimeUnit.SECONDS).subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    Log.e(TAG, "accept : " + integer);
                }
            });
    
    10-06 08:08:59.337 13509-13523/leavesc.hello.rxjavademo E/MainActivity: accept : 2
    

    6.9、firstElement() && lastElement()

    firstElement() 取事件序列的第一个元素,lastElement() 取事件序列的最后一个元素

            Observable.just(1, 2, 3, 4, 5).firstElement().subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    Log.e(TAG, "accept : " + integer);
                }
            });
    

    6.10、elementAt() & elementAtOrError()

    elementAt() 可以指定取出事件序列中事件,但是输入的 index 超出事件序列的总数的话就不会触发任何调用,想触发异常信息的话就用 elementAtOrError()

            Observable.just(1, 2, 3, 4, 5).elementAt(5).subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    Log.e(TAG, "accept : " + integer);
                }
            });
    

    以上代码不会触发任何

    改用为 elementAtOrError(),则会抛出异常

            Observable.just(1, 2, 3, 4, 5).elementAtOrError(5).subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    Log.e(TAG, "accept : " + integer);
                }
            });
    
    Process: leavesc.hello.rxjavademo, PID: 13948
        io.reactivex.exceptions.OnErrorNotImplementedException: The exception was not handled due to missing onError handler in the subscribe() method call. Further reading: https://github.com/ReactiveX/RxJava/wiki/Error-Handling | null
            at io.reactivex.internal.functions.Functions$OnErrorMissingConsumer.accept(Functions.java:704)
            at io.reactivex.internal.functions.Functions$OnErrorMissingConsumer.accept(Functions.java:701)
            at io.reactivex.internal.observers.ConsumerSingleObserver.onError(ConsumerSingleObserver.java:46)
            at io.reactivex.internal.operators.observable.ObservableElementAtSingle$ElementAtObserver.onComplete(ObservableElementAtSingle.java:115)
            at io.reactivex.internal.operators.observable.ObservableFromArray$FromArrayDisposable.run(ObservableFromArray.java:111)
            at io.reactivex.internal.operators.observable.ObservableFromArray.subscribeActual(ObservableFromArray.java:37)
            at io.reactivex.Observable.subscribe(Observable.java:12090)
            at io.reactivex.internal.operators.observable.ObservableElementAtSingle.subscribeActual(ObservableElementAtSingle.java:37)
            at io.reactivex.Single.subscribe(Single.java:3438)
            at io.reactivex.Single.subscribe(Single.java:3424)
    

    七、条件操作符

    7.1、all()

    判断事件序列是否全部满足某个事件,如果都满足则返回 true,反之则返回 false

            Observable.just(1, 2, 3, 4, 5).all(new Predicate<Integer>() {
                @Override
                public boolean test(Integer integer) throws Exception {
                    return integer % 2 == 0;
                }
            }).subscribe(new Consumer<Boolean>() {
                @Override
                public void accept(Boolean aBoolean) throws Exception {
                    Log.e(TAG, "accept: " + aBoolean);
                }
            });
    
    10-06 08:16:10.212 14043-14043/leavesc.hello.rxjavademo E/MainActivity: accept: false
    

    7.2、takeWhile()

    发射原始 Observable,直到指定的某个条件不成立的那一刻,它停止发射原始 Observable,并终止自己的 Observable

    Observable.just(1, 2, 3, 4, 5, 1, 2).takeWhile(new Predicate<Integer>() {
                @Override
                public boolean test(Integer integer) throws Exception {
                    return integer < 4;
                }
            }).subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    Log.e(TAG, "accept: " + integer);
                }
            });
    
    10-06 14:03:42.110 20095-20095/leavesc.hello.rxjavademo E/MainActivity: accept: 1
    10-06 14:03:42.110 20095-20095/leavesc.hello.rxjavademo E/MainActivity: accept: 2
    10-06 14:03:42.110 20095-20095/leavesc.hello.rxjavademo E/MainActivity: accept: 3
    

    7.3、skipWhile()

    订阅原始的 Observable,但是忽略它的发射物,直到指定的某个条件变为 false 时才开始发射原始 Observable

                Observable.just(1, 2, 4, 1, 3, 4, 5, 1, 5)
                    .skipWhile(new Predicate<Integer>() {
                        @Override
                        public boolean test(Integer integer) throws Exception {
                            return integer < 3;
                        }
                    })
                    .subscribe(new Consumer<Integer>() {
                        @Override
                        public void accept(Integer integer) throws Exception {
                            Log.e(TAG, "integer " + integer);
                        }
                    });
    
    10-06 13:59:40.583 19764-19764/leavesc.hello.rxjavademo E/MainActivity: integer 4
    10-06 13:59:40.593 19764-19764/leavesc.hello.rxjavademo E/MainActivity: integer 1
    10-06 13:59:40.593 19764-19764/leavesc.hello.rxjavademo E/MainActivity: integer 3
    10-06 13:59:40.593 19764-19764/leavesc.hello.rxjavademo E/MainActivity: integer 4
    10-06 13:59:40.593 19764-19764/leavesc.hello.rxjavademo E/MainActivity: integer 5
    10-06 13:59:40.593 19764-19764/leavesc.hello.rxjavademo E/MainActivity: integer 1
    10-06 13:59:40.593 19764-19764/leavesc.hello.rxjavademo E/MainActivity: integer 5
    

    7.4、takeUntil()

    用于设置一个条件,当事件满足此条件时,此事件会被发送,但之后的事件就不会被发送了

    Observable.just(1, 2, 4, 1, 3, 4, 5, 1, 5)
                    .takeUntil(new Predicate<Integer>() {
                        @Override
                        public boolean test(Integer integer) throws Exception {
                            return integer > 3;
                        }
                    })
                    .subscribe(new Consumer<Integer>() {
                        @Override
                        public void accept(Integer integer) throws Exception {
                            Log.e(TAG, "integer " + integer);
                        }
                    });
    
    10-06 08:54:24.833 17208-17208/? E/MainActivity: integer 1
    10-06 08:54:24.833 17208-17208/? E/MainActivity: integer 2
    10-06 08:54:24.833 17208-17208/? E/MainActivity: integer 4
    

    7.5、skipUntil()

    skipUntil() 中的 Observable 发送事件了,原始的 Observable 才会发送事件给观察者

    Observable.intervalRange(1, 6, 0, 1, TimeUnit.SECONDS)
                    .skipUntil(Observable.intervalRange(10, 3, 1, 1, TimeUnit.SECONDS))
                    .subscribe(new Observer<Long>() {
                        @Override
                        public void onSubscribe(Disposable d) {
                            Log.e(TAG, "onSubscribe");
                        }
    
                        @Override
                        public void onNext(Long along) {
                            Log.e(TAG, "onNext : " + along);
                        }
    
                        @Override
                        public void onError(Throwable e) {
                            Log.e(TAG, "onError");
                        }
    
                        @Override
                        public void onComplete() {
                            Log.e(TAG, "onComplete");
                        }
                    });
    
    10-06 08:51:16.926 16877-16877/leavesc.hello.rxjavademo E/MainActivity: onSubscribe
    10-06 08:51:17.946 16877-16892/leavesc.hello.rxjavademo E/MainActivity: onNext : 2
    10-06 08:51:18.936 16877-16892/leavesc.hello.rxjavademo E/MainActivity: onNext : 3
    10-06 08:51:19.946 16877-16892/leavesc.hello.rxjavademo E/MainActivity: onNext : 4
    10-06 08:51:20.936 16877-16892/leavesc.hello.rxjavademo E/MainActivity: onNext : 5
    10-06 08:51:21.946 16877-16892/leavesc.hello.rxjavademo E/MainActivity: onNext : 6
    10-06 08:51:21.946 16877-16892/leavesc.hello.rxjavademo E/MainActivity: onComplete
    

    7.6、sequenceEqual()

    判断两个 Observable 发送的事件是否相同,如果两个序列是相同的(相同的数据,相同的顺序,相同的终止状态),它就发射 true,否则发射 false

            Observable.sequenceEqual(Observable.just(1, 2, 3), Observable.just(1, 2, 3))
                    .subscribe(new Consumer<Boolean>() {
                        @Override
                        public void accept(Boolean aBoolean) throws Exception {
                            Log.e(TAG, "accept aBoolean : " + aBoolean);
                        }
                    });
    
    10-06 08:46:59.369 16492-16492/leavesc.hello.rxjavademo E/MainActivity: accept aBoolean : true
    

    7.7、contains()

    判断事件序列中是否含有某个元素,如果有则返回 true,如果没有则返回 false

            Observable.just(1, 2, 3, 4).contains(2).subscribe(new Consumer<Boolean>() {
                @Override
                public void accept(Boolean aBoolean) throws Exception {
                    Log.e(TAG, "accept aBoolean : " + aBoolean);
                }
            });
    
    10-06 08:45:58.100 16386-16386/leavesc.hello.rxjavademo E/MainActivity: accept aBoolean : true
    

    7.8、isEmpty()

    判断事件序列是否为空

            Observable.create(new ObservableOnSubscribe<Integer>() {
                @Override
                public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                    emitter.onComplete();
                }
            }).isEmpty().subscribe(new Consumer<Boolean>() {
                @Override
                public void accept(Boolean aBoolean) throws Exception {
                    Log.e(TAG, "accept aBoolean: " + aBoolean);
                }
            });
    
    10-06 08:43:43.201 16278-16278/leavesc.hello.rxjavademo E/MainActivity: accept aBoolean: true
    

    7.9、amb()

    amb() 接收一个 Observable 集合,但是只会发送最先发送事件的 Observable 中的事件,不管发射的是一项数据还是一个 onErroronCompleted 通知,其余 Observable 将会被丢弃

            List<Observable<Long>> list = new ArrayList<>();
            list.add(Observable.intervalRange(1, 3, 2, 1, TimeUnit.SECONDS));
            list.add(Observable.intervalRange(10, 3, 0, 1, TimeUnit.SECONDS));
            Observable.amb(list).subscribe(new Consumer<Long>() {
                @Override
                public void accept(Long aLong) throws Exception {
                    Log.e(TAG, "accept: " + aLong);
                }
            });
    
    10-06 08:41:45.783 16053-16068/leavesc.hello.rxjavademo E/MainActivity: accept: 10
    10-06 08:41:46.783 16053-16068/leavesc.hello.rxjavademo E/MainActivity: accept: 11
    10-06 08:41:47.783 16053-16068/leavesc.hello.rxjavademo E/MainActivity: accept: 12
    

    7.10、defaultIfEmpty()

    如果 Observable 没有发射任何值,则可以利用这个方法发送一个默认值

            Observable.create(new ObservableOnSubscribe<Integer>() {
                @Override
                public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                    emitter.onComplete();
                }
            }).defaultIfEmpty(100).subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    Log.e(TAG, "accept: " + integer);
                }
            });
    
    10-06 08:40:04.754 15945-15945/leavesc.hello.rxjavademo E/MainActivity: accept: 100
    

    更多的学习笔记看这里:Java_Android_Learn

    相关文章

      网友评论

        本文标题:RxJava2 入门详细笔记(2)

        本文链接:https://www.haomeiwen.com/subject/gmpkaftx.html