美文网首页
RxJava基础七-条件操作符

RxJava基础七-条件操作符

作者: 清水杨杨 | 来源:发表于2019-05-02 16:41 被阅读0次

    此篇内容均是来自书籍《RxJava响应式编程》李衍顺 著

    3.7 条件操作

    3.7.1 all

    all操作符根据一个函数对源Observable发送的所有数据进行判断,最终返回的结果就是这个判断结果。这个函数使用源Observable发送的数据作为参数,内部判断所有的数据是否满足我们定义好的判断条件,如果全部都满足则返回true,否则就返回false。

    /**
         * all
         */
        private Observable<Boolean> allObserver(){
            Observable<Integer> just = Observable.just(1,2,3,4,5);
            return just.all(new Func1<Integer, Boolean>() {
                @Override
                public Boolean call(Integer integer) {
                    return integer < 6;
                }
            });
        }
        private Observable<Boolean> notAllObserver(){
            Observable<Integer> just = Observable.just(1,2,3,4,5,6);
            return just.all(new Func1<Integer, Boolean>() {
                @Override
                public Boolean call(Integer integer) {
                    return integer < 6;
                }
            });
        }
        private void allTest(){
            allObserver().subscribe(new Action1<Boolean>() {
                @Override
                public void call(Boolean aBoolean) {
                    log("all:" + aBoolean);
                }
            });
            notAllObserver().subscribe(new Action1<Boolean>() {
                @Override
                public void call(Boolean aBoolean) {
                    log("not all: "+ aBoolean);
                }
            });
        }
    结果
    all:true
    not all: false
    

    3.7.2 amb

    amb操作符可以将至多9个Observable结合起来, 让他们竞争。哪个Observable首先发送了数据(包括onError和OnComplete),就继续发送这个Observable数据,其它Observable发送的数据就会被丢弃。

    /**
         * amb
         */
        private Observable<Integer> ambObserver(){
            Observable<Integer> delay3 = Observable.just(7,8,9).delay(3000, TimeUnit.MILLISECONDS, Schedulers.trampoline());
            Observable<Integer> delay2 = Observable.just(4,5,6).delay(2000, TimeUnit.MILLISECONDS, Schedulers.trampoline());
            Observable<Integer> delay1 = Observable.just(1,2,3).delay(1000, TimeUnit.MILLISECONDS, Schedulers.trampoline());
            return Observable.amb(delay1, delay2, delay3);
        }
        private void ambTest(){
            ambObserver().subscribe(new Action1<Integer>() {
                @Override
                public void call(Integer integer) {
                    log("amb: " + integer);
                }
            });
        }
    结果:
    amb: 1
    amb: 2
    amb: 3
    

    3.7.3 contains

    contains操作符用来判断源Observable所发送的所有数据是否包含某一个数据,如果包含则返回true;如果源Observable已经结束了却还没有发送这个数据, 则返回false。所以在Observable没发送所有的数据之前, contains是不会有返回数据的。

    /**
         * contains
         */
        private void containsTest(){
            Observable.just(1,2,3).contains(3).subscribe(new Action1<Boolean>() {
                @Override
                public void call(Boolean aBoolean) {
                    log("contains: " + aBoolean);
                }
            });
            Observable.just(1,2,3).contains(4).subscribe(new Action1<Boolean>() {
                @Override
                public void call(Boolean aBoolean) {
                    log("not contains : " + aBoolean);
                }
            });
        }
    结果:
    contains: true
    not contains : false
    

    3.7.4 isEmpty

    isEmpty操作符用来判断源Observable是否发送过数据,如果发送过就返回false;如果源Observable已经结束了都还没有发送过这个数据,则返回true。

    /**
         * isEmpty
         */
        private void isEmptyTest() {
            Observable.create(new Observable.OnSubscribe<Integer>() {
                @Override
                public void call(Subscriber<? super Integer> subscriber) {
                    subscriber.onCompleted();
                }
            }).isEmpty()
                    .subscribe(new Action1<Boolean>() {
                        @Override
                        public void call(Boolean aBoolean) {
                            log("isEmpty: " + aBoolean);
                        }
                    });
        }
    结果:
    isEmpty: true
    

    3.7.5 defaultIfEmpty

    defaultIfEmpty操作符会判断源Observable是否发送了数据,如果源Observable发送了数据,则正常发送这些数据;否则发送一个默认的数据

    /**
         * defaultIfEmpty
         */
        private void defaultIfEmptyTest(){
            Observable.create(new Observable.OnSubscribe<Integer>() {
                @Override
                public void call(Subscriber<? super Integer> subscriber) {
                    subscriber.onCompleted();
                }
            }).defaultIfEmpty(10).subscribe(new Action1<Integer>() {
                @Override
                public void call(Integer integer) {
                    log("empty: " + integer);
                }
            });
    
            Observable.just(1).defaultIfEmpty(10).subscribe(new Action1<Integer>() {
                @Override
                public void call(Integer integer) {
                    log("notEmpty :" + integer);
                }
            });
        }
    结果:
    empty: 10
    notEmpty :1
    

    3.7.6 sequenceEqual

    sequenceEqual操作符用来判断两个Observable发送的数据序列是否相同(发送的数据相同,数据的序列相同,结束的状态相同),如果全部相同则返回true,否则返回false

    /**
         * sequenceEqual
         */
        private void sequenceEqualTest(){
            Observable.sequenceEqual(Observable.just(1,2,3), Observable.just(1,2,3))
                    .subscribe(new Action1<Boolean>() {
                        @Override
                        public void call(Boolean aBoolean) {
                            log("equal: " + aBoolean);
                        }
                    });
            Observable.sequenceEqual(Observable.just(1,2,3), Observable.just(4,5,6))
                    .subscribe(new Action1<Boolean>() {
                        @Override
                        public void call(Boolean aBoolean) {
                            log("not equal :" + aBoolean);
                        }
                    });
        }
    结果:
    equal: true
    not equal :false
    

    3.7.7 skipUntil 和 skipWhile

    这两个操作符都是根据条件来跳过一些数据,不同之处在于skipUntil是根据一个标志Observable来判断的, 当这个标志Observable没有发生数据的时候,所有源Observable发送的数据都会被跳过;当标志Observable发送了一个数据后,则开始正常的发送数据。

    而skipWhile则是根据一个函数来判断是否跳过数据,如果函数返回值为true,则一直跳过源Observable发送的数据;如果函数返回false,则开始正常发送数据

    /**
         * skipUntil && skipWhile
         */
        private void skipUntil_SkipWhile(){
            Observable.interval(1, TimeUnit.SECONDS)
                    .skipUntil(Observable.timer(3, TimeUnit.SECONDS))
                    .take(5)
                    .subscribe(new Action1<Long>() {
                        @Override
                        public void call(Long aLong) {
                            log("skipUntil: "+ aLong);
                        }
                    });
    
            Observable.interval(1, TimeUnit.SECONDS, Schedulers.trampoline()).skipWhile(new Func1<Long, Boolean>() {
                @Override
                public Boolean call(Long aLong) {
                    return aLong<5;
                }
            }).take(5).subscribe(new Action1<Long>() {
                @Override
                public void call(Long aLong) {
                    log("skipWhile: " + aLong);
                }
            });
        }
    

    订阅后的输出结果如下。对于第一个Observable,我们采用了timer操作符来创建标志Observable,所以跳过了源Observable的前两个数据;对于第二个Observable,我们的条件是小于5的数据都跳过,所以最终的数据是过了5秒后从5开始发送出去。

    skipUntil: 2
    skipUntil: 3
    skipUntil: 4
    skipUntil: 5
    skipWhile: 5
    skipUntil: 6
    skipWhile: 6
    skipWhile: 7
    skipWhile: 8
    skipWhile: 9
    

    3.7.8 takeUntil 和 takeWhile

    takeUntil 和takeWhile 操作符分别和skipUntil 和 skipWhile操作符完全相反的功能。takeUntil也是使用一个标志Observable是否发送数据来进行判断;当标志Observable没有发送数据时,正常发送数据,而一旦标志Observable发送过数据,则后面的数据都会被丢弃。

    /**
         * takeUntil && takeWhile
         */
        private void takeUntil_TakeWhile(){
            Observable.interval(1, TimeUnit.SECONDS, Schedulers.trampoline())
                    .takeUntil(Observable.timer(3, TimeUnit.SECONDS))
                    .subscribe(new Action1<Long>() {
                        @Override
                        public void call(Long aLong) {
                            log("takeUntil: "+ aLong);
                        }
                    });
            Observable.interval(1, TimeUnit.SECONDS, Schedulers.trampoline())
                    .takeWhile(new Func1<Long, Boolean>() {
                        @Override
                        public Boolean call(Long aLong) {
                            return aLong < 5;
                        }
                    }).subscribe(new Action1<Long>() {
                @Override
                public void call(Long aLong) {
                    log("takeWhile: " + aLong);
                }
            });
        }
    

    订阅后输出的结果如下。可以看到第一个Observable因为使用timer创建标志Observable,所以只取了前两个数;第二个Observable只能把小于5的数据发送出来,大于5的数据都被丢弃了

    takeUntil: 0
    takeUntil: 1
    takeWhile: 0
    takeWhile: 1
    takeWhile: 2
    takeWhile: 3
    takeWhile: 4
    

    相关文章

      网友评论

          本文标题:RxJava基础七-条件操作符

          本文链接:https://www.haomeiwen.com/subject/flinnqtx.html