Outline
[TOC]
前言
在上一节中, 我们提到了Flowable
和Backpressure
背压, 本来这一节的确是想讲这两个东西的,可是写到一半感觉还是差点火候,感觉时机未到, 因此,这里先来做个准备工作, 先带大家学习zip
这个操作符, 这个操作符也是比较牛逼的东西了, 涉及到的东西也比较多, 主要是一些细节上的东西太多, 通过学习这个操作符,可以为我们下一节的Backpressure
做个铺垫.
正题
照惯例我们还是先贴上一下比较正式的解释吧.
Zip
通过一个函数将多个Observable发送的事件结合到一起,然后发送这些组合到一起的事件. 它按照严格的顺序应用这个函数。它只发射与发射数据项最少的那个Observable一样多的数据。
我们再用通俗易懂的图片来解释一下:

从这个图中可以看见, 这次上游和以往不同的是, 我们有两根水管了.
其中一根水管负责发送圆形事件
, 另外一根水管负责发送三角形事件
, 通过Zip操作符, 使得圆形事件
和三角形事件
合并为了一个矩形事件
.
下面我们再来看看分解动作:

通过分解动作我们可以看出:
- 组合的过程是
分别从
两根水管里各取出一个事件
来进行组合, 并且一个事件只能被使用一次,
组合的顺序是严格按照事件发送的顺利
来进行的, 也就是说不会出现圆形1
事件和三角形B
事件进行合并, 也不可能出现圆形2
和三角形A
进行合并的情况. - 最终
下游收到的事件数量
是和上游中发送事件最少的那一根水管的事件数量
相同. 这个也很好理解, 因为是从每一根水管
里取一个事件来进行合并,最少的
那个肯定就最先取完
, 这个时候其他的水管尽管还有事件
, 但是已经没有足够的事件来组合了, 因此下游就不会收到剩余的事件了.
分析了大概的原理, 我们还是劳逸结合, 先来看看实际中的代码怎么写吧:
Observable<Integer> observable1 = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
Log.d(TAG, "emit 1");
emitter.onNext(1);
Log.d(TAG, "emit 2");
emitter.onNext(2);
Log.d(TAG, "emit 3");
emitter.onNext(3);
Log.d(TAG, "emit 4");
emitter.onNext(4);
Log.d(TAG, "emit complete1");
emitter.onComplete();
}
});
Observable<String> observable2 = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
Log.d(TAG, "emit A");
emitter.onNext("A");
Log.d(TAG, "emit B");
emitter.onNext("B");
Log.d(TAG, "emit C");
emitter.onNext("C");
Log.d(TAG, "emit complete2");
emitter.onComplete();
}
});
Observable.zip(observable1, observable2, new BiFunction<Integer, String, String>() {
@Override
public String apply(Integer integer, String s) throws Exception {
return integer + s;
}
}).subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe");
}
@Override
public void onNext(String value) {
Log.d(TAG, "onNext: " + value);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError");
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});
我们分别创建了两个上游水管, 一个发送1,2,3,4,Complete, 另一个发送A,B,C,Complete, 接着用Zip把发出的事件组合, 来看看运行结果吧:
D/TAG: onSubscribe
D/TAG: emit 1
D/TAG: emit 2
D/TAG: emit 3
D/TAG: emit 4
D/TAG: emit complete1
D/TAG: emit A
D/TAG: onNext: 1A
D/TAG: emit B
D/TAG: onNext: 2B
D/TAG: emit C
D/TAG: onNext: 3C
D/TAG: emit complete2
D/TAG: onComplete
结果似乎是对的... 但是总感觉什么地方不对劲...
哪儿不对劲呢, 为什么感觉是水管一发送完了之后, 水管二才开始发送啊? 到底是不是呢, 我们来验证一下:
Observable<Integer> observable1 = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
Log.d(TAG, "emit 1");
emitter.onNext(1);
Thread.sleep(1000);
Log.d(TAG, "emit 2");
emitter.onNext(2);
Thread.sleep(1000);
Log.d(TAG, "emit 3");
emitter.onNext(3);
Thread.sleep(1000);
Log.d(TAG, "emit 4");
emitter.onNext(4);
Thread.sleep(1000);
Log.d(TAG, "emit complete1");
emitter.onComplete();
}
});
Observable<String> observable2 = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
Log.d(TAG, "emit A");
emitter.onNext("A");
Thread.sleep(1000);
Log.d(TAG, "emit B");
emitter.onNext("B");
Thread.sleep(1000);
Log.d(TAG, "emit C");
emitter.onNext("C");
Thread.sleep(1000);
Log.d(TAG, "emit complete2");
emitter.onComplete();
}
});
Observable.zip(observable1, observable2, new BiFunction<Integer, String, String>() {
@Override
public String apply(Integer integer, String s) throws Exception {
return integer + s;
}
}).subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe");
}
@Override
public void onNext(String value) {
Log.d(TAG, "onNext: " + value);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError");
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});
这次我们在每发送一个事件之后加入了一秒钟的延时, 来看看运行结果吧, 注意这是个GIF图:

