本文主要目的是梳理Rxjava的核心调用流程,主要涉及以下3个类
- 核心类:
Observable
- 调用者传入的subscriber的包装类:
SafeSubscriber
-
Observable
所持有的onSubscribe
1. 首先是核心类Observable
的源码
Observable通过构造创建的时, 持有了onSubscribe对象
而subscribe()
方法就做了两件事:
- 将传入的
Subscriber
包装为SafeSubscriber
; - 然后调用
onSubscribe.call(safeSubscriber)
;
所以我们传入的subscriber
对象的onNext
、onError
、onComplete
方法在何时何地被如何调用的,关键还是看Observable
创建时传入的onSubscribe
对象 的call()
方法
只看核心流程部分,其他部分(各种静态方法以及操作符)省略
public class Observable<T> {
final OnSubscribe<T> onSubscribe;
protected Observable(OnSubscribe<T> f) {
this.onSubscribe = f;
}
public final Subscription subscribe(Subscriber<? super T> subscriber) {
// subscriber和onSubscribe非空校验
......
// 一上来就调用subscriber的onstart方法,
// 所以onstart永远在调用者所在的线程内执行,不受各种线程切换的操作符影响
subscriber.onStart();
// 重点:将原始的subscriber包装为SafeSubscriber(源码稍后再看)
if (!(subscriber instanceof SafeSubscriber)) {
subscriber = new SafeSubscriber<T>(subscriber);
}
try {
// 核心逻辑: 调用当前对象持有的onSubscribe的call方法
onSubscribe.call(subscriber);
return subscriber;
} catch (Throwable e) {
// 到此为止,以下是一堆异常处理
......
return Subscriptions.unsubscribed();
}
}
2. onSubscribe
没什么好讲的。。。
public interface OnSubscribe<T> extends Action1<Subscriber<? super T>> {
}
public interface Action1<T> extends Action {
void call(T t);
}
3. SafeSubscriber
这个类主要做了以下几件事,来保证所谓的safe:
- 保证
onError
和onComplete
两个方法只能有一个被调用 - 保证
onError
或者onComplete
执行完毕后会unsubscribe
- catch
onNext
方法执行过程中抛出的所有异常,交由onError(e)
处理 - 对于
onError
或者onComplete
执行过程中产生的任何异常都会直接抛出, 分别抛出OnErrorFailedException
和OnCompletedFailedException
-
onError
方法会对原Subscriber
的onError
是否实现做校验,若没有实现,则抛出OnErrorNotImplementedException
核心源码如下:
public class SafeSubscriber<T> extends Subscriber<T> {
//你传入的那个subscriber
private final Subscriber<? super T> actual;
//toggle, 保证onError和onComplete两个方法只能有一个被调用
boolean done;
public SafeSubscriber(Subscriber<? super T> actual) {
super(actual);
this.actual = actual;
}
@Override
public void onCompleted() {
if (!done) {
done = true;
try {
actual.onCompleted();
} catch (Throwable e) {
...
//将所有throwable包装成OnCompletedFailedException,并抛出
throw new OnCompletedFailedException(e.getMessage(), e);
} finally {
//取消订阅
unsubscribe();
...
}
}
}
@Override
public void onError(Throwable e) {
...
if (!done) {
done = true;
_onError(e);
}
}
@Override
public void onNext(T t) {
try {
if (!done) {
actual.onNext(t);
}
} catch (Throwable e) {
// 原先代码为Exceptions.throwOrReport(e, this); 为更好理解,更改如下:
actual.onError(e);
}
}
protected void _onError(Throwable e) {
...
try {
actual.onError(e);
} catch (OnErrorNotImplementedException e2) {
throw e2;
} catch (Throwable e2) {
// 将所有throwable包装为OnErrorFailedException,并抛出
throw new OnErrorFailedException("Error occurred when trying to propagate error to Observer.onError", new CompositeException(Arrays.asList(e, e2)));
} finally {
...
//取消订阅
unsubscribe();
...
}
}
...
}
网友评论