RxJava操作符之组合操作符(六)

作者: SherlockXu8013 | 来源:发表于2017-10-24 13:11 被阅读154次

    前言

    上一篇文章我们学习了过滤类操作符,本篇我们将一起来学习RxJava组合类操作符。组合操作符主要是用来同时处理多个Observable,将他们进行组合创建出新的满足我们需求的Observable,一起来看下都有哪些。

    组合操作符

    Merge

    merge操作符,将两个Observable要发射的观测序列合并为一个序列进行发射。按照两个序列每个元素的发射时间先后进行排序,同一时间点发射的元素则是无序的。

    //将一个发送字母的Observable与发送数字的Observable合并发射
    final String[] words = new String[]{"A", "B", "C", "D", "E", "F", "G", "H", "I"};
    //字母Observable,每200ms发射一次
    Observable<String> wordSequence = Observable.interval(200, TimeUnit.MILLISECONDS)
            .map(new Func1<Long, String>() {
                @Override
                public String call(Long position) {
                    return words[position.intValue()];
                }
            })
            .take(words.length);
    //数字Observable,每500ms发射一次
    Observable<Long> numberSequence = Observable.interval(500, TimeUnit.MILLISECONDS).take(4);
    Observable.merge(wordSequence, numberSequence)
            .subscribe(new Action1<Serializable>() {
                @Override
                public void call(Serializable serializable) {
                    Log.e("rx_test", "merge:" + serializable.toString());
                }
            });
    

    输出结果:

    merge:A
    merge:B
    merge:0
    merge:C
    merge:D
    merge:E
    merge:1
    merge:F
    merge:G
    merge:2
    merge:H
    merge:I
    merge:3
    

    原理图:


    merge操作符还有一种入参merge(Observable[]),可传入含有多个Observable的集合,merge操作符也可将这多个Observable的序列合并后发射。

    MergeDelayError

    mergeDelayError操作符,与merge功能类似,都是用来合并Observable的。不同之处在于mergeDelayError操作符在合并过程中发生异常的话不会立即停止合并,而会在所有元素合并发射完毕之后再发射异常。但发生异常的那个Observable就不会发射数据了。

    //字母Observable,每200ms发射一次,模拟过程中产生一个异常
    Observable<String> wordSequence = Observable.interval(200, TimeUnit.MILLISECONDS)
            .map(new Func1<Long, String>() {
                @Override
                public String call(Long position) {
                    Long cache = position;
                    if (cache == 3) {
                        cache = cache / 0;
                    }
                    return words[position.intValue()];
                }
            })
            .take(words.length);
    //数字Observable,每500ms发射一次
    Observable<Long> numberSequence = Observable.interval(500, TimeUnit.MILLISECONDS).take(4);
    Observable.mergeDelayError(wordSequence, numberSequence)
            .subscribe(new Action1<Serializable>() {
                @Override
                public void call(Serializable serializable) {
                    Log.e("rx_test", "mergeDelayError:" + serializable.toString());
                }
            }, new Action1<Throwable>() {
                @Override
                public void call(Throwable throwable) {
                    Log.e("rx_test", "mergeDelayError:" + throwable.getMessage());
                }
            }, new Action0() {
                @Override
                public void call() {
                    Log.e("rx_test", "mergeDelayError:onComplete");
                }
            });
    

    输出结果:

    mergeDelayError:A
    mergeDelayError:B
    mergeDelayError:0
    mergeDelayError:C
    mergeDelayError:1
    mergeDelayError:2
    mergeDelayError:3
    mergeDelayError:divide by zero
    

    由输出结果可看出,wordSequence在发射到C时抛出了一个异常,停止发射其剩下的数据,但合并没有停止。合并完成之后这个异常才被发射了出来。

    原理图:


    Concat

    concat操作符,将多个Obserbavle发射的数据进行合并后发射,类似于merge操作符。但concat操作符是将Observable依次发射,是有序的。

    Observable<String> wordSequence = Observable.just("A", "B", "C", "D", "E");
    Observable<Integer> numberSequence = Observable.just(1, 2, 3, 4, 5);
    Observable<String> nameSequence = Observable.just("Sherlock", "Holmes", "Xu", "Lei");
    Observable.concat(wordSequence, numberSequence, nameSequence)
            .subscribe(new Action1<Serializable>() {
                @Override
                public void call(Serializable serializable) {
                    Log.e("rx_test", "concat:" + serializable.toString());
                }
            });
    

    输出结果:

    concat:A
    concat:B
    concat:C
    concat:D
    concat:E
    concat:1
    concat:2
    concat:3
    concat:4
    concat:5
    concat:Sherlo
    concat:Holmes
    concat:Xu
    concat:Lei
    

    原理图:

    Zip

    zip(Observable, Observable, Func2)操作符,根据Func2中的call()方法规则合并两个Observable的数据项并发射。
    注意:若其中一个Observable数据发送结束或出现异常后,另一个Observable也会停止发射数据。

    Observable<String> wordSequence = Observable.just("A", "B", "C", "D", "E");
    Observable<Integer> numberSequence = Observable.just(1, 2, 3, 4, 5, 6);
    Observable.zip(wordSequence, numberSequence, new Func2<String, Integer, String>() {
        @Override
        public String call(String s, Integer integer) {
            return s + integer;
        }
    }).subscribe(new Action1<String>() {
        @Override
        public void call(String s) {
            Log.e("rx_test", "zip:" + s);
        }
    });
    

    输出结果:

    zip:A1
    zip:B2
    zip:C3
    zip:D4
    zip:E5
    

    由输出结果可看出numberSequence观测序列最后的6并没有发射出来,由于wordSequence观测序列已发射完所有数据,所以组合序列也停止发射数据了。

    原理图:


    StartWith

    startWith操作符,用于在源Observable发射的数据前,插入指定的数据并发射。

    Observable.just(4, 5, 6, 7)
            .startWith(1, 2, 3)
            .subscribe(new Action1<Integer>() {
                @Override
                public void call(Integer integer) {
                    Log.e("rx_test", "startWith:" + integer);
                }
            });
    

    输出结果:

    startWith:1
    startWith:2
    startWith:3
    startWith:4
    startWith:5
    startWith:6
    startWith:7
    

    原理图:


    startWith还有两种入参:
    • startWith(Iterable<T>):可在源Observable发射的数据前插入Iterable数据并发射。
    • startWith(Observable<T>):可在源Observable发射的数据前插入另一Observable发射的数据并发射。

    SwitchOnNext

    switchOnNext操作符,用来将一个发射多个小Observable的源Observable转化为一个Observable,然后发射多个小Observable所发射的数据。若小Observable正在发射数据时,源Observable又发射了新的小Observable,则前一个小Observable还未发射的数据会被抛弃,直接发射新的小Observable所发射的数据,上例子。

    //每隔500ms产生一个Observable
    Observable<Observable<Long>> observable = Observable.interval(500, TimeUnit.MILLISECONDS)
            .map(new Func1<Long, Observable<Long>>() {
                @Override
                public Observable<Long> call(Long aLong) {
                    //每隔200毫秒产生一组数据(0,10,20,30,40)
                    return Observable.interval(200, TimeUnit.MILLISECONDS)
                            .map(new Func1<Long, Long>() {
                                @Override
                                public Long call(Long aLong) {
                                    return aLong * 10;
                                }
                            }).take(5);
                    }
            }).take(2);
            
    Observable.switchOnNext(observable)
            .subscribe(new Action1<Long>() {
                @Override
                public void call(Long aLong) {
                    Log.e("rx_test", "switchOnNext:" + aLong);
                }
            });
    

    输出结果:

    switchOnNext:0
    switchOnNext:10
    switchOnNext:0
    switchOnNext:10
    switchOnNext:20
    switchOnNext:30
    switchOnNext:40
    

    由输出结果发现第一个小Observable打印到10则停止了发射数据,说明其发射到10时,新的小Observable被创建了出来,第一个小Observable则被中断发射,开始发射新的小Observable的数据。

    原理图:


    CombineLatest

    combineLatest操作符,用于将两个Observale最近发射的数据以Func2函数的规则进行组合并发射。

    //引用merge的例子
    final String[] words = new String[]{"A", "B", "C", "D", "E", "F", "G", "H", "I"};
    Observable<String> wordSequence = Observable.interval(300, TimeUnit.MILLISECONDS)
            .map(new Func1<Long, String>() {
                @Override
                public String call(Long position) {
                    return words[position.intValue()];
                }
            })
            .take(words.length);
    Observable<Long> numberSequence = Observable.interval(500, TimeUnit.MILLISECONDS)
            .take(5);
    Observable.combineLatest(wordSequence, numberSequence,
            new Func2<String, Long, String>() {
                @Override
                public String call(String s, Long aLong) {
                    return s + aLong;
                }
            })
            .subscribe(new Action1<Serializable>() {
                @Override
                public void call(Serializable serializable) {
                    Log.e("rx_test", "combineLatest:" + serializable.toString());
                }
            });
    

    输出结果:

    combineLatest:A0
    combineLatest:B0
    combineLatest:C0
    combineLatest:C1
    combineLatest:D1
    combineLatest:E1
    combineLatest:E2
    combineLatest:F2
    combineLatest:F3
    combineLatest:G3
    combineLatest:H3
    combineLatest:H4
    combineLatest:I4
    

    如果将wordSequence与numberSequence的入参顺序互换,输出结果也会不同:

    combineLatest:0A
    combineLatest:0B
    combineLatest:0C
    combineLatest:1C
    combineLatest:1D
    combineLatest:2D
    combineLatest:2E
    combineLatest:2F
    combineLatest:3F
    combineLatest:3G
    combineLatest:3H
    combineLatest:4H
    combineLatest:4I
    

    wordSequence每300ms发射一个字符,numberSequence每500ms发射一个数字。可能有些码友不知道这个输出结果怎么来的,这个操作符确实不太好理解。我们来看一下这个原理图就很清楚了。

    原理图:

    Join

    join(Observable, Func1, Func1, Func2)操作符,类似于combineLatest操作符,用于将ObservableA与ObservableB发射的数据进行排列组合。但join操作符可以控制Observable发射的每个数据的生命周期,在每个发射数据的生命周期内,可与另一个Observable发射的数据按照一定规则进行合并,来看下join的几个入参。

    • Observable:需要与源Observable进行组合的目标Observable。
    • Func1:接收从源Observable发射来的数据,并返回一个Observable,这个Observable的声明周期决定了源Obsrvable发射出来的数据的有效期;
    • Func1:接收目标Observable发射来的数据,并返回一个Observable,这个Observable的声明周期决定了目标Obsrvable发射出来的数据的有效期;
    • Func2:接收从源Observable和目标Observable发射出来的数据,并将这两个数据按自定的规则组合后返回。
    //产生字母的序列,周期为1000ms
    String[] words = new String[]{"A", "B", "C", "D", "E", "F", "G", "H"};
    Observable<String> observableA = Observable.interval(1000, TimeUnit.MILLISECONDS)
            .map(new Func1<Long, String>() {
                @Override
                public String call(Long aLong) {
                    return words[aLong.intValue()];
                }
            }).take(8);
    //产0,1,2,3,4,5,6,7的序列,延时500ms发射,周期为1000ms
    Observable<Long> observableB = Observable.interval(500, 1000, TimeUnit.MILLISECONDS)
            .map(new Func1<Long, Long>() {
                @Override
                public Long call(Long aLong) {
                    return aLong;
                }
            }).take(words.length);
    //join
    observableA.join(observableB,
            new Func1<String, Observable<Long>>() {
                @Override
                public Observable<Long> call(String s) {
                    //ObservableA发射的数据有效期为600ms
                    return Observable.timer(600, TimeUnit.MILLISECONDS);
                }
            },
            new Func1<Long, Observable<Long>>() {
                @Override
                public Observable<Long> call(Long aLong) {
                    //ObservableB发射的数据有效期为600ms
                    return Observable.timer(600, TimeUnit.MILLISECONDS);
                }
            },
            new Func2<String, Long, String>() {
                @Override
                public String call(String s, Long aLong) {
                    return s + aLong;
                }
            }
    ).subscribe(new Action1<String>() {
        @Override
        public void call(String s) {
            Log.e("rx_test", "join:" + s);
        }
    });
    

    join操作符的组合方式类似于数学上的排列组合规则,以ObservableA为基准源Observable,按照其自身周期发射数据,且每个发射出来的数据都有其有效期。而ObservableB每发射出来一个数据,都与A发射出来的并且还在有效期内的数据按Func2函数中的规则进行组合,B发射出来的数据也有其有效期。最后再将结果发射给观察者进行处理。

    输出结果:

    join:A0
    join:A1
    join:B1
    join:B2
    join:C2
    join:C3
    join:D3
    join:D4
    join:E4
    join:E5
    join:F5
    join:F6
    join:G6
    join:G7
    join:H7
    

    原理图:

    GroupJoin

    groupJoin操作符,类似于join操作符,区别在于第四个参数Func2的传入函数不同,对join之后的结果包装了一层小的Observable,便于用户再次进行一些过滤转换等操作再发射给Observable。

    observableA.groupJoin(observableB,
            new Func1<String, Observable<Long>>() {
                @Override
                public Observable<Long> call(String s) {
                    return Observable.timer(600, TimeUnit.MILLISECONDS);
                }
            },
            new Func1<Long, Observable<Long>>() {
                @Override
                public Observable<Long> call(Long aLong) {
                    return Observable.timer(600, TimeUnit.MILLISECONDS);
                }
            },
            new Func2<String, Observable<Long>, Observable<String>>() {
                @Override
                public Observable<String> call(final String s, Observable<Long> longObservable) {
                    return longObservable.map(new Func1<Long, String>() {
                        @Override
                        public String call(Long aLong) {
                            return s + aLong;
                        }
                    });
                }
            })
            .subscribe(new Action1<Observable<String>>() {
                @Override
                public void call(Observable<String> stringObservable) {
                    stringObservable.subscribe(new Action1<String>() {
                        @Override
                        public void call(String s) {
                            Log.e("rx_test", "groupJoin:" + s);
                        }
                    });
                }
            });
    

    输出结果:

    groupJoin:A0
    groupJoin:A1
    groupJoin:B1
    groupJoin:B2
    groupJoin:C2
    groupJoin:C3
    groupJoin:D3
    groupJoin:D4
    groupJoin:E4
    groupJoin:E5
    groupJoin:F5
    groupJoin:F6
    groupJoin:G6
    groupJoin:G7
    groupJoin:H7
    

    原理图:


    总结

    到此,本篇关于RxJava的常用组合类操作符就讲解完毕了。通过以上四篇文章对RxJava四类操作符的学习,相信大家已经基本掌握RxJava如何使用了。实践是检验真理的唯一标准,下一篇我们来一起上项目看看实践中如何使用RxJava。

    技术渣一枚,有写的不对的地方欢迎大神们留言指正,有什么疑惑或者建议也可以在我Github上RxJavaDemo项目Issues中提出,我会及时回复。

    附上RxJavaDemo的地址:
    RxJavaDemo

    相关文章

      网友评论

        本文标题:RxJava操作符之组合操作符(六)

        本文链接:https://www.haomeiwen.com/subject/ozutpxtx.html