(贴心的我怕大家看不清楚, 特意调成了老年字体呢)
阿西吧, 好像真的是先发送的水管一再发送的水管二呢, 为什么会有这种情况呢? 因为我们两根水管都是运行在同一个线程里, 同一个线程里执行代码肯定有先后顺序呀.
因此我们来稍微改一下, 不让他们在同一个线程, 不知道怎么切换线程的, 请掉头看前面几节.
Observable<Integer> observable1 = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
Log.d(TAG, "emit 1");
emitter.onNext(1);
Thread.sleep(1000);
Log.d(TAG, "emit 2");
emitter.onNext(2);
Thread.sleep(1000);
Log.d(TAG, "emit 3");
emitter.onNext(3);
Thread.sleep(1000);
Log.d(TAG, "emit 4");
emitter.onNext(4);
Thread.sleep(1000);
Log.d(TAG, "emit complete1");
emitter.onComplete();
}
}).subscribeOn(Schedulers.io());
Observable<String> observable2 = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
Log.d(TAG, "emit A");
emitter.onNext("A");
Thread.sleep(1000);
Log.d(TAG, "emit B");
emitter.onNext("B");
Thread.sleep(1000);
Log.d(TAG, "emit C");
emitter.onNext("C");
Thread.sleep(1000);
Log.d(TAG, "emit complete2");
emitter.onComplete();
}
}).subscribeOn(Schedulers.io());
Observable.zip(observable1, observable2, new BiFunction<Integer, String, String>() {
@Override
public String apply(Integer integer, String s) throws Exception {
return integer + s;
}
}).subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe");
}
@Override
public void onNext(String value) {
Log.d(TAG, "onNext: " + value);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError");
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});
好了, 这次我们让水管都在IO线程里发送事件, 再来看看运行结果:
D/TAG: onSubscribe
D/TAG: emit A
D/TAG: emit 1
D/TAG: onNext: 1A
D/TAG: emit B
D/TAG: emit 2
D/TAG: onNext: 2B
D/TAG: emit C
D/TAG: emit 3
D/TAG: onNext: 3C
D/TAG: emit complete2
D/TAG: onComplete
GIF图:

