RxJava操作符系列六

作者: Code4Android | 来源:发表于2016-12-19 20:27 被阅读763次
    RxJava

    RxJava操作符系列传送门

    RxJava操作符源码
    RxJava操作符系列一
    RxJava操作符系列二
    RxJava操作符系列三
    RxJava操作符系列四
    RxJava操作符系列五

    前言

    在上几篇文章我们介绍了一些RxJava创建,转换,过滤,组合,辅助的一些操作符,今天我们继续去学习RxJava的一些条件和布尔操作符,在此感谢那些阅读文章并提出建议的同学,其实写技术文章主要是提升自己的知识面以及对知识的理解程度,也有人说有些地方描述还不够清晰,文笔有待提高,是的,确实是这样,描述不清晰我感觉不仅是文笔有待提高(提升空间很大),对技术的理解也需要提高(技术渣蜕变中...)。不过我相信,只要我能写出来,这就是一种进步,就是一个积累的过程,我会努力,加油。好了,不扯了,进入正题。

    All

    该操作符是判断Observable发射的所有数据是否都满足某一条件,它接收一个Fun1方法参数,此方法接收原始数据,并返回一个布尔类型值,如果原始Observable正常终止并且每一项数据都满足条件,就返回true;如果原始Observable的任何一项数据不满足条件就返回False。当某数据不满足条件(返回false)时之后的数据不再发射。如下示例代码

     Observable.just(1,2,3,4).all(new Func1<Integer, Boolean>() {
                @Override
                public Boolean call(Integer integer) {
                    Log.e(TAG, "call: "+integer );
                    return integer<2;
                }
            }).subscribe(new Subscriber<Boolean>() {
                @Override
                public void onCompleted() {
                    Log.e(TAG, "onCompleted: ");
                }
    
                @Override
                public void onError(Throwable e) {
                    Log.e(TAG, "onError: " );
                }
    
                @Override
                public void onNext(Boolean aBoolean) {
                    Log.e(TAG, "onNext: "+aBoolean );
                }
            });
    

    输出日志信息

    call: 1
    call: 2
    onNext: false
    onCompleted: 
    

    Amb

    当我们传递多个Observable(可接收2到9个Observable)给amb时,它只发射其中一个Observable的数据和通知:首先发送通知给amb的那个,不管发射的是一项数据还是一个onError或onCompleted通知。amb将忽略和丢弃其它所有Observables的发射物。


    这里写图片描述
     Observable observable= Observable.just(1,2,3).delay(500,TimeUnit.MILLISECONDS).subscribeOn(Schedulers.newThread());
           Observable observable1= Observable.just(4,5,6).delay(100,TimeUnit.MILLISECONDS).subscribeOn(Schedulers.newThread());
            Observable.amb(observable,observable1).subscribe(new Subscriber<Integer>() {
                @Override
                public void onCompleted() {
                    Log.e(TAG, "onCompleted: ");
                }
    
                @Override
                public void onError(Throwable e) {
                    Log.e(TAG, "onError: ");
                }
    
                @Override
                public void onNext(Integer integer) {
                    Log.e(TAG, "  onNext: "+integer);
                }
            });
    

    输出日志信息

    onNext: 4
    onNext: 5
    onNext: 6
    onCompleted:
    

    有一个类似的对象方法ambWith。如上示例中Observable.amb(observable,observable1)改为observable.ambWith(observable1)是等价的。

    Contains

    我们可以给Contains传一个指定的值,如果原始Observable发射了那个值,它返回的Observable将发射true,否则发射false,即Observable发射的数据是否包含某一对象。
    示例代码

    Observable.just(1,2,3,4).contains(2)
                    .subscribe(new Subscriber<Boolean>() {
                        @Override
                        public void onCompleted() {
                            Log.e(TAG, "onCompleted: " );
                        }
    
                        @Override
                        public void onError(Throwable e) {
                            Log.e(TAG, "onError: " );
                        }
    
                        @Override
                        public void onNext(Boolean aBoolean) {
                            Log.e(TAG, "onNext: "+aBoolean);
                        }
                    });
    

    输出日志信息

    onNext: true
    onCompleted: 
    

    isEmpty也是一种判断是否包含的操作符,不同的是它判断原始Observable是否没有发射任何数据。

    exists操作符与contains操作符作用相同,但是它接收的是一个Func1函数,可以指定一个判断条件,当发射的数据有满足判断条件(返回true)就发射true,否则为false。

    DefaultIfEmpty

    该操作符简单的精确地发射原始Observable的值,如果原始Observable没有发射任何数据正常终止(以onCompletedd的形式),DefaultIfEmpty返回的Observable就发射一个我们提供的默认值。

    Observable.empty().defaultIfEmpty(1).subscribe(new Subscriber<Object>() {
    
                @Override
                public void onCompleted() {
                    Log.e(TAG, "onCompleted: " );
                }
    
                @Override
                public void onError(Throwable e) {
                    Log.e(TAG, "onError: ");
                }
    
                @Override
                public void onNext(Object object) {
                    Log.e(TAG, "onNext: "+object);
                }
            });
    

    输出日志信息

    onNext: 1
    onCompleted: 
    

    defaultIfEmpty只有在没有发射数据时才会有效果,若发射的有数据,和不使用此操作符效果一样。

    SequenceEqual

    该操作符会比较两个Observable的发射物,如果两个序列是相同的(相同的数据,相同的顺序,相同的终止状态),它就发射true,否则发射false

      Observable observable=Observable.just(1,2,3);
            Observable observable1=Observable.just(1,3,2);
            Observable.sequenceEqual(observable,observable1)
                    .subscribe(new Subscriber<Boolean>() {
                        @Override
                        public void onCompleted() {
                            Log.e(TAG, "onCompleted: " );
                        }
    
                        @Override
                        public void onError(Throwable e) {
                            Log.e(TAG, "onError: " );
                        }
    
                        @Override
                        public void onNext(Boolean aBoolean) {
                            Log.e(TAG, "onNext: " +aBoolean);
                        }
                    });
    

    执行后输出

    onNext: false
    onCompleted:
    

    SkipUntil

    SkipUntil订阅原始的Observable,但是忽略它的发射物,直到第二个Observable发射了一项数据那一刻,它开始发射原始Observable。


    这里写图片描述
    Observable observable = Observable.just(1, 2, 3,4,5,6).delay(100, TimeUnit.MILLISECONDS).subscribeOn(Schedulers.newThread());
            Observable observable1 = Observable.just(20, 21, 22).delay(130, TimeUnit.MILLISECONDS).subscribeOn(Schedulers.newThread());
            observable.skipUntil(observable1)
                   .subscribe(new Subscriber() {
                       @Override
                       public void onCompleted() {
                           Log.e(TAG, "onCompleted: " );
                       }
    
                       @Override
                       public void onError(Throwable e) {
                           Log.e(TAG, "onError: " );
                       }
    
                       @Override
                       public void onNext(Object o) {
                           Log.e(TAG, "onNext: "+o );
                       }
                   });
    

    输出日志信息

    onNext: 4
    onNext: 5
    onNext: 6
    onCompleted: 
    

    由于上面两个Observable都是在一个新线程中,时间不可控,每次运行的结果一般不会相同。但是都会符合上面所述规则,可参考上图。

    SkipWhile

    该操作符也是忽略它的发射物,直到我们指定的某个条件变为false的那一刻,它开始发射原始Observable。切记是判断条件返回false时开始发射数据。
    示例代码

     Observable.range(1,5).skipWhile(new Func1<Integer, Boolean>() {
                @Override
                public Boolean call(Integer integer) {
                    Log.e(TAG, "call: "+integer);
                    return integer<3;
                }
            }).subscribe(new Subscriber<Integer>() {
                @Override
                public void onCompleted() {
                    Log.e(TAG, "onCompleted: " );
                }
    
                @Override
                public void onError(Throwable e) {
                    Log.e(TAG, "onError: " );
                }
    
                @Override
                public void onNext(Integer integer) {
                    Log.e(TAG, "onNext: " +integer);
                }
            });
    

    输出日志信息

    call: 1
    call: 2
    call: 3
    onNext: 3
    onNext: 4
    onNext: 5
    onCompleted: 
    

    TakeUntil

    如果我们理解了SkipUntil操作符了,那么这个操作符也就很好理解了,该操作符与SkipUntil有点相反的意思。 这里写图片描述

    通过上图你应该也看出来和SkipUntil的区别,当第二个Observable发射了一项数据或者终止时,丢弃原始Observable发射的任何数据(SkipUntil则是丢弃之前数据,发射之后的数据)。

    Observable observable = Observable.just(1, 2, 3,4,5,6).delay(100, TimeUnit.MILLISECONDS).subscribeOn(Schedulers.newThread());
            Observable observable1 = Observable.just(20, 21, 22).delay(120, TimeUnit.MILLISECONDS).subscribeOn(Schedulers.newThread());
            observable.takeUntil(observable1)
                    .subscribe(new Subscriber() {
                        @Override
                        public void onCompleted() {
                            Log.e(TAG, "onCompleted: " );
                        }
    
                        @Override
                        public void onError(Throwable e) {
                            Log.e(TAG, "onError: " );
                        }
    
                        @Override
                        public void onNext(Object o) {
                            Log.e(TAG, "onNext: "+o );
                        }
                    });
    

    输出日志信息(每次执行的输出结果一般不相同)

    onNext: 1
    onNext: 2
    onNext: 3
    onNext: 4
    onCompleted: 
    

    TakeWhile

    该操作符发射原始Observable,直到我们指定的某个条件不成立的那一刻,它停止发射原始Observable(skipWhile此时开始发射),并终止自己的Observable。

     Observable.range(1,5).takeWhile(new Func1<Integer, Boolean>() {
                @Override
                public Boolean call(Integer integer) {
                    Log.e(TAG, "call: "+integer);
                    return integer<3;
                }
            }).subscribe(new Subscriber<Integer>() {
                @Override
                public void onCompleted() {
                    Log.e(TAG, "onCompleted: " );
                }
    
                @Override
                public void onError(Throwable e) {
                    Log.e(TAG, "onError: " );
                }
    
                @Override
                public void onNext(Integer integer) {
                    Log.e(TAG, "onNext: " +integer);
                }
            });
    

    输出日志信息

    call: 1
    onNext: 1
    call: 2
    onNext: 2
    call: 3
    onCompleted: 
    

    今天的这篇文章到这里就结束了,若文中有错误的地方,欢迎指正。谢谢。

    相关文章

      网友评论

      本文标题:RxJava操作符系列六

      本文链接:https://www.haomeiwen.com/subject/llptvttx.html