美文网首页安卓精华教程RxJavaAndroid开发经验谈
RxJava2 实战知识梳理(5) - 简单及进阶的轮询操作

RxJava2 实战知识梳理(5) - 简单及进阶的轮询操作

作者: 泽毛 | 来源:发表于2017-08-29 08:35 被阅读2915次

    RxJava2 实战系列文章

    RxJava2 实战知识梳理(1) - 后台执行耗时操作,实时通知 UI 更新
    RxJava2 实战知识梳理(2) - 计算一段时间内数据的平均值
    RxJava2 实战知识梳理(3) - 优化搜索联想功能
    RxJava2 实战知识梳理(4) - 结合 Retrofit 请求新闻资讯
    RxJava2 实战知识梳理(5) - 简单及进阶的轮询操作
    RxJava2 实战知识梳理(6) - 基于错误类型的重试请求
    RxJava2 实战知识梳理(7) - 基于 combineLatest 实现的输入表单验证
    RxJava2 实战知识梳理(8) - 使用 publish + merge 优化先加载缓存,再读取网络数据的请求过程
    RxJava2 实战知识梳理(9) - 使用 timer/interval/delay 实现任务调度
    RxJava2 实战知识梳理(10) - 屏幕旋转导致 Activity 重建时恢复任务
    RxJava2 实战知识梳理(11) - 检测网络状态并自动重试请求
    RxJava2 实战知识梳理(12) - 实战讲解 publish & replay & share & refCount & autoConnect
    RxJava2 实战知识梳理(13) - 如何使得错误发生时不自动停止订阅关系
    RxJava2 实战知识梳理(14) - 在 token 过期时,刷新过期 token 并重新发起请求
    RxJava2 实战知识梳理(15) - 实现一个简单的 MVP + RxJava + Retrofit 应用


    一、示例

    1.1 应用场景

    今天,我们介绍一种新的场景,轮询操作。也就是说,我们会尝试间隔一段时间就向服务器发起一次请求,在使用RxJava之前,该需求的实现一般有两种方式:

    • 通过Handler发送延时消息,在handleMessage中请求服务器之后,再次发送一个延时消息,直到达到循环次数为止。
    • 使用Java提供的定时器Timer

    我们尝试使用RxJava2提供的操作符来实现这一需求,这里演示两种方式的轮询,并将单次访问的次数限制在5次:

    • 固定时延:使用intervalRange操作符,每间隔3s执行一次任务。
    • 变长时延:使用repeatWhen操作符实现,第一次执行完任务后,等待4s再执行第二次任务,在第二次任务执行完成后,等待5s,依次递增。

    2.2 示例

    public class PollingActivity extends AppCompatActivity {
    
        private static final String TAG = PollingActivity.class.getSimpleName();
    
        private TextView mTvSimple;
        private TextView mTvAdvance;
        private CompositeDisposable mCompositeDisposable;
    
        @Override
        protected void onCreate(Bundle savedInstanceState) {
            super.onCreate(savedInstanceState);
            setContentView(R.layout.activity_polling);
            mTvSimple = (TextView) findViewById(R.id.tv_simple);
            mTvSimple.setOnClickListener(new View.OnClickListener() {
    
                @Override
                public void onClick(View v) {
                    startSimplePolling();
                }
    
            });
            mTvAdvance = (TextView) findViewById(R.id.tv_advance);
            mTvAdvance.setOnClickListener(new View.OnClickListener() {
    
                @Override
                public void onClick(View v) {
                    startAdvancePolling();
                }
    
            });
            mCompositeDisposable = new CompositeDisposable();
        }
    
        private void startSimplePolling() {
            Log.d(TAG, "startSimplePolling");
            Observable<Long> observable = Observable.intervalRange(0, 5, 0, 3000, TimeUnit.MILLISECONDS).take(5).doOnNext(new Consumer<Long>() {
    
                @Override
                public void accept(Long aLong) throws Exception {
                    doWork(); //这里使用了doOnNext,因此DisposableObserver的onNext要等到该方法执行完才会回调。
                }
    
            });
            DisposableObserver<Long> disposableObserver = getDisposableObserver();
            observable.subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(disposableObserver);
            mCompositeDisposable.add(disposableObserver);
        }
    
        private void startAdvancePolling() {
            Log.d(TAG, "startAdvancePolling click");
            Observable<Long> observable = Observable.just(0L).doOnComplete(new Action() {
    
                @Override
                public void run() throws Exception {
                    doWork();
                }
    
            }).repeatWhen(new Function<Observable<Object>, ObservableSource<Long>>() {
    
                private long mRepeatCount;
    
                @Override
                public ObservableSource<Long> apply(Observable<Object> objectObservable) throws Exception {
                    //必须作出反应,这里是通过flatMap操作符。
                    return objectObservable.flatMap(new Function<Object, ObservableSource<Long>>() {
    
                        @Override
                        public ObservableSource<Long> apply(Object o) throws Exception {
                            if (++mRepeatCount > 4) {
                                //return Observable.empty(); //发送onComplete消息,无法触发下游的onComplete回调。
                                return Observable.error(new Throwable("Polling work finished")); //发送onError消息,可以触发下游的onError回调。
                            }
                            Log.d(TAG, "startAdvancePolling apply");
                            return Observable.timer(3000 + mRepeatCount * 1000, TimeUnit.MILLISECONDS);
                        }
    
                    });
                }
    
            });
            DisposableObserver<Long> disposableObserver = getDisposableObserver();
            observable.subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(disposableObserver);
            mCompositeDisposable.add(disposableObserver);
        }
    
        private DisposableObserver<Long> getDisposableObserver() {
    
            return new DisposableObserver<Long>() {
    
                @Override
                public void onNext(Long aLong) {}
    
                @Override
                public void onError(Throwable throwable) {
                    Log.d(TAG, "DisposableObserver onError, threadId=" + Thread.currentThread().getId() + ",reason=" + throwable.getMessage());
                }
    
                @Override
                public void onComplete() {
                    Log.d(TAG, "DisposableObserver onComplete, threadId=" + Thread.currentThread().getId());
                }
            };
        }
    
        private void doWork() {
            long workTime = (long) (Math.random() * 500) + 500;
            try {
                Log.d(TAG, "doWork start,  threadId=" + Thread.currentThread().getId());
                Thread.sleep(workTime);
                Log.d(TAG, "doWork finished");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    
        @Override
        protected void onDestroy() {
            super.onDestroy();
            mCompositeDisposable.clear();
        }
    }
    

    startSimplePolling对应于固定时延轮询:


    startAdvancePolling对应于变长时延轮询:

    三、示例解析

    下面,就让我们一起来分析一下上面这两个例子中涉及到的知识点。

    3.1 intervalRange & doOnNext 实现固定时延轮询

    对于固定时延轮询的需求,采用的是intervalRange的方式来实现,它是一个创建型操作符,该Observable第一次先发射一个特定的数据,之后间隔一段时间再发送一次,它是intervalrange的结合体,这两个操作符的原理图为:

    interval 原理图
    range 原理图
    该操作符的优势在于:
    • interval相比,它可以指定第一个发送数据项的时延、指定发送数据项的个数。
    • range相比,它可以指定两项数据之间发送的时延。

    intervalRange的接收参数的含义为:

    • start:发送数据的起始值,为Long型。
    • count:总共发送多少项数据。
    • initialDelay:发送第一个数据项时的起始时延。
    • period:两项数据之间的间隔时间。
    • TimeUnit:时间单位。

    在轮询操作中一般会进行一些耗时的网络请求,因此我们选择在doOnNext进行处理,它会在下游的onNext方法被回调之前调用,但是它的运行线程可以通过subscribeOn指定,下游的运行线程再通过observerOn切换会主线程,通过打印对应的线程ID可以验证结果。

    当要求的数据项都发送完毕之后,最后会回调onComplete方法。

    3.2 repeatWhen 实现变长时延轮询

    3.2.1 使用 repeatWhen 实现重订阅

    之所以可以通过repeatWhen来实现轮询,是因为它为我们提供了重订阅的功能,而重订阅有两点要素:

    • 上游告诉我们一次订阅已经完成,这就需要上游回调onComplete函数。
    • 我们告诉上游是否需要重订阅,通过repeatWhenFunction函数所返回的Observable确定,如果该Observable发送了onComplete或者onError则表示不需要重订阅,结束整个流程;否则触发重订阅的操作。

    其原理图如下所示:

    repeatWhen 原理图
    repeatWhen的难点在于如何定义它的Function参数:
    • Function的输入是一个Observable<Object>,输出是一个泛型ObservableSource<?>
    • 如果输出的Observable发送了onComplete或者onError则表示不需要重订阅,结束整个流程;否则触发重订阅的操作。也就是说,它 仅仅是作为一个是否要触发重订阅的通知onNext发送的是什么数据并不重要。
    • 对于每一次订阅的数据流 Function 函数只会回调一次,并且是在onComplete的时候触发,它不会收到任何的onNext事件。
    • Function函数中,必须对输入的 Observable<Object>进行处理,这里我们使用的是flatMap操作符接收上游的数据,对于flatMap的解释,大家可以参考 RxJava2 实战知识梳理(4) - 结合 Retrofit 请求新闻资讯

    而当我们不需要重订阅时,有两种方式:

    • 返回Observable.empty(),发送onComplete消息,但是DisposableObserver并不会回调onComplete
    • 返回Observable.error(new Throwable("Polling work finished"))DisposableObserveronError会被回调,并接受传过去的错误信息。

    3.2.2 使用 Timer 实现两次订阅之间的时延

    以上就是对于repeatWhen的解释,与repeatWhen相类似的还有retryWhen操作符,这个我们在下一篇文章中再介绍,接下来,我们看一下如何实现两次事件的时延。

    前面我们分析过,重订阅触发的时间是在返回的ObservableSource发送了onNext事件之后,那么我们通过该ObservableSource延迟发送一个事件就可以实现相应的需求,这里使用的是time操作符,它的原理图如下所示,也就是,在订阅完成后,等待指定的时间它才会发送消息。

    timer 原理图

    3.2.3 使用 doOnComplete 完成轮询的耗时操作

    由于在订阅完成时会发送onComplete消息,那么我们就可以在doOnComplete中进行轮询所要进行的具体操作,它所运行的线程通过subscribeOn指定。


    更多文章,欢迎访问我的 Android 知识梳理系列:

    相关文章

      网友评论

      • 5db78a623301:好像不能从下游接受到的onNext里获取请求到的服务器指定轮询时间,然后再来通过repeatwhen来控制下次延时间隔
      • 无言_9b85:感谢大佬
      • 贝壳里的屋子:这个系列很赞,支持
        泽毛:@贝壳里的屋子 😁谢谢
      • 请叫我田胖子:大神! 给你跪了,分得这么的清楚,整理得这么的详尽,好文,我打算把你的文章统统拿来看看
        泽毛:@请叫我田胖子 😁谢谢你的支持,有问题随时讨论
      • 哈喽jv:大佬, 被观察者 可以暂停发送事件吗?
      • 烧伤的火柴:simple方式不需要使用take操作,intervalRange已经指定你要取值的次数了。
      • 海上漂泊的码农:大佬,为什么你的变长时延轮询,里面注释写到//发送onComplete消息,无法触发下游的onComplete回调。//发送onError消息,可以触发下游的onError回调。请问下这是为什么。
        泽毛:@贝壳里的屋子 👍偶遇大神,改天我也去看看
        贝壳里的屋子:@泽毛 并不是这个原因,而是因为repeatWhen使用的是flatMap操作符,flatMap变换后得到的每个子数据流中的completed事件并不会加入到合并后的事件流中,只有flatMap的源事件流中的completed事件会加入到合并后的事件流中。这里如果返回Observable.Empty(),相当于是子事件流中的completed事件,所以不会触发最终的下游的onComplete回调。但是error事件不同,任何一个flatMap的子数据流中的error都会中断最终的合并事件流并且被下游接收到。
        泽毛:@海上漂泊的码农 现象是这样的,具体的源码没有看,猜想是因为 repeatWhen 本来就是由 onComplete 触发的,所以它不能再去触发 onComplete 事件了。
      • Kent_Lee:大佬写的很不错。有点问题问一下大佬,repeatWhen 怎么优雅的获取最后一次OnNext的数据,还有new Function 的apply方法传递的参数不知道有没有什么意义呢?
        泽毛:@Kent_Lee 如果你那边有需要用到这方面的具体业务的场景可以和我说一下,咱们一起讨论讨论,看看有没有好的方法来实现。
        泽毛:@Kent_Lee 在 reapeatWhen 里面是获取不到 onNext 发射的数据的,因为它只有在上游发送了 onComplete 才会触发。看了下源码,apply(Observable<Object> objectObservable) 中的 objectObservable 其实就是一个发射 0 的 Observable,和上游发送了什么并没有关系。

      本文标题:RxJava2 实战知识梳理(5) - 简单及进阶的轮询操作

      本文链接:https://www.haomeiwen.com/subject/lmrndxtx.html