诶! 这下就对了嘛, 两根水管同时开始发送, 每发送一个, Zip就组合一个, 再将组合结果发送给下游.
不对呀! 可能细心点的朋友又看出端倪了, 第一根水管明明发送了四个数据+一个Complete, 之前明明还有的, 为啥到这里没了呢?
这是因为我们之前说了, zip发送的事件数量跟上游中发送事件最少的那一根水管的事件数量是有关的, 在这个例子里我们第二根水管只发送了三个事件然后就发送了Complete, 这个时候尽管第一根水管还有事件4
和事件Complete
没有发送, 但是它们发不发送还有什么意义呢? 所以本着节约是美德的思想, 就干脆打断它的狗腿, 不让它发了.
至于前面的例子为什么会发送, 刚才不是已经说了是!在!同!一!个!线!程!里!吗!!!!再问老子打死你!
有好事的程序员可能又要问了, 那我不发送Complete呢? 答案是显然的, 上游会继续发送事件, 但是下游仍然收不到那些多余的事件. 不信你可以试试.
实践
学习了Zip的基本用法, 那么它在Android有什么用呢, 其实很多场景都可以用到Zip. 举个例子.
比如一个界面需要展示用户的一些信息, 而这些信息分别要从两个服务器接口中获取, 而只有当两个都获取到了之后才能进行展示, 这个时候就可以用Zip了:
首先分别定义这两个请求接口:
public interface Api {
@GET
Observable<UserBaseInfoResponse> getUserBaseInfo(@Body UserBaseInfoRequest request);
@GET
Observable<UserExtraInfoResponse> getUserExtraInfo(@Body UserExtraInfoRequest request);
}
接着用Zip来打包请求:
Observable<UserBaseInfoResponse> observable1 =
api.getUserBaseInfo(new UserBaseInfoRequest()).subscribeOn(Schedulers.io());
Observable<UserExtraInfoResponse> observable2 =
api.getUserExtraInfo(new UserExtraInfoRequest()).subscribeOn(Schedulers.io());
Observable.zip(observable1, observable2,
new BiFunction<UserBaseInfoResponse, UserExtraInfoResponse, UserInfo>() {
@Override
public UserInfo apply(UserBaseInfoResponse baseInfo,
UserExtraInfoResponse extraInfo) throws Exception {
return new UserInfo(baseInfo, extraInfo);
}
}).observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<UserInfo>() {
@Override
public void accept(UserInfo userInfo) throws Exception {
//do something;
}
});
好了, 本次的教程就到这里吧. 又到周末鸟, 下周见.
网友评论
08-07 09:30:39.946 25656-25677/com.example.hasee.rxjavademo1 D/233: emit 1
08-07 09:30:39.947 25656-25678/com.example.hasee.rxjavademo1 D/233: emit A
onNext1A
08-07 09:30:40.947 25656-25677/com.example.hasee.rxjavademo1 D/233: emit 2
08-07 09:30:40.947 25656-25678/com.example.hasee.rxjavademo1 D/233: emit B
08-07 09:30:40.947 25656-25677/com.example.hasee.rxjavademo1 D/233: onNext2B
08-07 09:30:41.948 25656-25678/com.example.hasee.rxjavademo1 D/233: emit C
08-07 09:30:41.948 25656-25677/com.example.hasee.rxjavademo1 D/233: emit 3
08-07 09:30:41.949 25656-25678/com.example.hasee.rxjavademo1 D/233: onNext3C
08-07 09:30:42.949 25656-25677/com.example.hasee.rxjavademo1 D/233: emit 4
08-07 09:30:42.949 25656-25678/com.example.hasee.rxjavademo1 D/233: emit complete2
08-07 09:30:42.950 25656-25678/com.example.hasee.rxjavademo1 D/233: oncomplete
//为啥和上面讲的不一样啊,,不是说长水管就不发了啊
08-05 14:56:35.901 4455-4475/com.cat.rxjavademo E/copycat: 长水管 - 1 RxCachedThreadScheduler-1
08-05 14:56:35.903 4455-4476/com.cat.rxjavademo E/copycat: 短水管 - A RxCachedThreadScheduler-2
onNext - 结合:1A
08-05 14:56:36.903 4455-4475/com.cat.rxjavademo E/copycat: 长水管 - 2 RxCachedThreadScheduler-1
08-05 14:56:36.903 4455-4476/com.cat.rxjavademo E/copycat: 短水管 - B RxCachedThreadScheduler-2
onNext - 结合:2B
08-05 14:56:37.904 4455-4475/com.cat.rxjavademo E/copycat: 长水管 - 3 RxCachedThreadScheduler-1
08-05 14:56:37.904 4455-4476/com.cat.rxjavademo E/copycat: 短水管 - onComplete RxCachedThreadScheduler-2
08-05 14:56:37.905 4455-4476/com.cat.rxjavademo E/copycat: onComplete
08-05 14:56:38.905 4455-4475/com.cat.rxjavademo E/copycat: 长水管 - 4 RxCachedThreadScheduler-1
08-05 14:56:39.906 4455-4475/com.cat.rxjavademo E/copycat: 长水管 - onComplete RxCachedThreadScheduler-1
请问这里的打断狗腿的机制是什么?第二个水管发了Complete后怎么会影响到第一根水管事件的发送呢?楼主之前的文章里曾经提到过即使上游发送了Complete事件后,上游应该还是可以继续发送事件的,只是下游不再接受事件而已。与此处的说法有点矛盾,还请楼主进一步解释一下。
RxJavaText onSubscribe
RxJavaText emit 1
RxJavaText emit A
RxJavaText onNext :1A
12-22 16:08:06.836 8384-8384/com.example.my_rxjava E/LogUtilsTAG: 接收源发送事件:_____onSubscribe
12-22 16:08:06.846 8384-8406/com.example.my_rxjava D/LogUtilsTAG: 发射源发送事件:_____1
12-22 16:08:06.846 8384-8407/com.example.my_rxjava E/LogUtilsTAG: 接收源发送事件:_____onNext_1A
12-22 16:08:06.846 8384-8407/com.example.my_rxjava D/LogUtilsTAG: 发射源发送事件:_____A
12-22 16:08:07.846 8384-8406/com.example.my_rxjava D/LogUtilsTAG: 发射源发送事件:_____2
12-22 16:08:07.846 8384-8407/com.example.my_rxjava E/LogUtilsTAG: 接收源发送事件:_____onNext_2B
12-22 16:08:07.846 8384-8407/com.example.my_rxjava D/LogUtilsTAG: 发射源发送事件:_____B
12-22 16:08:08.846 8384-8406/com.example.my_rxjava D/LogUtilsTAG: 发射源发送事件:_____3
12-22 16:08:08.846 8384-8407/com.example.my_rxjava E/LogUtilsTAG: 接收源发送事件:_____onNext_3C
12-22 16:08:08.846 8384-8407/com.example.my_rxjava D/LogUtilsTAG: 发射源发送事件:_____C
12-22 16:08:09.846 8384-8406/com.example.my_rxjava D/LogUtilsTAG: 发射源发送事件:_____4
12-22 16:08:09.846 8384-8407/com.example.my_rxjava E/LogUtilsTAG: 接收源发送事件:_____onComplete
12-22 16:08:09.846 8384-8407/com.example.my_rxjava D/LogUtilsTAG: 发射源发送事件:_____onComplete_2
发送源发送事件4 还是打印了,
a发射源发射了A
a发射源发射了B
a发射源发射了C
11-30 18:05:59.954 11158-11158/map.zym.com.rxandroiddemo I/ZipActivity: a发射源发射了onComplete
第1次组合数据
第1次接收数据A1
b发射源发射了1
第2次组合数据
第2次接收数据B2
b发射源发射了2
b发射源发射了onComplete
’11-30 18:05:59.953 11158-11158/map.zym.com.rxandroiddemo I/ZipActivity: a发射源发射了A
11-30 18:05:59.953 11158-11158/map.zym.com.rxandroiddemo I/ZipActivity: a发射源发射了B
11-30 18:05:59.953 11158-11158/map.zym.com.rxandroiddemo I/ZipActivity: a发射源发射了C
11-30 18:05:59.954 11158-11158/map.zym.com.rxandroiddemo I/ZipActivity: a发射源发射了onComplete
11-30 18:05:59.954 11158-11158/map.zym.com.rxandroiddemo I/ZipActivity: 第1次组合数据
11-30 18:05:59.954 11158-11158/map.zym.com.rxandroiddemo I/ZipActivity: 第1次接收数据A1
11-30 18:05:59.954 11158-11158/map.zym.com.rxandroiddemo I/ZipActivity: b发射源发射了1
11-30 18:05:59.954 11158-11158/map.zym.com.rxandroiddemo I/ZipActivity: 第2次组合数据
11-30 18:05:59.954 11158-11158/map.zym.com.rxandroiddemo I/ZipActivity: 第2次接收数据B2
11-30 18:05:59.954 11158-11158/map.zym.com.rxandroiddemo I/ZipActivity: b发射源发射了2
11-30 18:05:59.954 11158-11158/map.zym.com.rxandroiddemo I/ZipActivity: b发射源发射了onComplete
rxjava2demo I/MainActivity: emit 1
rxjava2demo D/MainActivity: emit A
rxjava2demo D/MainActivity: onNext:----1A
rxjava2demo I/MainActivity: emit 2
rxjava2demo D/MainActivity: emit B
rxjava2demo D/MainActivity: onNext:----2B
rxjava2demo I/MainActivity: emit 3
rxjava2demo D/MainActivity: emit C
rxjava2demo D/MainActivity: onNext:----3C
rxjava2demo I/MainActivity: emit 4
rxjava2demo D/MainActivity: emit complete2
rxjava2demo D/MainActivity: onComplete
这里是会打印emit 4的,跟文中:
zip发送的事件数量跟上游中发送事件最少的那一根水管的事件数量是有关的, 在这个例子里我们第二根水管只发送了三个事件然后就发送了Complete, 这个时候尽管第一根水管还有事件4 和事件Complete 没有发送, 但是它们发不发送还有什么意义呢? -------感觉有点不符,请问可以解答一下吗
1)相同线程的情况下:按发射源在代码中的顺序执行
下游收到发射源1发送onComplete的话,会接收发射源2的前3个事件,第4个onError会忽略,最后会触发onComplete
下游收到发射源1发送onError的话,会立即触发下游的onError事件,同时会忽略发射源2发送的所有事件
2)不同线程的话其实和相同线程的现象基本一致
上游先发送onComplete或者onError,下游就触发哪个,不同的是下游触发了onComplete或者onError之后,没有发送的上游事件都不会再发送了
作者他是把两个发射源都放在io线程里面(.subscribeOn(Schedulers.io())),你这种情况是一个在主线程,一个在子线程
你可以把你同步请求的observable的.subscribeOn(Schedulers.io())直接去掉或者改为.subscribeOn(AndroidSchedulers.mainThread())就行了
Process: com.beijing.RxJavaTest, PID: 13487
java.lang.InterruptedException
at java.lang.Thread.sleep(Native Method)
at java.lang.Thread.sleep(Thread.java:379)
at java.lang.Thread.sleep(Thread.java:321)
at com.beijing.RxJavaTest.rxjava_test.Rxjava_1$7.subscribe(Rxjava_1.java:157)
Observable.zip(observable2, observable1, new BiFunction<String, Integer, String>()
结果:
D/rxjava: onSubscribe
D/rxjava: emit A RxCachedThreadScheduler-1
D/rxjava: emit 1 RxCachedThreadScheduler-2
D/rxjava: emit B RxCachedThreadScheduler-1
D/rxjava: onNext: A+1
D/rxjava: emit C RxCachedThreadScheduler-1
D/rxjava: emit 2 RxCachedThreadScheduler-2
D/rxjava: emit complete2
D/rxjava: onNext: B+2
D/rxjava: emit 3 RxCachedThreadScheduler-2
D/rxjava: onNext: C+3
D/rxjava: onComplete
D/rxjava: emit 4 RxCachedThreadScheduler-2
D/rxjava: emit complete1
08-09 16:26:19.430 26773-26854/ivan.rich.guardian D/TestActivity: [subscribe] observable1 emit 0
08-09 16:26:19.430 26773-26855/ivan.rich.guardian D/TestActivity: [subscribe] observable2 emit a
08-09 16:26:19.430 26773-26854/ivan.rich.guardian D/TestActivity: [subscribe] observable1 emit 1
08-09 16:26:19.430 26773-26854/ivan.rich.guardian D/TestActivity: [subscribe] observable1 emit 2
08-09 16:26:19.430 26773-26854/ivan.rich.guardian D/TestActivity: [subscribe] observable1 emit 3
08-09 16:26:19.430 26773-26855/ivan.rich.guardian D/TestActivity: [accept] 0----a
08-09 16:26:19.430 26773-26854/ivan.rich.guardian D/TestActivity: [subscribe] observable1 onComplete
08-09 16:26:19.430 26773-26855/ivan.rich.guardian D/TestActivity: [subscribe] observable2 emit b
08-09 16:26:19.430 26773-26855/ivan.rich.guardian D/TestActivity: [accept] 1----b
08-09 16:26:19.430 26773-26855/ivan.rich.guardian D/TestActivity: [subscribe] observable2 emit c
08-09 16:26:19.430 26773-26855/ivan.rich.guardian D/TestActivity: [accept] 2----c
08-09 16:26:19.430 26773-26855/ivan.rich.guardian D/TestActivity: [subscribe] observable2 onComplete
api.getUserBaseInfo(new UserBaseInfoRequest()).subscribeOn(Schedulers.io());
这里在使用Retrofit发起网络请求的时候规定被观察者发送数据流所在的线程是不是多余的?Retrofit已经定义了,而且这里使用subscribeOn(Schedulers.io())不会生效。
=》针对这句不太理解,是你主动打断了?为什么我的demo还是会发送。按我的理解,complete之后只是接收方不会去接收,发送方继续发送才对吧。请楼主指导。
/? D/DemoRx: onSubscribe
/? D/DemoRx: emit 1
/? D/DemoRx: emit A
/? D/DemoRx: onNext: 1A
/com.example.sphinx.rxjavate D/DemoRx: emit 2
/com.example.sphinx.rxjavate D/DemoRx: emit B
/com.example.sphinx.rxjavate D/DemoRx: onNext: 2B
/com.example.sphinx.rxjavate D/DemoRx: emit 3
/com.example.sphinx.rxjavate D/DemoRx: emit C
/com.example.sphinx.rxjavate D/DemoRx: onNext: 3C
/com.example.sphinx.rxjavate D/DemoRx: emit 4
/com.example.sphinx.rxjavate D/DemoRx: emit complete2
/com.example.sphinx.rxjavate D/DemoRx: onComplete
异常出现在observable1中第四个Thread.sleep(1000);InterruptedException
@Override
public void accept(Throwable throwable) throws Exception {
if (throwable instanceof InterruptedIOException) {
Log.d(TAG, "Io interrupted");
}
}
});
}
如果请求一很快就请求完了 请求二隔了一段时间才请求好 这时是不是最终zip的时机是取决于 请求二完成的时机?
假如请求二 请求失败了是不是也算整个zip操作失败?
如果我不想整个操作失败 我就去用flatmap来替代zip 来实现 两个请求揉合成一个请求的 这种需求?