讲解zip操作符之前,先来巩固一个概念的区别,比如如何让一个线程睡眠一秒?
通常情况下,我们在Java中会使用Thread.sleep(1000),但是笔者就就这个问题今天就遇到一个坑,不断地出现InterruptedException这个异常,后来通过另外一种方式解决了问题,即SystemClock.sleep(1000);
那么zip操作符是如何定义的呢?
简单来说zip操作符就是合并多个被观察者的数据流, 然后发送(Emit)最终合并的数据。借用网上的一张图,分析的比较透彻,如下:
![](https://img.haomeiwen.com/i2324208/b66d8eff0efb121f.png)
从上游中可以看出,上游有两根水管,其中一根水管负责发送圆形事件 , 另外一根水管负责发送三角形事件 , 通过Zip操作符, 使得圆形事件 和三角形事件 合并为了一个矩形事件 . 拆分过程如下:
![](https://img.haomeiwen.com/i2324208/841b7b585f0ae1b5.png)
通过分解动作我们可以看出:
组合的过程是分别从 两根水管里各取出一个事件 来进行组合, 并且一个事件只能被使用一次, 组合的顺序是严格按照事件发送的顺利 来进行的, 也就是说不会出现圆形1 事件和三角形B 事件进行合并, 也不可能出现圆形2 和三角形A 进行合并的情况.
最终下游收到的事件数量 是和上游中发送事件最少的那一根水管的事件数量 相同. 这个也很好理解, 因为是从每一根水管 里取一个事件来进行合并, 最少的 那个肯定就最先取完 , 这个时候其他的水管尽管还有事件 , 但是已经没有足够的事件来组合了, 因此下游就不会收到剩余的事件了.
下面介绍一下我自己写的一个小demo用来测试zip操作符,如下:
Observable observable1=Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
Log.d(TAG,"1");
e.onNext(1);
SystemClock.sleep(1000);
Log.d(TAG,"2");
e.onNext(2);
SystemClock.sleep(1000);
Log.d(TAG,"3");
e.onNext(3);
SystemClock.sleep(1000);
Log.d(TAG,"4");
e.onNext(4);
SystemClock.sleep(1000);
Log.d(TAG,"onComplete");
e.onComplete();
}
}).subscribeOn(Schedulers.io());
Observable observable2=Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
Log.d(TAG,"A");
e.onNext("A");
SystemClock.sleep(1000);
Log.d(TAG,"B");
e.onNext("B");
SystemClock.sleep(1000);
Log.d(TAG,"C");
e.onNext("C");
SystemClock.sleep(1000);
Log.d(TAG,"onComplete");
e.onComplete();
}
}).subscribeOn(Schedulers.io());
Observable.zip(observable1, observable2, new BiFunction<Integer,String,String>() {
@Override
public String apply(Integer a,String b) throws Exception {
return a+b;
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String o) throws Exception {
Log.d(TAG,o);
}
});
运行结果如下:
![](https://img.haomeiwen.com/i2324208/2691fe46e3f93c42.png)
则实现了zip所要实现的功能,但实现的过程中,过程艰辛,其中一步就是让线程睡眠一秒,这也是本文为什么在最开始就强化这个概念的原因。还有一点
![](https://img.haomeiwen.com/i2324208/8218e2218c89bfcf.png)
![](https://img.haomeiwen.com/i2324208/0df591f6798dc45f.png)
上图两个红色的矩形块中添加了.subscribeOn(Schedulers.io()),目的是让observable1和observable2处于两个不同的线程中,这样两者同时发送就可以合并,没有先后之分了。倘若不加,那么你会看到observable1的log先打印完毕之后,才会去和observable2的发送的数据合并,因为这样的话observable1和observable2将会处在一个线程中,那就肯定有先后之分了。当然了,读者也可以去掉.subscribeOn(Schedulers.io())看看程序究竟打印出什么样的log。
网友评论