在 Android 中使用 RxJava 经常会用到 observeOn
这个操作符来完成线程的切换,比如网络请求之后切换到“主线程”,通常会这么写:
Observable.observeOn(AndroidSchedulers.mainThread())
然而我遇到一个奇怪的问题,onNext
有时候会丢失,先描述一下场景。我用 Retrofit + RxJava 来进行网络请求,我可以直接拿到一个 Observable
,就像下面这样:
// 定义 Retrofit 接口
public interface FeedApi {
@GET("feeds")
Observable<List<Feed>> feeds();
}
// 省略部分代码
RestApi restApi = retrofit.create(RestApi.class);
Observable<List<Feed>> observable = restApi.feeds()
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread());
observable.subscribe(...);
目前来看一切正常,我为了更好的用户体验,加入了缓存,当网络有问题的时候,会显示缓存数据,用户不会看到一个空白的页面。缓存的方式多种多样,比如 Realm
,同样支持 RxJava,我依然可以拿到一个 Observable
,然后我使用 concat
把来自缓存的 Observable
和来自网络的 Observable
合并为一个 Observable
对外提供,就像下面这样:
Observable<List<Feed>> fromNetwork = ...
Observable<List<Feed>> fromCache = ...
Observable<List<Feed>> observable = Observable.concat(fromCache, fromNetwork)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread());
observable.subscribe(...);
合并之后的 Observable
继续使用 subscribeOn
和 observeOn
来完成线程的切换。恩,看起来很完美,我想象中运行效果是这样的:
假设已经存在缓存的情况下
网络正常的时候,fromCache
从缓存拿到数据发送给Subscriber
,显示到页面上,然后fromNetwork
从服务器拿到最新的数据,继续发送给Subscriber
,刷新页面为最新的数据,Subscribe
会经历onNext()
->onNext()
->onCompleted()
。
网络异常的情况,fromCache
从缓存拿到数据发送给Subscriber
,显示到页面上,然后fromNetwork
发生异常,Subscriber
收到Error
,可以提示用户网络异常,但是页面上会显示缓存的数据,用户不会看到空白页面,Subscriber
会经历onNext()
->onError()
。
我运行之后,在网络正常的情况下,能显示数据,当我把网络关闭的时候,依然能显示数据,数据是从缓存加载的,还不错,和我想象的一样。但是当我反复多次打开页面的时候,发现一个奇怪的现象,页面有时候会显示空白,有时候会显示缓数据,但是都会提示网络异常的信息,从 Subscriber
的角度讲,就是 Subscriber
有时候会 onNext()
-> onError()
,有时候只会 onError()
,缓存数据的那次 onNext()
丢失了!
为了搞清楚这个问题,我做了这样的测试,在 subscribe
之前各个环节加上 doOnNext
,观察会不会执行。
observable
.doOnNext(s -> log("1 doOnNext: " + s))
.subscribeOn(Schedulers.io())
.doOnNext(s -> log("2 doOnNext: " + s))
.observeOn(AndroidSchedulers.mainThread())
.doOnNext(s -> log("3 doOnNext: " + s))
.subscribe(...);
// 输出结果
1 doOnNext: test
2 doOnNext: test
第三个居然没有打印,说明 observeOn
之后的 onNext()
没有执行,确定了问题范围之后,我尝试从 observeOn
的源码寻找线索。我发现 observeOn
有几个重载的方法,其中有一个参数叫 delayError
,看名字是“延迟错误”。
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError);
注释对这个参数的描述是:
indicates if the onError notification may not cut ahead of onNext notification on the other side of the scheduling boundary. If true a sequence ending in onError will be replayed in the same order as was received from upstream
大概说 delayError
决定了 onError
会不会切断 onNext
通知,貌似和我的问题有些关系,为了弄清楚具体情况,还是看源码。
observeOn
基于 lift
操作符,lift
需要一个 Operator
,Operator
可以将 下游 Subscriber
进行“代理”,返回一个 “代理” Subscriber
对象,“代理”对象可以收到 onNext
、onError
、onCompleted
,然后可以进行一些处理,再选择性的发送给 下游 Subscriber
。
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
if (this instanceof ScalarSynchronousObservable) {
return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);
}
// OperatorObserveOn 是关键
return lift(new OperatorObserveOn<T>(scheduler, delayError, bufferSize));
}
我们继续看 OperatorObserveOn
。
public final class OperatorObserveOn<T> implements Operator<T, T> {
...
@Override
public Subscriber<? super T> call(Subscriber<? super T> child) {
// 创建一个 Subscriber “代理”
ObserveOnSubscriber<T> parent = new ObserveOnSubscriber<T>(scheduler, child, delayError, bufferSize);
parent.init();
return parent;
}
// Subscriber 的代理,它会代理 下游的 Subscriber 收到事件
// 然后在 Scheduler 的线程中再转发给 下游的 Subscriber,就完成了线程的切换
static final class ObserveOnSubscriber<T> extends Subscriber<T> implements Action0 {
@Override
public void onNext(final T t) {
// 代理对象先收到 onNext
if (isUnsubscribed() || finished) {
return;
}
// 把收到的数据放到一个队列中
if (!queue.offer(NotificationLite.next(t))) {
onError(new MissingBackpressureException());
return;
}
// 通知 Scheduler,Scheduler 会回调 call 方法
schedule();
}
@Override
public void onCompleted() {
if (isUnsubscribed() || finished) {
return;
}
// 标记 流 已经结束
finished = true;
// 通知 Scheduler,Scheduler 会回调 call 方法
schedule();
}
@Override
public void onError(final Throwable e) {
if (isUnsubscribed() || finished) {
RxJavaHooks.onError(e);
return;
}
// 保存 error
error = e;
// 标记 流 已经结束
finished = true;
// 通知 Scheduler,Scheduler 会回调 call 方法
schedule();
}
protected void schedule() {
if (counter.getAndIncrement() == 0) {
// 把 this 当做一个 callback,Scheduler 会调用 call 方法。
recursiveScheduler.schedule(this);
}
}
// only execute this from schedule()
@Override
public void call() {
// 从 Scheduler 中调用,此时已经在 Scheduler 的线程中,比如 Android 的主线程中
long missed = 1L;
long currentEmission = emitted;
final Queue<Object> q = this.queue; // 保存 next 数据的队列
final Subscriber<? super T> localChild = this.child; // 下游的 Subscriber
for (;;) {
long requestAmount = requested.get();
while (requestAmount != currentEmission) {
boolean done = finished;
// 从队列中取数据,就是 onNext 的时候放到队列中的
Object v = q.poll();
// 判断是不是 null
boolean empty = v == null;
// 这里是关键,检查是否需要中断 onNext
if (checkTerminated(done, empty, localChild, q)) {
// 中断,最终的 Subscriber 不会收到 onNext
return;
}
if (empty) {
break;
}
// 转发给 下游的 Subscriber
localChild.onNext(NotificationLite.<T>getValue(v));
...
}
}
}
// 检查是否需要中断 onNext
boolean checkTerminated(boolean done, boolean isEmpty, Subscriber<? super T> a, Queue<Object> q) {
if (a.isUnsubscribed()) {
q.clear();
return true;
}
if (done) {
// 如果 流 已经结束,执行了 onCompleted 或者 onError
if (delayError) {
// 如果是延迟错误,delayError 默认是 false
if (isEmpty) {
// 如果没有 onNext 的数据了
Throwable e = error;
try {
if (e != null) {
// 如果存在 error,直接转发给 下游的 Subscriber
a.onError(e);
} else {
// 如果是正常结束,也是直接转发给 下游的 Subscriber
a.onCompleted();
}
} finally {
recursiveScheduler.unsubscribe();
}
}
} else {
// 如果没有延迟错误,这也是默认的情况
Throwable e = error;
if (e != null) {
// **** 如果已经有 error 了,把队列里的 onNext 情况,也就是 onNext 的数据丢失了!
q.clear();
try {
// **** 把 error 发送给 下游的 Subscriber
a.onError(e);
} finally {
recursiveScheduler.unsubscribe();
}
return true;
} else
if (isEmpty) {
// 如果没有 error,并且没有 onNext 的数据了,直接完成
try {
a.onCompleted();
} finally {
recursiveScheduler.unsubscribe();
}
return true;
}
}
}
return false;
}
}
}
通过上面对源码的分析,可以确定这个问题出现的原因了,上面注释中 ****
标记的地方就是问题所在:
if (e != null) {
// **** 如果已经有 error 了,把队列里的 onNext 情况,也就是 onNext 的数据丢失了!
q.clear();
try {
// **** 把 error 发送给 下游的 Subscriber
a.onError(e);
} finally {
recursiveScheduler.unsubscribe();
}
return true;
}
默认 delayError
是 false
,就会走上面这段代码,如果有 error
就会把队列中还未下发给 下游 Subscriber
的 onNext
数据清空,从而导致了 Subscriber
没有收到 onNext
,直接收到了 onError
。
为什么这个问题不是必现的呢,因为这有一个条件,就是出现 onError
的时候,onNext
的数据还没有被消费完,队列中还有数据的情况下才会丢失,也就是 Scheduler
的处理速度跟不上生产速度,这很好理解,因为我们用的是 AndroidSchedulers.mainThread()
,Android 的主线程也就是 Main Lopper
,是一个单线程模型,只能一个个的处理,不能并行,而且 Android 中的各种事件,包括 View 的绘制都要通过 Main Looper
来完成,如果 Looper
中积压的消息太多,那么新消息就不会被及时的处理,这时候我们的 fromCache
把缓存数据发送给 Subscriber
,就会由于 Looper
中积压的消息过多,Scheduler
不能立刻执行 ObserveOnSubscriber
的 call
方法,从而不能立刻被 下游的 Subscriber
接收到,与此同时 fromNetowrk
发生了 error
,当 Looper
处理到消息的时候,也就是 ObserveOnSubscriber
的 call
方法被调用的时候,这时候队列中存在一个 fromCache
的数据,并且 error
也存在,就发生了上面源码分析的情况,导致队列被清空,onNext
没有执行,直接执行了 onError
。
怎么解决这个问题?答案已经明确了,就是显式的传递 delayError
为 true
。
Observable.observeOn(AndroidSchedulers.mainThread(), true)
这个问题之前有人在 GitHub 上提过 Issue,并且在这个 Pull 3544 中解决,也就是增加了 delayError
,在此之前是没有这个参数的。
明白了原因之后,这个问题其实和 concat
没有什么关系,不用 concat
依然可能出现 onNext
丢失,由于 Main Looper
我们不好去控制,可以通过一个简单的 Java 程序来模拟这种情况。
public static void main(String[] args) {
// 用一个 单线程的线程池 来模拟 Android 的主线程
ExecutorService executor = Executors.newSingleThreadExecutor();
Scheduler androidMainThread = Schedulers.from(executor);
Observable<String> observable = Observable.create(subscriber -> {
// 发送数据
subscriber.onNext("text");
// 一秒后发送 error
sleep(1000);
subscriber.onError(new RuntimeException());
});
// 完成线程切换,然后订阅
observable
.subscribeOn(Schedulers.io())
.observeOn(androidMainThread)
.subscribe(s -> {
// 收到 onNext 数据
System.out.println("onNext: " + s);
}, throwable -> {
// 收到 onError
System.out.println("onError: " + throwable);
});
}
static void sleep(int millis) {
try {
Thread.sleep(millis);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
输出结果:
onNext: text
onError: java.lang.RuntimeException
输出结果是正常的,既收到了 onNext
又收到了 onError
,因为 androidMainThread
中没有其他的任务要处理,Observable
发送 onNext
之后会 sleep
一秒钟之后发送 onError
,androidMainThread
在这一秒钟之内完全可以把 onNext
的数据消费掉,下面模拟 androidMainThread
中有任务处理的情况。
// 主线程有其他任务处理,会阻塞 两秒
executor.execute(() -> sleep(2000));
// 完成线程切换,然后订阅
observable
.subscribeOn(Schedulers.io())
.observeOn(androidMainThread)
.subscribe(s -> {
// 收到 onNext 数据
System.out.println("onNext: " + s);
}, throwable -> {
// 收到 onError
System.out.println("onError: " + throwable);
});
输出结果:
onError: java.lang.RuntimeException
会发现只有 onError
,onNext
丢失了,因为 Observable
只是 sleep
了一秒就发送了 onError
,而 androidMainThread
的 线程 正在执行一个需要耗时两秒的任务,执行完之后,ObserveOnSubscriber
中的队列存在一个 onNext
数据和一个 error
,这就会发生之前分析的情况,队列被清空,直接发送了 onError
。我们再试试 显式 传递 delayError
的情况。
// 主线程有其他任务处理,会阻塞 两秒
executor.execute(() -> sleep(2000));
// 完成线程切换,然后订阅
observable
.subscribeOn(Schedulers.io())
.observeOn(androidMainThread, true) // delayError 为 true
.subscribe(s -> {
// 收到 onNext 数据
System.out.println("onNext: " + s);
}, throwable -> {
// 收到 onError
System.out.println("onError: " + throwable);
});
输出结果:
onNext: text
onError: java.lang.RuntimeException
会发现 onNext
收到了,这也证明了上面源码分析的逻辑是正确的。
网友评论
bservable<Void> o = Observable.create(new Observable.OnSubscribe<Void>() {
@Override
public void call(Subscriber<? super Void> subscriber) {
// Do the work and call onCompleted when you done,
// no need to call onNext if you have nothing to emit
subscriber.onCompleted();
}
});
o.subscribe(new OnCompletedObserver<Void>() {
@Override
public void onCompleted() {
System.out.println("onCompleted");
}
@Override
public void onError(Throwable e) {
System.out.println("onError " + e.getMessage());
}
});