RxJava_zip操作符操作流程源码

作者: 未见哥哥 | 来源:发表于2017-06-16 15:55 被阅读115次

    zip 操作符

    zip 的字面意思就是压缩的含义,它可以将两个事件源 Observable 发送的事件通过 zip 操作将结果发送给订阅者 Observer 上。

    举个例子:

    假如一个页面是负责展示商品的信息的,这个商品的信息包括普通信息(例如价格,参数),还有商品的评价信息。这两类信息来源于两个接口,需求就是在两个接口信息都返回之后,才展示该商品信息。

    基于上面这种需求,我们可以将商品普通信息作为一个接口去请求,评价信息作为一个接口去请求。这样就是两个 Observable 对象,通过 zip 操作符将两个 Observable 请求的结果转化成合并成一个商品信息的 Java Bean 之后发送给对应的订阅者,那么这样就完成了需求了。


    zip操作符.png

    zip 操作

    从 Observable 的 zip 操作方法来看,它定义了 3 个泛型

    • T1 第一个 Observable 发送的数据类型;
    • T2 第二个 Observable 发送的数据类型;
    • R 需要通过 zip 操作符转化后的数据类型;

    在 Function 接口中提供了将一种数据类型转化为另一个种数据类型的功能,而 BiFunction 接口种提供了将两种数据类型转化为另一种数据类型的功能。

    Functions.toFunction(zipper) 目的就是將两个数据转化为为一个数据的操作,转变为将一个 Object[] 转换为一个数据,实际上就是将 BitFunction 的功能转为 Function 的功能。

    public static <T1, T2, R> Observable<R> zip(
            ObservableSource<? extends T1> source1, ObservableSource<? extends T2> source2,
            BiFunction<? super T1, ? super T2, ? extends R> zipper) {
        return zipArray(Functions.toFunction(zipper), false, bufferSize(), source1, source2);
    }
    

    zipArray

    zipArray 方法返回一个 ObservableZip 对象。 它是如何将多个 Observable 发送的事件汇总到一起发送的呢?

    在 ObservableZip 内部有一个 ZipCoodinator 类,它负责汇总数据发送功能。

    • row 属性就是用于存放当前需要结合的两个数据的数组。
    • actual 属性就是外界定义的订阅者 Observer

    • zippper 就是上面通过 Functions.toFunction(zipper) 转化而来的接口对象。Function<? super Object[], ? extends R> zipper。

    • observers 它是 ZipObserver 类型的数组。 这是去订阅 Observable 的订阅者数组,有多少个 Observable 那么在 observers 的数组就会创建多少个 ZipObserver 对象。

    两个 Observable 事件源是怎么被订阅的?

    ZipCoordinator 内部会创建两个 ZipObserver 分别去订阅对应的 Observable。Observable 发送的事件将由 ZipObserver 去接收。

    for (int i = 0; i < len; i++) {
        if (cancelled) {
            return;
        }
       // 让每一个订阅者 zipObserver 去订阅对应的事件源。
      // 注意这里有可能是同步/异步。
        sources[i].subscribe(s[i]);
    }
    

    订阅者接受事件

    每一个事件源发送的事件将会对应的发送到对应的订阅者中去接收。在订阅事件时若是没有做线程切换操作的话,那么 source[i].subscribe[s[i]] 这是同步操作。

    @Override
    public void onNext(T t) {
        queue.offer(t);
        parent.drain();
    }
    @Override
    public void onError(Throwable t) {
        error = t;
        done = true;
        parent.drain();
    }
    @Override
    public void onComplete() {
        done = true;
        parent.drain();
    }
    

    同步操作订阅者接受事件

    什么是同步订阅?

    同步的意思就是多个 Observable 发送事件是按顺序发送的,因为他们是处于同一个线程中,因此会出现一种现象,就是第一个 Observable 的数据发送完毕之后,第二个 Observable 才能开始发送数据。

    按顺序发送数据,那么发送的数据是怎么存储的?

    queue 是 ZipObserver 内部维护的存储数据源发送的数据的数据结构。上面提到同步操作,会将第一个 Observable 的数据源发送完毕之后才开始发送第二个 Observable 的数据,那么先前发送的数据就会存放在 queue 中。

    • onNext(T t)

    接收事件时,会被回调的一个方法,t 就是事件源发送的数据。

    @Override
    public void onNext(T t) {
        //将数据 t 保存起来
        queue.offer(t);
        //取出操作,取出的数据不一定是数据 t 哦。
        parent.drain();
    }
    

    内部是通过 writeToQueue(buffer, e, index, offset); 方法将数据写入到 buffer 中的。

    parent.drain() 方法取出之前通过 offer 存放的数据

    public void drain() {
        if (getAndIncrement() != 0) {
            return;
        }
        int missing = 1;
        final ZipObserver<T, R>[] zs = observers;
        final Observer<? super R> a = actual;
        final T[] os = row;
        final boolean delayError = this.delayError;
        for (;;) {
            for (;;) {
                int i = 0;
                int emptyCount = 0;
                for (ZipObserver<T, R> z : zs) {
                    if (os[i] == null) {
                        boolean d = z.done;
                        //取出
                        T v = z.queue.poll();
                        boolean empty = v == null;
                        if (checkTerminated(d, empty, a, delayError, z)) {
                            return;
                        }
                        if (!empty) {
                            os[i] = v;
                        } else {
                            emptyCount++;
                        }
                    } else {
                        if (z.done && !delayError) {
                            Throwable ex = z.error;
                            if (ex != null) {
                                clear();
                                a.onError(ex);
                                return;
                            }
                        }
                    }
                    i++;
                }
                if (emptyCount != 0) {
                    break;
                }
                R v;
                try {
                    v = ObjectHelper.requireNonNull(zipper.apply(os.clone()), "The zipper returned a null value");
                } catch (Throwable ex) {
                    Exceptions.throwIfFatal(ex);
                    clear();
                    a.onError(ex);
                    return;
                }
                a.onNext(v);
                Arrays.fill(os, null);
            }
            missing = addAndGet(-missing);
            if (missing == 0) {
                return;
            }
        }
    }
    

    这个方法调用对应的 ZipObserver.queue.poll() 方法取出一个元素。

    T v = z.queue.poll();
    

    我们上面讲到 raw 属性,这个属性就是用存放即将合并发送的两个数据的数据。

    os[i] = v;
    

    转换数据并发送数据

    
    ///转换数据
    R v;//类型 R 就是最重要转化后的类型。
    v = ObjectHelper.requireNonNull(zipper.apply(os.clone()), "The zipper returned a null value");
    
    //发送数据
    a.onNext(v);
    

    zipper.apply 方法就是上面提到的将一个 Object[] 转化为一个 R 类型的数据。这个方法是由用户去实现的,只有用户才知道怎么实现,就想上面的举例中提到的,它将商品的基本信息和评价信息分别存放到 Object[] 数组中,之后通过 zipper.apply 转化为一个 Goods 对象。

    相关文章

      网友评论

        本文标题:RxJava_zip操作符操作流程源码

        本文链接:https://www.haomeiwen.com/subject/stcmqxtx.html