美文网首页RxJava
RxJava出错重连

RxJava出错重连

作者: shiyuzhe | 来源:发表于2019-01-10 17:46 被阅读19次

    retryWhen

      模拟网络请求出错重连
    
     /**
         * 重连三次
         * 测试:第三次重连时模拟请求成功,前两次发送错误
         */
        fun retry() {
            var num = 0//记录重连次数
            Observable.timer(1, TimeUnit.SECONDS).doOnSubscribe {
                System.out.println("subscribing")
            }.map {
                if (++num > 2)
                    return@map 1
                throw  RuntimeException()
            }.retryWhen {
                val counter = AtomicInteger()
                it.takeWhile {
                    counter.getAndIncrement() != 3
                }.flatMap {
                    System.out.println("delay retry by " + counter.get() + " second(s)")
                    Observable.timer(counter.toLong(), TimeUnit.SECONDS)
                }
            }.observeOn(AndroidSchedulers.mainThread())
                    .subscribeOn(Schedulers.io())
                    .subscribeBy {
                        System.out.println("subscribeBy$it")
                    }
        }
    

    解决多次调用retry(),在subscribe之前dispose掉之前的

      private var disposable: Disposable? = null
        /**
         * 重连三次
         * 测试:第三次重连时模拟请求成功,前两次发送错误
         */
        fun retry() {
            var num = 0//记录重连次数
            disposable = Observable.timer(1, TimeUnit.SECONDS)
                    .map {
                        if (++num > 2)
                            return@map 1
                        throw  RuntimeException()
                    }.retryWhen {
                        val counter = AtomicInteger()
                        it.takeWhile {
                            counter.getAndIncrement() != 3
                        }.flatMap {
                            System.out.println("delay retry by " + counter.get() + " second(s)")
                            Observable.timer(counter.toLong(), TimeUnit.SECONDS)
                        }
                    }.observeOn(AndroidSchedulers.mainThread())
                    .subscribeOn(Schedulers.io())
                    .doOnSubscribe {
                        //retryWhen之后调用一次,之前每次重连都调用
                        disposable?.dispose()
                        System.out.println("subscribing")
                    }.subscribeBy {
                        System.out.println("subscribeBy$it")
                    }
        }
    

    将重连放到扩展函数中

    fun ret() {
            var num = 0//记录重连次数
            disposable = Observable.timer(1, TimeUnit.SECONDS)
                    .map {
                        if (++num > 2)
                            return@map 1
                        throw  RuntimeException()
                    }.subscribeByThreadRetry(disposable) {
                        System.out.println("subscribeByThreadRetry:$it")
                    }
        }
    
    fun <T : Any> Observable<T>.subscribeByThreadRetry(
    disposable: Disposable?, 
    onErrStub: (Throwable) -> Unit = {}, 
    onNextStub: (T) -> Unit = {}): Disposable = this.retryWhen {
        val counter = AtomicInteger()
        it.takeWhile {
            counter.getAndIncrement() != 3
        }.flatMap {
            Observable.timer(counter.toLong(), TimeUnit.SECONDS)
        }
    }.observeOn(AndroidSchedulers.mainThread())
            .subscribeOn(Schedulers.io())
            .doOnSubscribe {
                //retryWhen之后调用一次,之前每次重连都调用
                disposable?.dispose()
            }.subscribeBy(onErrStub, {}, onNextStub)
    
    

    相关文章

      网友评论

        本文标题:RxJava出错重连

        本文链接:https://www.haomeiwen.com/subject/cvnarqtx.html