美文网首页Android-RxJavaAndroid-Rxjava&retrofit&dagger
RxJava——基础学习(十),条件和布尔,连接操作符

RxJava——基础学习(十),条件和布尔,连接操作符

作者: 英勇青铜5 | 来源:发表于2017-01-04 17:54 被阅读95次

    学习资料:

    1.布尔操作

    操作符结果自然全部为boolean

    1.1 All

    判断所有的数据项是否符合同一个条件

    简单使用:

        /**
         * List内所有的元素,是否都是偶数
         */
        private static void all() {
            Observable
                    .just(Arrays.asList(2, 4, 6, 8, 10))
                    .all(new Func1<List<Integer>, Boolean>() {
                        @Override
                        public Boolean call(List<Integer> integers) {
                            return integers.stream().allMatch(new Predicate<Integer>() {
                                @Override
                                public boolean test(Integer integer) {
                                    return integer % 2 == 0;
                                }
                            });
                        }
                    })
                    .subscribe(System.out::println);
        }
    

    输出结果:true


    1.2 Contains

    判断数据项是否含有某个特定的值

    contains(Object element),参数是一个Object

    简单使用:

        /**
         * 判断是否包含整数11
         */
        private static void contains() {
            Observable
                    .just(9, 10, 11, 12)
                    .contains(11)
                    .subscribe(System.out::println);
        }
    

    输出结果:true


    1.3 SequenceEqual

    SequenceEqual(Observable1,Observable2),需要两个Observable

    两个Observable的发送数据项序列完全相同。只有在,相同的数据,相同的顺序,相同的终止状态时,就判定为两个序列相同。

    简单使用:

        /**
         * 比较两个Observable发送的数据项
         */
        private static void sequenceEqual() {
            Observable
                    .sequenceEqual(Observable.just(1, 2, 3), Observable.just(2, 3, 4))
                    .subscribe(System.out::println);
        }
    

    输出结果:false


    2. 条件操作符

    根据条件,进行发射或变换Observables操作

    2.1 Amb

    首先发出数据项或者通知的Observable

    简单使用:

        /**
         * 将首先发出的 Second 打印出来
         */
        private static void ambDemo() {
            Observable
                    .amb(
                            //在computation计算线程
                            Observable.just("First").delay(200, TimeUnit.MILLISECONDS),
                            Observable.just("Second").delay(10, TimeUnit.MILLISECONDS),
                            //在main线程执行
                            Observable.just("Third").delay(250, TimeUnit.MILLISECONDS,Schedulers.immediate())
                    )
                    .subscribe(System.out::println);
        }
    

    输出结果:Second

    注意:amb内,第3个Observable使用了Schedulers.immediate()

    Java下,上面的delay(long,TimeUnit)这种形式,默认是在computation线程中,main线程结束时,Rx的一系列操作也就结束了。使用Schedulers.immediate()指定在当前线程,也就是main线程中,这样main线程执行完最后一行代码,也不会立即结束

    Rx 的线程调度机制并不清楚,暂时深入不了


    2.2 SkipUntil

    丢弃原始Observable发送的数据项,直到第2个Observable发出一个通知信号

    默认不在任何特定的调度器上执行

    简单使用:

        /**
         * 跳过2s前的数据项
         */
        private static void skipUtil() {
            Observable
                    .interval(100, TimeUnit.MILLISECONDS,Schedulers.immediate())
                    .skipUntil(Observable.timer(2,TimeUnit.SECONDS))
                    .limit(30)
                    .buffer(10)
                    .subscribe(System.out::println);
        }
    

    运行结果:

    [19, 20, 21, 22, 23, 24, 25, 26, 27, 28]
    [29, 30, 31, 32, 33, 34, 35, 36, 37, 38]
    [39, 40, 41, 42, 43, 44, 45, 46, 47, 48]
    

    2s之前发送的数据项就被丢弃


    2.3 SkipWhile

    skipWhile

    丢弃数据项,直到, 一旦指定的条件不成立,返回false

    默认不在任何特定的调度器上执行

    简单使用:

        /**
         * 小于 5 就跳过
         */
        private static void skipWhile() {
            Observable
                    .interval(100, TimeUnit.MILLISECONDS, Schedulers.immediate())
                    .skipWhile(new Func1<Long, Boolean>() {
                        @Override
                        public Boolean call(Long aLong) {
                            return  aLong < 5;
                        }
                    })
                    .limit(30)
                    .buffer(10)
                    .subscribe(System.out::println);
        }
    

    运行结果:

    [5, 6, 7, 8, 9, 10, 11, 12, 13, 14]
    [15, 16, 17, 18, 19, 20, 21, 22, 23, 24]
    [25, 26, 27, 28, 29, 30, 31, 32, 33, 34]
    

    将条件改为aLong > 5,查看输出结果,有助于理解


    3. 连接操作符

    3.1 Connect

    将一个可连接的Observable,也就是ConnectableObservable,开始发送数据项给订阅者

    一个普通的Observable使用publish()就可以变成一个ConnectableObservable,在订阅的时候并不会开始向订阅者发送数据项,使用connect()后,才开始发送数据项

    refCount()将一个可连接的Observable转换成为普通的Observable

    connect()ConnectableObservable接口的一个方法,返回一个Subscription对象,可以在恰当的时机,调用unsubscribe()方法让Observable停止发射数据给观察者

    即使没有任何订阅者订阅Observable,也可以使用connect()方法让一个Observable开始发射数据(或者开始生成待发射的数据)。这样,就将一个Observable变为

    简单使用:

        private static void connectAction() {
            //创建一个 ConnectableObservable
            ConnectableObservable <Long> connectableObservable = Observable
                    .interval(100, TimeUnit.MILLISECONDS)//不要使用 Schedulers.immediate()
                    .publish();
            //订阅 此时并不会发送数据
            connectableObservable
                    .delaySubscription(2, TimeUnit.SECONDS)//延迟2s,发送数据项
                    .buffer(10)
                    .subscribe(new Subscriber<List<Long>>() {
                        @Override
                        public void onCompleted() {
                            System.out.println("connectableObservable --> onCompleted");
                        }
    
                        @Override
                        public void onError(Throwable e) {
                            System.out.println("connectableObservable --> "+e.getMessage());
                        }
    
                        @Override
                        public void onNext(List<Long> longs) {
                            System.out.println("connectableObservable --> "+longs);
                        }
                    });
            //进行连接
            Subscription subscription = connectableObservable.connect();
    
            Observable
                    .interval(100, TimeUnit.MILLISECONDS, Schedulers.immediate())
                    .limit(50)
                    .buffer(10)
                    .subscribe(new Subscriber<List<Long>>() {
                        @Override
                        public void onCompleted() {
                            System.out.println("Observable --> onCompleted");
                            subscription.unsubscribe();//connectableObservable 取消订阅
                        }
    
                        @Override
                        public void onError(Throwable e) {
                            System.out.println("Observable --> "+e.getMessage());
                        }
    
                        @Override
                        public void onNext(List<Long> longs) {
                            System.out.println("Observable --> "+longs);
                        }
                    });
        }
    

    运行结果:

    Observable --> [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
    Observable --> [10, 11, 12, 13, 14, 15, 16, 17, 18, 19]
    connectableObservable --> [19, 20, 21, 22, 23, 24, 25, 26, 27, 28]
    Observable --> [20, 21, 22, 23, 24, 25, 26, 27, 28, 29]
    connectableObservable --> [29, 30, 31, 32, 33, 34, 35, 36, 37, 38]
    Observable --> [30, 31, 32, 33, 34, 35, 36, 37, 38, 39]
    connectableObservable --> [39, 40, 41, 42, 43, 44, 45, 46, 47, 48]
    Observable --> [40, 41, 42, 43, 44, 45, 46, 47, 48, 49]
    Observable --> onCompleted
    

    4. 最后

    东西不是很多,也不算难理解。有错误请指出

    共勉 : )

    相关文章

      网友评论

      • 10Buns:可以更新到rxjava2了
        英勇青铜5: @10Buns 😀😀😀打算学学okhttp,换换口味,然后继续学rxjava2

      本文标题:RxJava——基础学习(十),条件和布尔,连接操作符

      本文链接:https://www.haomeiwen.com/subject/qkfavttx.html