异常信息
io.reactivex.exceptions.UndeliverableException: The exception could not be delivered to the consumer because it has already canceled/disposed the flow or the exception has nowhere to go to begin with. Further reading: https://github.com/ReactiveX/RxJava/wiki/What's-different-in-2.0#error-handling
如何复现
fun main(args: Array<String>) {
repeat(1000) {
Single.zip(single(1), single(2),
BiFunction<Int, Int, String> { t1, t2 ->
StringBuilder()
.append(t1)
.append(t2)
.toString()
})
.subscribe({
println(it)
}, {
it.printStackTrace(System.out)
})
Thread.sleep(50)
}
}
private fun single(value: Int): Single<Int> = Single.just(value).delay(10, TimeUnit.MILLISECONDS)
.map { throw RuntimeException(it.toString()) }
分析错误发生的原因
- google了一下,发现了这个issues https://github.com/ReactiveX/RxJava/issues/6249
大概意思是说Single.zip() 只能处理一个数据源的异常,如果有多个数据源都发生异常,那么除了第一个异常,发送到onError方法中,剩下的异常,都不会被分发到Subscirbtion中,而是分发到RxJavaPlugins.onError
代码分析
io.reactivex.internal.operators.single.SingleZipArray.ZipCoordinator#innerError
void innerError(Throwable ex, int index) {
// 第一次,调用下游的观察者的.onError方法
if (getAndSet(0) > 0) {
disposeExcept(index);
downstream.onError(ex);
} else {
// 第二次,就调用RxJavaPlugins分发异常了
RxJavaPlugins.onError(ex);
}
}
怎么解决呢?
简单的方案是把Single换成Observable或者是Flowable
- Flowable或者Observable 的策略是,发送一系列事件,当发生一个异常时,事件流中断 。
Observable相关代码:io.reactivex.internal.operators.observable.ObservableZip.ZipCoordinator#drain
public void drain() {
if (getAndIncrement() != 0) {
return;
}
int missing = 1;
final ZipObserver<T, R>[] zs = observers;
final Observer<? super R> a = downstream;
final T[] os = row;
final boolean delayError = this.delayError;
for (;;) {
for (;;) {
int i = 0;
int emptyCount = 0;
for (ZipObserver<T, R> z : zs) {
if (os[i] == null) {
boolean d = z.done;
T v = z.queue.poll();
boolean empty = v == null;
if (checkTerminated(d, empty, a, delayError, z)) {
return;
}
if (!empty) {
os[i] = v;
} else {
emptyCount++;
}
} else {
if (z.done && !delayError) {
Throwable ex = z.error;
if (ex != null) {
cancelled = true;
cancel();
a.onError(ex);
return;
}
}
}
i++;
}
if (emptyCount != 0) {
break;
}
R v;
try {
v = ObjectHelper.requireNonNull(zipper.apply(os.clone()), "The zipper returned a null value");
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
cancel();
a.onError(ex);
return;
}
a.onNext(v);
Arrays.fill(os, null);
}
missing = addAndGet(-missing);
if (missing == 0) {
return;
}
}
}
boolean checkTerminated(boolean d, boolean empty, Observer<? super R> a, boolean delayError, ZipObserver<?, ?> source) {
if (cancelled) {
cancel();
return true;
}
if (d) {
if (delayError) {
if (empty) {
Throwable e = source.error;
cancelled = true;
cancel();
if (e != null) {
a.onError(e);
} else {
a.onComplete();
}
return true;
}
} else {
Throwable e = source.error;
if (e != null) {
// 发生异常这里会执行
cancelled = true;
cancel();
a.onError(e);
return true;
} else
if (empty) {
cancelled = true;
cancel();
a.onComplete();
return true;
}
}
}
return false;
}
}
产生这个问题的反思?
为什么会产生这个问题呢?
- 我并没有完全理解Single 的行为方式,错误的以为,只要我设置了onError方法,异常一定会捕获。
- Single在被设计的目的是处理单个事件,所有不能捕获多个异常。
网友评论