美文网首页
Rxjava Single.zip 抛出Undeliverabl

Rxjava Single.zip 抛出Undeliverabl

作者: Xigong | 来源:发表于2019-11-23 13:32 被阅读0次

    异常信息

    io.reactivex.exceptions.UndeliverableException: The exception could not be delivered to the consumer because it has already canceled/disposed the flow or the exception has nowhere to go to begin with. Further reading: https://github.com/ReactiveX/RxJava/wiki/What's-different-in-2.0#error-handling

    如何复现

    fun main(args: Array<String>) {
        repeat(1000) {
    
            Single.zip(single(1), single(2),
                    BiFunction<Int, Int, String> { t1, t2 ->
    
                        StringBuilder()
                                .append(t1)
                                .append(t2)
                                .toString()
                    })
                    .subscribe({
                        println(it)
                    }, {
                        it.printStackTrace(System.out)
                    })
    
            Thread.sleep(50)
    
        }
    }
    
    private fun single(value: Int): Single<Int> = Single.just(value).delay(10, TimeUnit.MILLISECONDS)
            .map { throw RuntimeException(it.toString()) }
    

    分析错误发生的原因

    • google了一下,发现了这个issues https://github.com/ReactiveX/RxJava/issues/6249
      大概意思是说Single.zip() 只能处理一个数据源的异常,如果有多个数据源都发生异常,那么除了第一个异常,发送到onError方法中,剩下的异常,都不会被分发到Subscirbtion中,而是分发到RxJavaPlugins.onError

    代码分析

    io.reactivex.internal.operators.single.SingleZipArray.ZipCoordinator#innerError

            void innerError(Throwable ex, int index) {
                // 第一次,调用下游的观察者的.onError方法
                if (getAndSet(0) > 0) {
                    disposeExcept(index);
                    downstream.onError(ex);
                } else {
                // 第二次,就调用RxJavaPlugins分发异常了
                    RxJavaPlugins.onError(ex);
                }
            }
    
    

    怎么解决呢?

    简单的方案是把Single换成Observable或者是Flowable

    • Flowable或者Observable 的策略是,发送一系列事件,当发生一个异常时,事件流中断 。
      Observable相关代码:io.reactivex.internal.operators.observable.ObservableZip.ZipCoordinator#drain
           public void drain() {
                if (getAndIncrement() != 0) {
                    return;
                }
    
                int missing = 1;
    
                final ZipObserver<T, R>[] zs = observers;
                final Observer<? super R> a = downstream;
                final T[] os = row;
                final boolean delayError = this.delayError;
    
                for (;;) {
    
                    for (;;) {
                        int i = 0;
                        int emptyCount = 0;
                        for (ZipObserver<T, R> z : zs) {
                            if (os[i] == null) {
                                boolean d = z.done;
                                T v = z.queue.poll();
                                boolean empty = v == null;
    
                                if (checkTerminated(d, empty, a, delayError, z)) {
                                    return;
                                }
                                if (!empty) {
                                    os[i] = v;
                                } else {
                                    emptyCount++;
                                }
                            } else {
                                if (z.done && !delayError) {
                                    Throwable ex = z.error;
                                    if (ex != null) {
                                        cancelled = true;
                                        cancel();
                                        a.onError(ex);
                                        return;
                                    }
                                }
                            }
                            i++;
                        }
    
                        if (emptyCount != 0) {
                            break;
                        }
    
                        R v;
                        try {
                            v = ObjectHelper.requireNonNull(zipper.apply(os.clone()), "The zipper returned a null value");
                        } catch (Throwable ex) {
                            Exceptions.throwIfFatal(ex);
                            cancel();
                            a.onError(ex);
                            return;
                        }
    
                        a.onNext(v);
    
                        Arrays.fill(os, null);
                    }
    
                    missing = addAndGet(-missing);
                    if (missing == 0) {
                        return;
                    }
                }
            }
    
            boolean checkTerminated(boolean d, boolean empty, Observer<? super R> a, boolean delayError, ZipObserver<?, ?> source) {
                if (cancelled) {
                    cancel();
                    return true;
                }
    
                if (d) {
                    if (delayError) {
                        if (empty) {
                            Throwable e = source.error;
                            cancelled = true;
                            cancel();
                            if (e != null) {
                                a.onError(e);
                            } else {
                                a.onComplete();
                            }
                            return true;
                        }
                    } else {
                        Throwable e = source.error;
                        if (e != null) {
                            // 发生异常这里会执行
                            cancelled = true;
                            cancel();
                            a.onError(e);
                            return true;
                        } else
                        if (empty) {
                            cancelled = true;
                            cancel();
                            a.onComplete();
                            return true;
                        }
                    }
                }
    
                return false;
            }
        }
    

    产生这个问题的反思?

    为什么会产生这个问题呢?

    • 我并没有完全理解Single 的行为方式,错误的以为,只要我设置了onError方法,异常一定会捕获。
    • Single在被设计的目的是处理单个事件,所有不能捕获多个异常。

    相关文章

      网友评论

          本文标题:Rxjava Single.zip 抛出Undeliverabl

          本文链接:https://www.haomeiwen.com/subject/ynfhwctx.html