本文为博主原创文章,未经允许不得转载
前言
本文简析 RxJava2
的 subscribeOn
和 zip
操作符。
术语解释
Single.just().map().flatMap().subscribeOn().observeOn().subscribe();
上述代码中,Single
到 subscribe()
之间的都称为 操作符
,想像一下自己就是其中一个 操作符
,那么位于左边的便称为 上游
,位于右边的则称为 下游
,故上下游其实是相对的。
一、subscribeOn
demo
// Case2: 在非UI线程执行并关注结果
Single.fromCallable(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
return generateRandom();
}
}).subscribeOn(Schedulers.io()).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Logger.d(TAG, "test: accept(Integer integer) invoked on %s", Thread.currentThread().getName());
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
Logger.d(TAG, "test: accept(Throwable throwable) invoked on %s", Thread.currentThread().getName());
}
});
fromCallable(Callable callable) [-> Single.java]
public static <T> Single<T> fromCallable(final Callable<? extends T> callable) {
ObjectHelper.requireNonNull(callable, "callable is null");
// RxJavaPlugins 里是全局钩子函数,分析源码时无视即可,此处就是返回 SingleFromCallable
return RxJavaPlugins.onAssembly(new SingleFromCallable<T>(callable));
}
SingleFromCallable [-> SingleFromCallable.java]
// 注意继承自 Single,而 Single 实现了 SingleSource 接口,所以也继承了 subscribe() 方法
public final class SingleFromCallable<T> extends Single<T> {
// 回调函数
final Callable<? extends T> callable;
public SingleFromCallable(Callable<? extends T> callable) {
// 保存为全局变量
this.callable = callable;
}
@Override
protected void subscribeActual(SingleObserver<? super T> observer) {
// 一个 run() 方法体为空的 RunnableDisposable 对象,用来取消订阅
Disposable d = Disposables.empty();
// 调用下游(本示例此处为SubscribeOnObserver)的 onSubscribe()
observer.onSubscribe(d);
// 已取消订阅的,直接返回,不会发射任何值
if (d.isDisposed()) {
return;
}
T value;
try {
// 调用 callable.call() 获取值
value = ObjectHelper.requireNonNull(callable.call(), "The callable returned a null value");
} catch (Throwable ex) {
// 捕获所有异常,所以使用 rxjava 时,自己写的方法收不到异常通知,需订阅一个 Consumer<Throwable>
Exceptions.throwIfFatal(ex);
if (!d.isDisposed()) {
// 发射一个 error 事件给下游
observer.onError(ex);
} else {
RxJavaPlugins.onError(ex);
}
return;
}
if (!d.isDisposed()) {
// 发射一个 success 事件给下游
observer.onSuccess(value);
}
}
}
subscribeOn(Scheduler scheduler) [-> Single.java]
public final Single<T> subscribeOn(final Scheduler scheduler) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
// 返回 SingleSubscribeOn
return RxJavaPlugins.onAssembly(new SingleSubscribeOn<T>(this, scheduler));
}
SingleSubscribeOn [-> SingleSubscribeOn.java]
// 注意继承自 Single,而 Single 实现了 SingleSource 接口,所以也继承了 subscribe() 方法
public final class SingleSubscribeOn<T> extends Single<T> {
// 上游
final SingleSource<? extends T> source;
// 线程调度器
final Scheduler scheduler;
public SingleSubscribeOn(SingleSource<? extends T> source, Scheduler scheduler) {
// 保存上游为 this.source
this.source = source;
// 保存线程调度器为 this.scheduler
this.scheduler = scheduler;
}
@Override
protected void subscribeActual(final SingleObserver<? super T> s) {
// 将下游和上游包装为 SubscribeOnObserver
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s, source);
// 调用下游的 onSubscribe(),此时还没有切换线程,所以 onSubscribe() 是在原线程执行的
s.onSubscribe(parent);
// 将 SubscribeOnObserver 扔到线程调度器中执行,此处就是 IoScheduler,内部实现基于 jdk 的 ExecutorService、FutureTask 和 Future
Disposable f = scheduler.scheduleDirect(parent);
// 将调度器返回的 Disposable(一个实现了 Disposable 和 Runnable 接口的 DisposeTask) 对象设置给 SubscribeOnObserver 的 task,用来取消订阅、中断线程执行
parent.task.replace(f);
}
// 继承自 AtomicReference,实现了 SingleObserver、Disposable、Runnable接口
// SingleObserver:当作下游
// Disposable:传给下游以便下游用来取消订阅
// Runnable:用来提交给 ExecutorService
static final class SubscribeOnObserver<T> extends AtomicReference<Disposable>
implements SingleObserver<T>, Disposable, Runnable {
private static final long serialVersionUID = 7000911171163930287L;
// 下游
final SingleObserver<? super T> actual;
// 用来取消订阅、中断线程执行
final SequentialDisposable task;
// 上游
final SingleSource<? extends T> source;
SubscribeOnObserver(SingleObserver<? super T> actual, SingleSource<? extends T> source) {
// 将下游保存为 this.actual
this.actual = actual;
// 将上游保存为 this.source
this.source = source;
// 一个继承自 AtomicReference 实现了 Disposable 接口的对象,用来取消订阅、中断线程执行
this.task = new SequentialDisposable();
}
@Override
public void onSubscribe(Disposable d) {
// 因为继承自 AtomicRefrence,此处将取消订阅的句柄(本示例中此处为 Disposables.empty())设置给内部的对象引用,用于取消对上游的订阅
DisposableHelper.setOnce(this, d);
}
@Override
public void onSuccess(T value) {
// 上游调用 observer.onSuccess() 时,会调用到这里,此处继续调用下游的 onSuccess() 将值向下传递
actual.onSuccess(value);
}
@Override
public void onError(Throwable e) {
// 上游调用 observer.onError() 时,会调用到这里,此处继续调用下游的 onError() 将错误向下传递
actual.onError(e);
}
@Override
public void dispose() {
// 取消对上游的订阅
DisposableHelper.dispose(this);
// 中断线程执行
task.dispose();
}
@Override
public boolean isDisposed() {
return DisposableHelper.isDisposed(get());
}
@Override
public void run() {
// 在线程池中执行 source.subscribe(),本示例会触发:
// SingleFromCallable.subscribe()->
// SingleFromCallable.suscribeActual()->
// this.onSuccess(callable.call())
source.subscribe(this);
}
}
}
subscribe() [-> Single.java]
public final Disposable subscribe(final Consumer<? super T> onSuccess, final Consumer<? super Throwable> onError) {
ObjectHelper.requireNonNull(onSuccess, "onSuccess is null");
ObjectHelper.requireNonNull(onError, "onError is null");
// 将 successConsumer、throwableConsumer 包装成 ConsumerSingleObserver,作为观察者
ConsumerSingleObserver<T> s = new ConsumerSingleObserver<T>(onSuccess, onError);
// 调用 subscribe(SingleObserver subscriber)
subscribe(s);
// ConsumerSingleObserver 实现了 Disposable 接口,持有它,可以用来取消订阅
return s;
}
subscribe(SingleObserver subscriber) [-> Single.java]
public final void subscribe(SingleObserver<? super T> subscriber) {
ObjectHelper.requireNonNull(subscriber, "subscriber is null");
subscriber = RxJavaPlugins.onSubscribe(this, subscriber);
ObjectHelper.requireNonNull(subscriber, "subscriber returned by the RxJavaPlugins hook is null");
try {
// 继续调用抽象方法 subscribeActual(SingleObserver subscriber),即调用子类的 subscribeActual(SingleObserver subscriber)
// 本示例中,此处子类为 SingleSubscribeOn,源码分析见上方
subscribeActual(subscriber);
} catch (NullPointerException ex) {
throw ex;
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
NullPointerException npe = new NullPointerException("subscribeActual failed");
npe.initCause(ex);
throw npe;
}
}
ConsumerSingleObserver [->ConsumerSingleObserver .java]
public final class ConsumerSingleObserver<T> extends AtomicReference<Disposable>
implements SingleObserver<T>, Disposable, LambdaConsumerIntrospection {
private static final long serialVersionUID = -7012088219455310787L;
// successConsumer
final Consumer<? super T> onSuccess;
// throwableConsumer
final Consumer<? super Throwable> onError;
public ConsumerSingleObserver(Consumer<? super T> onSuccess, Consumer<? super Throwable> onError) {
// 将 successConsumer 保存为全局变量 this.onSuccess
this.onSuccess = onSuccess;
// 将 throwableConsumer 保存为全局变量 this.onError
this.onError = onError;
}
@Override
public void onError(Throwable e) {、
// 最后设置为已取消订阅
lazySet(DisposableHelper.DISPOSED);
try {
// 回调 throwableConsumer
onError.accept(e);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
RxJavaPlugins.onError(new CompositeException(e, ex));
}
}
@Override
public void onSubscribe(Disposable d) {
// 本示例中,d 为 SubscribeOnObserver
DisposableHelper.setOnce(this, d);
}
@Override
public void onSuccess(T value) {
// 最后设置为已取消订阅
lazySet(DisposableHelper.DISPOSED);
try {
// 回调 successConsumer
onSuccess.accept(value);
} catch (Throwable ex) {
// 异常被 rxjava 捕获,所以自己写的 successConsumer 收不到异常
Exceptions.throwIfFatal(ex);
RxJavaPlugins.onError(ex);
}
}
@Override
public void dispose() {
// 取消订阅
DisposableHelper.dispose(this);
}
@Override
public boolean isDisposed() {
return get() == DisposableHelper.DISPOSED;
}
@Override
public boolean hasCustomOnError() {
return onError != Functions.ON_ERROR_MISSING;
}
}
时序图
Sequence Diagram二、zip
Demo
// Case6: 并发读取不同数据源,转换成同类型后,合并
Single<IBook> novel = Single.fromCallable(new Callable<Novel>() {
@Override
public Novel call() throws Exception {
return getNovel();
}
}).map(new Function<Novel, IBook>() {
@Override
public IBook apply(Novel novel) throws Exception {
return new NovelAdapter(novel);
}
}).subscribeOn(Schedulers.io());
Single<IBook> rxJava2Tutorial = Single.fromCallable(new Callable<RxJava2Tutorial>() {
@Override
public RxJava2Tutorial call() throws Exception {
return getRxJava2Tutorial();
}
}).map(new Function<RxJava2Tutorial, IBook>() {
@Override
public IBook apply(RxJava2Tutorial rxJava2Tutorial) throws Exception {
return new RxJava2TutorialAdapter(rxJava2Tutorial);
}
}).subscribeOn(Schedulers.io());
// 注意此处调用的是合并两个 SingleSource 的方法,zip 操作符的重载方法很多,从 2~9 都有,相应的变换函数也有从 2~9,无语啊~
Single.zip(novel, rxJava2Tutorial, new BiFunction<IBook, IBook, List<IBook>>() {
@Override
public List<IBook> apply(IBook iBook, IBook iBook2) throws Exception {
List<IBook> books = new ArrayList<>(2);
books.add(iBook);
books.add(iBook2);
return books;
}
}).subscribe(new Consumer<List<IBook>>() {
@Override
public void accept(List<IBook> iBooks) throws Exception {
Logger.d(TAG, "test: books are " + iBooks);
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
Logger.d(TAG, "test: get books error.", throwable);
}
});
上述代码分别读取 Novel
和 RxJava2Tutorial
两种不同类型书籍,再分别转化为 IBook
类型,然后添加到同一数组中,最后发射给下游。
zip(SingleSource source1, SingleSource source2, BiFunction zipper) [-> Single.java]
public static <T1, T2, R> Single<R> zip(
SingleSource<? extends T1> source1, SingleSource<? extends T2> source2,
BiFunction<? super T1, ? super T2, ? extends R> zipper
) {
ObjectHelper.requireNonNull(source1, "source1 is null");
ObjectHelper.requireNonNull(source2, "source2 is null");
return zipArray(Functions.toFunction(zipper), source1, source2);
}
toFunction(BiFunction f) [-> Functions.java]
public static <T1, T2, R> Function<Object[], R> toFunction(final BiFunction<? super T1, ? super T2, ? extends R> f) {
ObjectHelper.requireNonNull(f, "f is null");
// 注意因为上面调用的是合并两个 SingleSource 的方法,所以这里调用的就是 Array2Func,2表示合并个数,像这样的还有 Array3Func、Array4Func、... Array9Func
// 作用就是把多参数的 BiFunction 统一转化为一个参数(Object[])的 Function 对象,调用的时候再把参数从 Object[] 里取出来即可
return new Array2Func<T1, T2, R>(f);
}
Array2Func [-> Functions::Array2Func]
static final class Array2Func<T1, T2, R> implements Function<Object[], R> {
final BiFunction<? super T1, ? super T2, ? extends R> f;
Array2Func(BiFunction<? super T1, ? super T2, ? extends R> f) {
this.f = f;
}
@SuppressWarnings("unchecked")
@Override
public R apply(Object[] a) throws Exception {
if (a.length != 2) {
throw new IllegalArgumentException("Array of size 2 expected but got " + a.length);
}
// 从 Object[] 中取出实参,然后调用实际的合并函数
return f.apply((T1)a[0], (T2)a[1]);
}
}
zipArray(Function zipper, SingleSource... sources) [-> Single.java]
public static <T, R> Single<R> zipArray(Function<? super Object[], ? extends R> zipper, SingleSource<? extends T>... sources) {
ObjectHelper.requireNonNull(zipper, "zipper is null");
ObjectHelper.requireNonNull(sources, "sources is null");
if (sources.length == 0) {
return error(new NoSuchElementException());
}
return RxJavaPlugins.onAssembly(new SingleZipArray<T, R>(sources, zipper));
}
SingleZipArray [-> SingleZipArray.java]
// 继承自 Single,而 Single 实现了 SingleSource 接口,所以也继承了 subscribe() 方法
public final class SingleZipArray<T, R> extends Single<R> {
// 用来保存要合并的 SingleSource
final SingleSource<? extends T>[] sources;
// 用来保存合并函数
final Function<? super Object[], ? extends R> zipper;
public SingleZipArray(SingleSource<? extends T>[] sources, Function<? super Object[], ? extends R> zipper) {
// 将 SingleSource 保存为 this.sources
this.sources = sources;
// 将合并函数保存为 this.zipper
this.zipper = zipper;
}
// 通过上面 subscribeOn 的源码分析可知,调用 subscribe() 时,便会调用到上游的 subscribeActual()
// 此处的 observer 同样也是 ConsumerSingleObserver
@Override
protected void subscribeActual(SingleObserver<? super R> observer) {
SingleSource<? extends T>[] sources = this.sources;
int n = sources.length;
// 本示例合并的 SingleSource 个数为 2,即 n=2
if (n == 1) {
sources[0].subscribe(new SingleMap.MapSingleObserver<T, R>(observer, new SingletonArrayFunc()));
return;
}
// 将 ConsumerSingleObserver、SingleSource个数、合并函数封装为 ZipCoordinator,用来等待所有
// SingleSource 都处理完,然后对其发射的值应用合并函数
ZipCoordinator<T, R> parent = new ZipCoordinator<T, R>(observer, n, zipper);
// 调用 ConsumerSingleObserver 的 onSubscribe()
observer.onSubscribe(parent);
// 一个 for 循环,挨个调用 SingleSource 的 subscribe(),触发生产者开始生产
for (int i = 0; i < n; i++) {
if (parent.isDisposed()) {
return;
}
SingleSource<? extends T> source = sources[i];
if (source == null) {
parent.innerError(new NullPointerException("One of the sources is null"), i);
return;
}
source.subscribe(parent.observers[i]);
}
}
......
}
ZipCoordinator [-> SingleZipArray::ZipCoordinator]
// 合并函数协调器,注意继承自 AtomicInteger,以便采用计数法检测是否所有的 SingleSource 都发射完毕
static final class ZipCoordinator<T, R> extends AtomicInteger implements Disposable {
private static final long serialVersionUID = -5556924161382950569L;
// 保存下游观察者,本示例此处为 ConsumerSingleObserver
final SingleObserver<? super R> actual;
// 保存合并函数
final Function<? super Object[], ? extends R> zipper;
// SingleZipArray 的直接观察者,用来分别接收每个 SingleSource 发射的结果
// 每收到一个值 (即每回调一次 ZipSingleObserver 的 onSuccess()),计数值-1,直至计数值为0,说明全部发射完毕
final ZipSingleObserver<T>[] observers;
// 保存每个 SingleSource 发射的结果
final Object[] values;
@SuppressWarnings("unchecked")
ZipCoordinator(SingleObserver<? super R> observer, int n, Function<? super Object[], ? extends R> zipper) {
// 因为继承自 AtomicInteger,所以调用父类构造器,设置计数值
super(n);
// 将下游观察者 ConsumerSingleObserver 保存为 this.actual,用来接收合并后的结果
this.actual = observer;
// 将合并函数保存为 this.zipper
this.zipper = zipper;
// 根据 SingleSource 的个数,生成相应个数的 SingleObserver,然后保存为 this.observers
ZipSingleObserver<T>[] o = new ZipSingleObserver[n];
for (int i = 0; i < n; i++) {
o[i] = new ZipSingleObserver<T>(this, i);
}
this.observers = o;
// 根据 SingleSource 的个数,生成相应长度的 Object[],用来保存它们发射的结果
this.values = new Object[n];
}
@Override
public boolean isDisposed() {
return get() <= 0;
}
@Override
public void dispose() {
if (getAndSet(0) > 0) {
for (ZipSingleObserver<?> d : observers) {
d.dispose();
}
}
}
// 上游调用 ZipSingleObserver::onSuccess() 时,便会调用该方法,触发计数值-1
void innerSuccess(T value, int index) {
values[index] = value;
// 判断计数值是否已减至零
if (decrementAndGet() == 0) {
// 计数值为0,说明 SingleSource 全部发射完毕,可以调用合并函数了
R v;
try {
// 调用合并函数,获得合并后的结果
v = ObjectHelper.requireNonNull(zipper.apply(values), "The zipper returned a null value");
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
actual.onError(ex);
return;
}
// 将合并后的结果发射给下游,即 ConsumerSingleObserver
actual.onSuccess(v);
}
}
void disposeExcept(int index) {
ZipSingleObserver<T>[] observers = this.observers;
int n = observers.length;
for (int i = 0; i < index; i++) {
observers[i].dispose();
}
for (int i = index + 1; i < n; i++) {
observers[i].dispose();
}
}
void innerError(Throwable ex, int index) {
if (getAndSet(0) > 0) {
disposeExcept(index);
actual.onError(ex);
} else {
RxJavaPlugins.onError(ex);
}
}
}
ZipSingleObserver [-> SingleZipArray::ZipSingleObserver]
// 用来接受 SingleSource 发射的结果
static final class ZipSingleObserver<T> extends AtomicReference<Disposable>
implements SingleObserver<T> {
private static final long serialVersionUID = 3323743579927613702L;
// zip 协调器,用来触发计数值-1、计数值为0时调用合并函数并发射合并结果
final ZipCoordinator<T, ?> parent;
// 接受第几个 SingleSource 的结果
final int index;
ZipSingleObserver(ZipCoordinator<T, ?> parent, int index) {
this.parent = parent;
this.index = index;
}
public void dispose() {
DisposableHelper.dispose(this);
}
@Override
public void onSubscribe(Disposable d) {
DisposableHelper.setOnce(this, d);
}
@Override
public void onSuccess(T value) {
// SingleSource 发射结果时,调用到这里
// 调用 ZipCoordinator::innerSuccess()
parent.innerSuccess(value, index);
}
@Override
public void onError(Throwable e) {
// SingleSource 发射错误时,调用到这里
// 调用 ZipCoordinator::innerError
parent.innerError(e, index);
}
}
总结
此情此景,我想吟诗一首:
《源码分析》
--尼古拉斯·Yulo
RxJava 真牛逼,
你且看我来分析。
链式调用很神奇,
线程调度也随意。
码农工作不容易,
下班还得把学习。
要问源码哪里有?
还得简书看Yulo。
奥,类图忘了贴了:
Class Diagram总结:
调用时序简化版除了最上层的被观察者和最下层的观察者,中间的 Single 子类必有一与之对应的 SingleObserver 实现类,总结起来就是:
- 我的下游的下游不是我的下游
- 我的上游的上游不是我的上游
- 我只能访问我的直接上游和直接下游
嗯,这大概可以起名叫 异步责任链模式
。。。
别搜了,这名字是我自己想的!
网友评论