美文网首页Android-RxJavaAndroid开发Android知识
Rxjava操作符讲解(4)Combining 结合操作

Rxjava操作符讲解(4)Combining 结合操作

作者: Forrest_周高民 | 来源:发表于2017-01-06 09:18 被阅读123次

    之前没有了解过Rxjava的童鞋,建议先阅读《Rxjava 从入门到开发》这篇文章,对入门比较有帮助。

    StartWith操作符

    作用:在数据序列的开头增加一项数据

    Observable.just(1,2).startWith(Observable.just(-1)).observeOn(Schedulers.io()).subscribe(new Action1<Integer>() {
            @Override
            public void call(Integer aLong) {
                Logger.i(String.valueOf(aLong));
            }
        });
    

    输出结果:

    例子说明:在Startwith的作用下-1输出在结果的最前面。

    CombineLatest操作符

    作用:当两个Observables中的任何一个发射了数据时,使用一个函数结合每个Observable发射的最近数据项,并且基于这个函数的结果发射数据。

         //产生0,5,10数列
        Observable<Long> observable1 = Observable.interval(1000, TimeUnit.MILLISECONDS)
                .map(new Func1<Long, Long>() {
                    @Override
                    public Long call(Long aLong) {
                        return aLong * 5;
                    }
                }).take(3);
    
        //产生0,10,20数列
        Observable<Long> observable2 = Observable.interval(500, TimeUnit.MILLISECONDS)
                .map(new Func1<Long, Long>() {
                    @Override
                    public Long call(Long aLong) {
                        return aLong * 10;
                    }
                }).take(3);
    
    
        subscription=Observable.combineLatest(observable1, observable2, new Func2<Long, Long, Long>() {
            @Override
            public Long call(Long aLong, Long aLong2) {
                System.out.println("aLong: " + aLong+"  aLong2: "+aLong2);
    
                return aLong+aLong2;
            }
        }).subscribe(new Subscriber<Long>() {
            @Override
            public void onCompleted() {
                System.out.println("Sequence complete.");
            }
    
            @Override
            public void onError(Throwable e) {
                System.err.println("Error: " + e.getMessage());
            }
    
            @Override
            public void onNext(Long aLong) {
                System.out.println("Next: " + aLong);
            }
        });
    

    输出结果:

    例子说明:observable1输出的是0,5,10;observable2输出的是0,10,20,输出结果的关系如下图:


    数字的间隔就是时间差。

    Join操作符

    作用:join操作符把类似于combineLatest操作符,也是两个Observable产生的结果进行合并,合并的结果组成一个新的Observable,但是join操作符可以控制每个Observable产生结果的生命周期,在每个结果的生命周期内,可以与另一个Observable产生的结果按照一定的规则进行合并。

    join方法有四个参数,具体定义如下:
    observableA.join(observableB, observableA产生结果生命周期控制函数, observableB产生结果生命周期控制函数, observableA产生的结果与observableB产生的结果的合并规则)

        //产生0,5数列
        Observable<Long> observable1 = Observable.interval(1000, TimeUnit.MILLISECONDS)
                .map(new Func1<Long, Long>() {
                    @Override
                    public Long call(Long aLong) {
                        return aLong * 5;
                    }
                }).take(3);
    
        //产生0,10,20数列
        Observable<Long> observable2 = Observable.interval(1000, TimeUnit.MILLISECONDS)
                .map(new Func1<Long, Long>() {
                    @Override
                    public Long call(Long aLong) {
                        return aLong * 10;
                    }
                }).take(3);
    
        subscription=observable1.join(observable2, new Func1<Long, Observable<Long>>() {
            @Override
            public Observable<Long> call(Long aLong) {
                return Observable.just(aLong).delay(500, TimeUnit.MILLISECONDS);
            }
        }, new Func1<Long, Observable<Long>>() {
            @Override
            public Observable<Long> call(Long aLong) {
                return Observable.just(aLong).delay(1000, TimeUnit.MILLISECONDS);
            }
        }, new Func2<Long, Long, Long>() {
            @Override
            public Long call(Long aLong, Long aLong2) {
                System.out.println("aLong: " + aLong+"  aLong2: "+aLong2);
                return aLong + aLong2;
            }
        }).subscribe(new Subscriber<Long>() {
            @Override
            public void onCompleted() {
                System.out.println("Sequence complete.");
            }
    
            @Override
            public void onError(Throwable e) {
                System.err.println("Error: " + e.getMessage());
            }
    
            @Override
            public void onNext(Long aLong) {
                System.out.println("Next: " + aLong);
            }
        });
    

    输出结果:

    例子说明:其实输出排列方式和combineLatest差不多,主要差别在通过join可以进一步控制observable结果的输出时间周期,另外顺便一提的是如果observable1输出的数据只有两个,那么即使observable2输出3个数据,但最终输出的也仅有两个。

    Merge操作符

    作用:将多个Observable合并为一个。

        //产生0,5数列
        Observable<Long> observable1 = Observable.interval(1000, TimeUnit.MILLISECONDS)
                .map(new Func1<Long, Long>() {
                    @Override
                    public Long call(Long aLong) {
                        return aLong * 5;
                    }
                }).take(2);
    
        //产生0,10,20数列
        Observable<Long> observable2 = Observable.interval(1000, TimeUnit.MILLISECONDS)
                .map(new Func1<Long, Long>() {
                    @Override
                    public Long call(Long aLong) {
                        return aLong * 10;
                    }
                }).take(3);
    
        subscription=Observable.merge(observable1,observable2).subscribe(new Action1<Long>() {
            @Override
            public void call(Long aLong) {
                System.out.println("out: " + aLong);
            }
        });
    

    输出结果:

    例子说明:把observable1,observable2的数据合并输出。

    switchOnNext操作符

    作用:把一组Observable转换成一个Observable,转换规则为:对于这组Observable中的每一个Observable所产生的结果,如果在同一个时间内存在两个或多个Observable提交的结果,只取最后一个Observable提交的结果给订阅者。

    Observable<Observable<Long>> observable = Observable.interval(500, TimeUnit.MILLISECONDS).map(new Func1<Long, Observable<Long>>() {
            @Override
            public Observable<Long> call(Long aLong) {
                //每隔200毫秒产生一组数据(0,10,20,30,40)
                return Observable.interval(200, TimeUnit.MILLISECONDS).map(new Func1<Long, Long>() {
                    @Override
                    public Long call(Long aLong) {
                        return aLong * 10;
                    }
                }).take(3);
            }
        }).take(2);
    
        subscription=Observable.switchOnNext(observable).subscribe(new Action1<Long>() {
            @Override
            public void call(Long aLong) {
                System.out.println("Next:" + aLong);
            }
        });
    

    输出结果:

    Paste_Image.png

    例子说明:没人Switch的作用下,输出结果为“0,10,20,0,10,20”,但是在Switch的作用下,结果为“0,10,0,10,20”,少了“20”,这就是switch的作用。

    zip操作符

    作用:把两个observable提交的结果,严格按照顺序进行合并。

       Observable<Long> observable1=  Observable.interval(1, TimeUnit.SECONDS).take(3);
        Observable<Long> observable2=  Observable.interval(1, TimeUnit.SECONDS).take(4);
        subscription=Observable.zip(observable1, observable2, new Func2<Long, Long, Integer>() {
    
            @Override
            public Integer call(Long aLong, Long aLong2) {
                System.out.println("aLong:" + aLong+"   aLong2:  "+aLong2);
    
                return (int) (aLong+aLong);
            }
        }).subscribe(new Action1<Integer>() {
            @Override
            public void call(Integer integer) {
                System.out.println("result:" + integer);
            }
        });
    

    输出结果:

    例子说明:最终输出为“0,2,4”,两个Observable输出的数据一一按顺序对应,无法对应的数据就抛弃,例子中的observable2本应该输出4个数据take(4)但是由于操作符zip的作用下,最终只输出3个数据。

    以上便是结合操作符的主要内容了。rxjava的主要操作符讲了差不多,有啥问题欢迎大家留言交流下😊,欢迎关注。

    附录:
    文章demo

    参考:
    ReactiveX/RxJava文档中文版
    Android RxJava使用介绍(四) RxJava的操作符

    相关文章

      网友评论

        本文标题:Rxjava操作符讲解(4)Combining 结合操作

        本文链接:https://www.haomeiwen.com/subject/knsovttx.html