美文网首页
RxJava基础四-组合操作符

RxJava基础四-组合操作符

作者: 清水杨杨 | 来源:发表于2019-05-02 16:41 被阅读0次

    此篇内容均是来自书籍《RxJava响应式编程》李衍顺 著

    3.4 组合操作符

    组合操作符会将多个Observable发送的数据按照一定的规则组合起来,这在汇总各种结果的时候就显得非常有用了。

    3.4.1 combineLatest

    combineLatest操作符可以将2~9个Observable发送的数据组合起来,然后再发送出来。不过还需要两个前提:

    1. 所有要组合的Observable都发送过数据。所以只要有任何Observable还未发送过数据,combineLastest操作符就不会开始发送组合的数据。
    2. 满足条件1的前提下,任何一个Observable发送一个数据,combineLatest就将所有Observable最新发送的数据按照提供的函数组装起来发送出去。

    RxJava实现combineLatest操作符可以让我们直接将需要组装的Observable对象作为参数传值,也可以将所有的Observable装在一个List里面传进去。

    private Observable<Long> createObserver(int index){
            return Observable.interval(0, 500*index, TimeUnit.MILLISECONDS);
        }
        private Observable<String> combineLatestObserver(){
            return Observable.combineLatest(createObserver(1), createObserver(2),
                    new Func2<Long, Long, String>() {
                        @Override
                        public String call(Long aLong, Long aLong2) {
                            return ("left: "+ aLong + "right: " + aLong2);
                        }
                    });
        }
        private Observable<String> combineListObserver(){
            List<Observable<Long>> list = new ArrayList<Observable<Long>>();
            for(int i=1; i<3; i++){
                list.add(createObserver(i));
            }
            return Observable.combineLatest(list, new FuncN<String>() {
                @Override
                public String call(Object... args) {
                    String temp= "";
                    for(Object i :args){
                        temp = temp + ":" + i;
                    }
                    return temp;
                }
            });
        }
        private void combineLatest(){
            combineLatestObserver().subscribe(new Action1<String>() {
                @Override
                public void call(String s) {
                    System.out.println("Combinelatest: " + s);
                }
            });
            combineListObserver().subscribe(new Action1<String>() {
                @Override
                public void call(String s) {
                    System.out.println("Combinelist:" + s);
                }
            });
    
        }
    
    Combinelatest: left: 1right: 0
    Combinelatest: left: 2right: 0
    Combinelatest: left: 3right: 0
    Combinelatest: left: 3right: 1
    Combinelatest: left: 4right: 1
    Combinelatest: left: 5right: 1
    Combinelatest: left: 5right: 2
    Combinelist::1:0
    Combinelist::2:0
    Combinelist::3:0
    Combinelist::3:1
    Combinelist::4:1
    Combinelist::5:1
    Combinelist::5:2
    (这个是书上的答案,自己写的程序只发送了一次, 都是0)
    

    3.4.2 join 和 groupJoin

    join操作符根据时间窗口来组合两个Observable发送的数据,每个Observable都有一个自己的时间窗口,在这个时间窗口内的数据都是有效的,可以拿来组合。

    RxJava还实现了groupJoin,它和join基本相同,只是通过groupJoin操作符组合后,发送出来的是一个个小的Observable,每个Observable里面包含了一轮组合数据。

    join操作符需要对两个Observable进行操作,为了区别,我们用left和right来区分要组合的这两个Observable,即LeftObservable和RightObservable。使用join操作符需要4个参数,他们分别是:

    • 要被组合到LeftObservable的RightObservable
    • 一个函数,接收从LeftObservable发送来的数据,并返回一个Observable,这个Observable的生命周期决定了LeftObservable发送出来的数据的有效期。
    • 一个函数,接收从RightObservable发送来的数据,并返回一个Observable,这个Observable的生命周期决定了RightObservable发送出来的数据的有效期
    • 一个函数,接收LeftObservable和RightObservable发送来的数据,并返回最终组合完的数据。

    下面我们使用join和groupJoin操作符分别来组合两个Observable对象。其中LeftObservable会一次发送abc三个字母,而RightObservable会一次发送123三个数据。在组合的时候, 我们使用timer操作符创建了临时Observable,给出的时间都是1000毫秒,因为just操作符会很快吧所有的数据都发送出来,所以可以认为在1000毫秒这个窗口期内,所有的数据都是有效的。

    /**
         * join && groupJoin
         */
        private Observable<String> getLeftObservable(){
            return Observable.just("a", "b", "c");
        }
        private Observable<Long> getRightObservable(){
            return Observable.just(1l,2l,3l);
        }
    
        private Observable<String> joinObserver(){
            return getLeftObservable()
            .join(getRightObservable(),
                    new Func1<String, Observable<Long>>() {
                        @Override
                        public Observable<Long> call(String s) {
                            return Observable.timer(1000, TimeUnit.MILLISECONDS);
                        }
                    }, new Func1<Long, Observable<Long>>() {
                        @Override
                        public Observable<Long> call(Long aLong) {
                            return Observable.timer(1000, TimeUnit.MILLISECONDS);
                        }
                    }, new Func2<String, Long, String>() {
                        @Override
                        public String call(String s, Long aLong) {
                            return s + ":" + aLong;
                        }
                    });
        }
        private Observable<Observable<String>> groupJoinObserver() {
            return getLeftObservable().groupJoin(getRightObservable()
                    , new Func1<String, Observable<Long>>() {
                        @Override
                        public Observable<Long> call(String s) {
                            return Observable.timer(1000, TimeUnit.MILLISECONDS);
                        }
                    }, new Func1<Long, Observable<Long>>() {
                        @Override
                        public Observable<Long> call(Long aLong) {
                            return Observable.timer(1000, TimeUnit.MILLISECONDS);
                        }
                    }, new Func2<String, Observable<Long>, Observable<String>>() {
                        @Override
                        public Observable<String> call(final String s, Observable<Long> longObservable) {
                            return longObservable.map(new Func1<Long, String>() {
                                @Override
                                public String call(Long aLong) {
                                    return s + ":" + aLong;
                                }
                            });
                        }
                    });
        }
        private void join_groupJoin(){
            joinObserver().subscribe(new Action1<String>() {
                @Override
                public void call(String s) {
                    System.out.println("join:"+ s);
                }
            });
            groupJoinObserver().first().subscribe(new Action1<Observable<String>>() {
                @Override
                public void call(Observable<String> stringObservable) {
                    System.out.println("groupJoin: "+ stringObservable);
                }
            });
        }
    结果:
    join:a:1
    join:b:1
    join:c:1
    join:a:2
    join:b:2
    join:c:2
    join:a:3
    join:b:3
    join:c:3
    groupJoin: rx.Observable@6adca536
    

    3.4.3 merge 和 mergeDelayError

    merge操作符将多个Observable发送的数据整合起来发送, 对外如同是一个Observable发送的数据一样。但其发送的数据可能是交错的,如果想有交错,可以使用concat操作符。

    当一个Observable发出onError的时候,merge的过程会被停止并将错误分发给Subscriber,如果不想让错误中止merge过程,可以使用mergeDelayError操作符,它会在merge结束后再分发错误。

    /**
         * merge && mergeDelayError
         */
        private void merge_mergeDelayError() {
            Observable.merge(Observable.just(1, 2, 3), Observable.just(4, 5, 6))
                    .subscribe(new Action1<Integer>() {
                        @Override
                        public void call(Integer integer) {
                            System.out.println("Merge: " + integer);
                        }
                    });
    
            Observable.mergeDelayError(Observable.create(new Observable.OnSubscribe<Integer>() {
                @Override
                public void call(Subscriber<? super Integer> subscriber) {
                    for (int i = 0; i < 5; i++) {
                        if (i == 3) {
                            subscriber.onError(new Throwable("error"));
                        }
                        subscriber.onNext(i);
                    }
                }
            }), Observable.create(new Observable.OnSubscribe<Integer>() {
                @Override
                public void call(Subscriber<? super Integer> subscriber) {
                    for (int i = 0; i < 5; i++) {
                        subscriber.onNext(5 + i);
                    }
                    subscriber.onCompleted();
                }
            })).subscribe(new Subscriber<Integer>() {
                @Override
                public void onCompleted() {
                    System.out.println("onCompleted");
                }
    
                @Override
                public void onError(Throwable e) {
                    System.out.println("mergeDelayError:" + e);
                }
    
                @Override
                public void onNext(Integer integer) {
                    System.out.println("mergeDelayNext: " + integer);
                }
            });
        }
    结果:
    Merge: 1
    Merge: 2
    Merge: 3
    Merge: 4
    Merge: 5
    Merge: 6
    mergeDelayNext: 0
    mergeDelayNext: 1
    mergeDelayNext: 2
    mergeDelayNext: 3
    mergeDelayNext: 4
    mergeDelayNext: 5
    mergeDelayNext: 6
    mergeDelayNext: 7
    mergeDelayNext: 8
    mergeDelayNext: 9
    mergeDelayError:java.lang.Throwable: error
    

    3.4.4 startWith

    startWith操作符会在源Observable发送的数据前面插入一些数据。startWith不仅可以插入一些数据,还可以将Iterable和Observable插入进去。如果插入的是Observable,则这个Observable发送的数据会插入到源Observable发送的数据前面。创建一个Observable并使用startWith操作符在其前面插入两个数据

    /**
         * startWith
         */
        private void startWith(){
            Observable.just(1,2,3).startWith(-1,0)
                    .subscribe(new Action1<Integer>() {
                        @Override
                        public void call(Integer integer) {
                            System.out.println("startWith: "+ integer);
                        }
                    });
        }
    结果:
    startWith: -1
    startWith: 0
    startWith: 1
    startWith: 2
    startWith: 3
    

    3.4.5 switch

    在RxJava中, 源Observable发送出来的数据可能是一个小小的Observable,如果订阅者只对这些小的Observable所发送的数据感兴趣,就需要使用switch操作符。switch操作符在RxJava上的实现为switchOnNext, 用来将源Observable发送出来的多个小Observable组合为一个Observable,然后发送这些多个小Observable所发送的数据。需要注意的是,如果一个小的Observable正在发送数据时,源Observable又发送出一个新的小Observable,则前一个Observable未发生的数据会被抛弃,直接发送小Observable所发送的数据。

    创建一个Observable,使他每隔3秒发送一个小的Observable对象,而这些小的Observable会每隔1秒依次发送0,1,2,3,4这几个数字。

    /**
         * switch
         */
        private Observable<String> createObserver(final Long index) {
            return Observable.interval(1000, 1000, TimeUnit.MILLISECONDS)
                    .take(5)
                    .map(new Func1<Long, String>() {
                        @Override
                        public String call(Long aLong) {
                            return index + "-" + aLong;
                        }
                    });
        }
    
        private Observable<String> switchObserver() {
            return Observable.switchOnNext(Observable
                    .interval(0, 3000, TimeUnit.MILLISECONDS)
                    .take(3)
                    .map(new Func1<Long, Observable<String>>() {
                        @Override
                        public Observable<String> call(Long aLong) {
                            return createObserver(aLong);
                        }
                    }));
        }
    
        private void switchOperator() {
            switchObserver().subscribe(new Action1<String>() {
                @Override
                public void call(String s) {
                    System.out.println("switch: " + s);
                }
            });
        }
    (笔者亲验,跑步起来。估计又是interval用错了,待纠正)
    

    书上结果:
    订阅后输出的结果如下。前面的数字代表了小Observable的序号, 后面的数字代表了小Observable发送的数据。可以看到前面序号是0和1的小Observable都只发送出了两个数据,这是因为它们发送了两个数据后,新的小Observable就被发送出来了,所以序号是0和1的小Observable未发送的数据丢弃了。而对于序号为2的小Observable,因为其后面没有新的小Observable被发送出来,所以它可以将所有的数剧都发送出来。

    switch:0-0
    switch:0-1
    switch:1-0
    switch:1-1
    switch:2-0
    switch:2-1
    switch:2-2
    switch:2-3
    switch:2-4
    

    3.4.6 zip 和 zipWith

    zip操作符将多个Observable发送的数据按顺序组合起来。与join操作符不同的是,join中的每个数据可以组合多次,zip操作符里的每个数据只能组合一次,而且都是有序的。最终组合的数据的数量由发送数据最少的Observable来决定。

    首先创建一个createObserver的方法,根据传入的index的数值返回一个会发送index个数据的Observable对象,然后使用zipWith来组合一个发送两个数据和一个发送三个数据的Observable;使用zip操作符将分别发送二个, 三个和四个数据的三个Observable对象组合起来。

    /**
         * zip && zipWith
         */
        private Observable<String> zipWithObserver(){
            return createObserverZip(2).zipWith(createObserverZip(3),
                    new Func2<String, String, String>() {
                        @Override
                        public String call(String s, String s2) {
                            return s + "-" + s2;
                        }
                    });
        }
        private Observable<String> zipWithIterableObserver(){
            return Observable.zip(createObserverZip(2),
                    createObserverZip(3),
                    createObserverZip(4),
                    new Func3<String, String, String, String>() {
                        @Override
                        public String call(String aLong, String aLong2, String aLong3) {
                            return aLong + "-" + aLong2 + "_" + aLong3;
                         }
                    });
        }
        private Observable<String> createObserverZip(final int index){
            return Observable.interval(100, TimeUnit.MILLISECONDS).take(index)
                    .map(new Func1<Long, String>() {
                        @Override
                        public String call(Long aLong) {
                            return index + ":" + aLong;
                        }
                    });
        }
        private void zip_zipWith(){
            zipWithObserver().subscribe(new Action1<String>() {
                @Override
                public void call(String s) {
                    System.out.println("zipWith: "+ s);
                }
            });
            zipWithIterableObserver().subscribe(new Action1<String>() {
                @Override
                public void call(String s) {
                    System.out.println("zip: " + s);
                }
            });
        }
    (验证又没有结果输出)
    但是书上结果如下:
    zipWith:2:0-3:0
    zipWith:2:1-3:1
    zip:2:0-3:0-4:0
    zip:2:1-3:1-4:1
    

    无论是zipWith还是zip操作符,最终都输出了两个数组后的数据,这是因为他们组合的Observable中包含一个只发送两个数据的Observable,而每个数据最多只能组合一次,所以即使其他的Observable对象会发送多余两个的数据 也不会被组合进来了, 可以用木桶理论来理解zip。

    相关文章

      网友评论

          本文标题:RxJava基础四-组合操作符

          本文链接:https://www.haomeiwen.com/subject/zhinnqtx.html