美文网首页
RxJava之zip操作符

RxJava之zip操作符

作者: 加油码农 | 来源:发表于2017-10-09 13:57 被阅读4351次

讲解zip操作符之前,先来巩固一个概念的区别,比如如何让一个线程睡眠一秒?
通常情况下,我们在Java中会使用Thread.sleep(1000),但是笔者就就这个问题今天就遇到一个坑,不断地出现InterruptedException这个异常,后来通过另外一种方式解决了问题,即SystemClock.sleep(1000);
那么zip操作符是如何定义的呢?
简单来说zip操作符就是合并多个被观察者的数据流, 然后发送(Emit)最终合并的数据。借用网上的一张图,分析的比较透彻,如下:


image.png

从上游中可以看出,上游有两根水管,其中一根水管负责发送圆形事件 , 另外一根水管负责发送三角形事件 , 通过Zip操作符, 使得圆形事件 和三角形事件 合并为了一个矩形事件 . 拆分过程如下:


image.png
通过分解动作我们可以看出:
组合的过程是分别从 两根水管里各取出一个事件 来进行组合, 并且一个事件只能被使用一次, 组合的顺序是严格按照事件发送的顺利 来进行的, 也就是说不会出现圆形1 事件和三角形B 事件进行合并, 也不可能出现圆形2 和三角形A 进行合并的情况.
最终下游收到的事件数量 是和上游中发送事件最少的那一根水管的事件数量 相同. 这个也很好理解, 因为是从每一根水管 里取一个事件来进行合并, 最少的 那个肯定就最先取完 , 这个时候其他的水管尽管还有事件 , 但是已经没有足够的事件来组合了, 因此下游就不会收到剩余的事件了.
下面介绍一下我自己写的一个小demo用来测试zip操作符,如下:
Observable observable1=Observable.create(new ObservableOnSubscribe<Integer>() {

@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
Log.d(TAG,"1");
e.onNext(1);
SystemClock.sleep(1000);
Log.d(TAG,"2");
e.onNext(2);
SystemClock.sleep(1000);
Log.d(TAG,"3");
e.onNext(3);
SystemClock.sleep(1000);
Log.d(TAG,"4");
e.onNext(4);
SystemClock.sleep(1000);
Log.d(TAG,"onComplete");
e.onComplete();
}
}).subscribeOn(Schedulers.io());

            Observable observable2=Observable.create(new ObservableOnSubscribe<String>() {
                @Override
                public void subscribe(ObservableEmitter<String> e) throws Exception {
                    Log.d(TAG,"A");
                    e.onNext("A");
                    SystemClock.sleep(1000);
                    Log.d(TAG,"B");
                    e.onNext("B");
                    SystemClock.sleep(1000);
                    Log.d(TAG,"C");
                    e.onNext("C");
                    SystemClock.sleep(1000);
                    Log.d(TAG,"onComplete");
                    e.onComplete();
                }
            }).subscribeOn(Schedulers.io());

            Observable.zip(observable1, observable2, new BiFunction<Integer,String,String>() {
                @Override
                public String apply(Integer a,String b) throws Exception {
                    return a+b;
                }
            }).subscribe(new Consumer<String>() {
                @Override
                public void accept(String o) throws Exception {
                    Log.d(TAG,o);
                }
            });

运行结果如下:


image.png

则实现了zip所要实现的功能,但实现的过程中,过程艰辛,其中一步就是让线程睡眠一秒,这也是本文为什么在最开始就强化这个概念的原因。还有一点


image.png
image.png
上图两个红色的矩形块中添加了.subscribeOn(Schedulers.io()),目的是让observable1和observable2处于两个不同的线程中,这样两者同时发送就可以合并,没有先后之分了。倘若不加,那么你会看到observable1的log先打印完毕之后,才会去和observable2的发送的数据合并,因为这样的话observable1和observable2将会处在一个线程中,那就肯定有先后之分了。当然了,读者也可以去掉.subscribeOn(Schedulers.io())看看程序究竟打印出什么样的log。

相关文章

  • RxJava之zip操作符

    讲解zip操作符之前,先来巩固一个概念的区别,比如如何让一个线程睡眠一秒?通常情况下,我们在Java中会使用Thr...

  • zip操作符的error处理

    熟悉rxjava的同学肯定对操作符不会陌生,比如我们使用map操作符处理数据,使用zip操作符合并多个请求,这里演...

  • RxJava操作符---zip

    简介 zip操作符用于将多个数据源合并,并生成一个新的数据源。新生成的数据源严格按照合并前的数据源的数据发射顺序,...

  • RxJava操作符系列四

    RxJava操作符系列传送门 RxJava操作符源码RxJava操作符系列一RxJava操作符系列二RxJava操...

  • RxJava操作符系列五

    RxJava操作符系列传送门 RxJava操作符源码RxJava操作符系列一RxJava操作符系列二RxJava操...

  • RxJava操作符系列六

    RxJava操作符系列传送门 RxJava操作符源码RxJava操作符系列一RxJava操作符系列二RxJava操...

  • 阅读 Subscriber 的实现中关于 backpressu

    rxjava 中最具有挑战性的设计就是 backpresure 。例如 zip 操作符,合并两个 Observab...

  • Android拾萃 - RxJava2之变换操作符及其demo

    Android拾萃 - RxJava2操作符汇总Android拾萃 - RxJava2之创建操作符及其demo 一...

  • RxJava操作符系列三

    RxJava操作符系列传送门 RxJava操作符源码RxJava操作符系列一RxJava操作符系列二 前言 在之前...

  • 【RxJava】- 连接操作符源码分析

    目录 【RxJava】- 创建操作符源码分析【RxJava】- 变换操作符源码分析【RxJava】- 过滤操作符源...

网友评论

      本文标题:RxJava之zip操作符

      本文链接:https://www.haomeiwen.com/subject/rozuyxtx.html