zip 操作符
zip 的字面意思就是压缩的含义,它可以将两个事件源 Observable 发送的事件通过 zip 操作将结果发送给订阅者 Observer 上。
举个例子:
假如一个页面是负责展示商品的信息的,这个商品的信息包括普通信息(例如价格,参数),还有商品的评价信息。这两类信息来源于两个接口,需求就是在两个接口信息都返回之后,才展示该商品信息。
基于上面这种需求,我们可以将商品普通信息作为一个接口去请求,评价信息作为一个接口去请求。这样就是两个 Observable 对象,通过 zip 操作符将两个 Observable 请求的结果转化成合并成一个商品信息的 Java Bean 之后发送给对应的订阅者,那么这样就完成了需求了。
zip操作符.png
zip 操作
从 Observable 的 zip 操作方法来看,它定义了 3 个泛型
- T1 第一个 Observable 发送的数据类型;
- T2 第二个 Observable 发送的数据类型;
- R 需要通过 zip 操作符转化后的数据类型;
在 Function 接口中提供了将一种数据类型转化为另一个种数据类型的功能,而 BiFunction 接口种提供了将两种数据类型转化为另一种数据类型的功能。
Functions.toFunction(zipper) 目的就是將两个数据转化为为一个数据的操作,转变为将一个 Object[] 转换为一个数据,实际上就是将 BitFunction 的功能转为 Function 的功能。
public static <T1, T2, R> Observable<R> zip(
ObservableSource<? extends T1> source1, ObservableSource<? extends T2> source2,
BiFunction<? super T1, ? super T2, ? extends R> zipper) {
return zipArray(Functions.toFunction(zipper), false, bufferSize(), source1, source2);
}
zipArray
zipArray 方法返回一个 ObservableZip 对象。 它是如何将多个 Observable 发送的事件汇总到一起发送的呢?
在 ObservableZip 内部有一个 ZipCoodinator 类,它负责汇总数据发送功能。
- row 属性就是用于存放当前需要结合的两个数据的数组。
-
actual 属性就是外界定义的订阅者 Observer
-
zippper 就是上面通过 Functions.toFunction(zipper) 转化而来的接口对象。Function<? super Object[], ? extends R> zipper。
-
observers 它是 ZipObserver 类型的数组。 这是去订阅 Observable 的订阅者数组,有多少个 Observable 那么在 observers 的数组就会创建多少个 ZipObserver 对象。
两个 Observable 事件源是怎么被订阅的?
ZipCoordinator 内部会创建两个 ZipObserver 分别去订阅对应的 Observable。Observable 发送的事件将由 ZipObserver 去接收。
for (int i = 0; i < len; i++) {
if (cancelled) {
return;
}
// 让每一个订阅者 zipObserver 去订阅对应的事件源。
// 注意这里有可能是同步/异步。
sources[i].subscribe(s[i]);
}
订阅者接受事件
每一个事件源发送的事件将会对应的发送到对应的订阅者中去接收。在订阅事件时若是没有做线程切换操作的话,那么 source[i].subscribe[s[i]] 这是同步操作。
@Override
public void onNext(T t) {
queue.offer(t);
parent.drain();
}
@Override
public void onError(Throwable t) {
error = t;
done = true;
parent.drain();
}
@Override
public void onComplete() {
done = true;
parent.drain();
}
同步操作订阅者接受事件
什么是同步订阅?
同步的意思就是多个 Observable 发送事件是按顺序发送的,因为他们是处于同一个线程中,因此会出现一种现象,就是第一个 Observable 的数据发送完毕之后,第二个 Observable 才能开始发送数据。
按顺序发送数据,那么发送的数据是怎么存储的?
queue 是 ZipObserver 内部维护的存储数据源发送的数据的数据结构。上面提到同步操作,会将第一个 Observable 的数据源发送完毕之后才开始发送第二个 Observable 的数据,那么先前发送的数据就会存放在 queue 中。
- onNext(T t)
接收事件时,会被回调的一个方法,t 就是事件源发送的数据。
@Override
public void onNext(T t) {
//将数据 t 保存起来
queue.offer(t);
//取出操作,取出的数据不一定是数据 t 哦。
parent.drain();
}
内部是通过 writeToQueue(buffer, e, index, offset); 方法将数据写入到 buffer 中的。
parent.drain() 方法取出之前通过 offer 存放的数据
public void drain() {
if (getAndIncrement() != 0) {
return;
}
int missing = 1;
final ZipObserver<T, R>[] zs = observers;
final Observer<? super R> a = actual;
final T[] os = row;
final boolean delayError = this.delayError;
for (;;) {
for (;;) {
int i = 0;
int emptyCount = 0;
for (ZipObserver<T, R> z : zs) {
if (os[i] == null) {
boolean d = z.done;
//取出
T v = z.queue.poll();
boolean empty = v == null;
if (checkTerminated(d, empty, a, delayError, z)) {
return;
}
if (!empty) {
os[i] = v;
} else {
emptyCount++;
}
} else {
if (z.done && !delayError) {
Throwable ex = z.error;
if (ex != null) {
clear();
a.onError(ex);
return;
}
}
}
i++;
}
if (emptyCount != 0) {
break;
}
R v;
try {
v = ObjectHelper.requireNonNull(zipper.apply(os.clone()), "The zipper returned a null value");
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
clear();
a.onError(ex);
return;
}
a.onNext(v);
Arrays.fill(os, null);
}
missing = addAndGet(-missing);
if (missing == 0) {
return;
}
}
}
这个方法调用对应的 ZipObserver.queue.poll() 方法取出一个元素。
T v = z.queue.poll();
我们上面讲到 raw 属性,这个属性就是用存放即将合并发送的两个数据的数据。
os[i] = v;
转换数据并发送数据
///转换数据
R v;//类型 R 就是最重要转化后的类型。
v = ObjectHelper.requireNonNull(zipper.apply(os.clone()), "The zipper returned a null value");
//发送数据
a.onNext(v);
zipper.apply 方法就是上面提到的将一个 Object[] 转化为一个 R 类型的数据。这个方法是由用户去实现的,只有用户才知道怎么实现,就想上面的举例中提到的,它将商品的基本信息和评价信息分别存放到 Object[] 数组中,之后通过 zipper.apply 转化为一个 Goods 对象。
网友评论