前言
上一篇文章我们学习了flatMap和concatMap,他们两个都是将上游发送的数据都封装成一个个的Observable,再有一个Observable进行发送。本文我们将在学习一个新的操作符-zip
zip
zip专用于合并事件,该合并不是连接(flatMap,concatMap),而是两两配对。它按照严格的顺序应用这个函数。因此它只发射与发射数据项最少的那个Observable对象一样多的数据。
zip.png
通过分解动作我们可以看出:
1、组合的过程是分别从两根水管里严格按照事件的发送顺序各取出一个事件来进行组合, 并且每一个事件只会被使用一次,也就是说不会出现圆形1事件和三角形B事件进行合并,也不可能出现圆形2和三角形A进行合并的情况.
2、最终下游收到的事件数量是和上游中发送事件最少的那一根水管的事件数量相同. 这个也很好理解, 因为是从每一根水管 里取一个事件来进行合并, 最少的 那个肯定就最先取完 , 这个时候其他的水管尽管还有事件 , 但是已经没有足够的事件来组合了, 因此下游就不会收到剩余的事件了。
代码如下:
Observable<Integer> observable_one = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
emitter.onNext(4);
}
}).subscribeOn(Schedulers.io());
Observable<String> observable_two = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("A");
emitter.onNext("B");
emitter.onNext("C");
}
}).subscribeOn(Schedulers.io());
Observable.zip(observable_one,observable_two, new BiFunction<Integer, String,String>() {
@Override
public String apply(Integer integer, String string) throws Exception {
return integer+string;
}
}).observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(String s) {
Log.e("onNext",s);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
测试结果如下:
07-02 09:42:35.010 19642-19642/com.zhqy.myrxjava E/onNext: 1A
07-02 09:42:35.010 19642-19642/com.zhqy.myrxjava E/onNext: 2B
07-02 09:42:35.010 19642-19642/com.zhqy.myrxjava E/onNext: 3C
应用范围
zip操作符可以应用在界面所需要的数据需要在两个或以上的接口的数据,当获取到两个接口的数据后再进行展示。
merge
将多个上游合并为一个上游,注意与zip的区别,merge只是将多个上游发送的事件都放在一个上游中进行发送。
代码如下:
Observable<Integer> observable1 = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
}
});
Observable<String> observable2 = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("A");
emitter.onNext("B");
emitter.onNext("C");
emitter.onNext("D");
}
});
Observable.merge(observable1,observable2)
.subscribe(new Consumer<Serializable>() {
@Override
public void accept(Serializable serializable) throws Exception {
Log.e("accept",serializable+"");
}
});
测试结果如下:
07-02 11:50:51.225 20224-20224/com.zhqy.myrxjava E/accept: 1
07-02 11:50:51.226 20224-20224/com.zhqy.myrxjava E/accept: 2
07-02 11:50:51.226 20224-20224/com.zhqy.myrxjava E/accept: 3
07-02 11:50:51.226 20224-20224/com.zhqy.myrxjava E/accept: A
07-02 11:50:51.226 20224-20224/com.zhqy.myrxjava E/accept: B
07-02 11:50:51.226 20224-20224/com.zhqy.myrxjava E/accept: C
07-02 11:50:51.226 20224-20224/com.zhqy.myrxjava E/accept: D
startWith/startWithArray
在Observable开始发射他们的数据之前,startWith()通过传递一个参数来先发射一个数据序列。
代码如下:
Observable.just("1","2","3","4","5")
.startWith("我是第一个插入的数据")
.startWithArray("我是第二个插入的数据","我是第三个插入的数据")
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.e("accept",s);
}
});
测试结果如下:
07-02 13:52:47.579 15192-15192/com.zhqy.myrxjava E/accept: 我是第二个插入的数据
07-02 13:52:47.579 15192-15192/com.zhqy.myrxjava E/accept: 我是第三个插入的数据
07-02 13:52:47.580 15192-15192/com.zhqy.myrxjava E/accept: 我是第一个插入的数据
07-02 13:52:47.580 15192-15192/com.zhqy.myrxjava E/accept: 1
07-02 13:52:47.580 15192-15192/com.zhqy.myrxjava E/accept: 2
07-02 13:52:47.580 15192-15192/com.zhqy.myrxjava E/accept: 3
07-02 13:52:47.580 15192-15192/com.zhqy.myrxjava E/accept: 4
07-02 13:52:47.580 15192-15192/com.zhqy.myrxjava E/accept: 5
flatMapIterable
latMapIterable() 和flatMap()功能在流程上大体一致,唯一不同的是,flatMap是转一个Observable转换成多个Observable,每一个Observable最后又返回一个Observable。而flatMapInterable是将一个Observable转换成多个Observable,但是每一个Observable最后返回得是Iterable。Iterable,可以理解成返回一个list
代码如下:
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(2);
}
}).flatMapIterable(new Function<Integer, Iterable<String>>() {
@Override
public Iterable<String> apply(Integer integer) throws Exception {
ArrayList list=new ArrayList();
list.add(integer+"a");
list.add(integer+"b");
return list;
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.e("accept",s);
}
});
测试结果
07-03 14:09:00.219 30310-30310/com.zhqy.myrxjava E/accept: 1a
07-03 14:09:00.219 30310-30310/com.zhqy.myrxjava E/accept: 1b
07-03 14:09:00.219 30310-30310/com.zhqy.myrxjava E/accept: 2a
07-03 14:09:00.219 30310-30310/com.zhqy.myrxjava E/accept: 2b
07-03 14:09:00.219 30310-30310/com.zhqy.myrxjava E/accept: 2a
07-03 14:09:00.219 30310-30310/com.zhqy.myrxjava E/accept: 2b
scan
将数据以一定的逻辑聚合起来,并将计算结果发送出去作为下个数据应用函数时的第一个参数使用
代码如下:
Observable.just(1,2,3,4,5)
.scan(new BiFunction<Integer, Integer, Integer>() {
@Override
public Integer apply(Integer integer, Integer integer2) throws Exception {
return integer+integer2;
}
}).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.e("accept",integer+"");
}
});
测试结果
07-03 14:21:30.523 3352-3352/com.zhqy.myrxjava E/accept: 1
07-03 14:21:30.523 3352-3352/com.zhqy.myrxjava E/accept: 3
07-03 14:21:30.523 3352-3352/com.zhqy.myrxjava E/accept: 6
07-03 14:21:30.523 3352-3352/com.zhqy.myrxjava E/accept: 10
07-03 14:21:30.523 3352-3352/com.zhqy.myrxjava E/accept: 15
groupby
将原始Observable发送的数据按照key进行分组,每个分组都会返回一个Observable,这些Observable分别发射其包含的数据。
代码如下:
Observable.just(1,2,3,4,5)
.groupBy(new Function<Integer, Integer>() {
@Override
public Integer apply(Integer integer) throws Exception {
return integer%2;
}
}).subscribe(new Consumer<GroupedObservable<Integer, Integer>>() {
@Override
public void accept(final GroupedObservable<Integer, Integer> integerIntegerGroupedObservable) throws Exception {
integerIntegerGroupedObservable.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.e("accept",integerIntegerGroupedObservable.getKey()+":"+integer);
}
});
}
});
测试结果
07-03 14:31:33.472 6798-6798/com.zhqy.myrxjava E/accept: 1:1
07-03 14:31:33.472 6798-6798/com.zhqy.myrxjava E/accept: 0:2
07-03 14:31:33.472 6798-6798/com.zhqy.myrxjava E/accept: 1:3
07-03 14:31:33.472 6798-6798/com.zhqy.myrxjava E/accept: 0:4
07-03 14:31:33.472 6798-6798/com.zhqy.myrxjava E/accept: 1:5
window
发送指定数量事件时,就将这些事件分为一组。
代码如下:
Observable.just(1,2,3,4,5)
.window(3)
.subscribe(new Observer<Observable<Integer>>() {
@Override
public void onSubscribe(Disposable d) {
Log.e("onSubscribe","onSubscribe");
}
@Override
public void onNext(Observable<Integer> integerObservable) {
integerObservable.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.e("onSubscribe","onSubscribe");
}
@Override
public void onNext(Integer integer) {
Log.e("onNext",integer+"");
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
Log.e("onComplete","onComplete");
}
});
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
Log.e("onComplete","onComplete");
}
});
测试结果
07-03 14:39:32.474 9254-9254/com.zhqy.myrxjava E/onSubscribe: onSubscribe
07-03 14:39:32.488 9254-9254/com.zhqy.myrxjava E/onSubscribe: onSubscribe
07-03 14:39:32.488 9254-9254/com.zhqy.myrxjava E/onNext: 1
07-03 14:39:32.488 9254-9254/com.zhqy.myrxjava E/onNext: 2
07-03 14:39:32.488 9254-9254/com.zhqy.myrxjava E/onNext: 3
07-03 14:39:32.488 9254-9254/com.zhqy.myrxjava E/onComplete: onComplete
07-03 14:39:32.488 9254-9254/com.zhqy.myrxjava E/onSubscribe: onSubscribe
07-03 14:39:32.488 9254-9254/com.zhqy.myrxjava E/onNext: 4
07-03 14:39:32.488 9254-9254/com.zhqy.myrxjava E/onNext: 5
07-03 14:39:32.488 9254-9254/com.zhqy.myrxjava E/onComplete: onComplete
07-03 14:39:32.488 9254-9254/com.zhqy.myrxjava E/onComplete: onComplete
网友评论