RxJava2.x实现定时器

作者: 奔跑的佩恩 | 来源:发表于2017-06-29 16:30 被阅读1589次

    在之前我已经写过一篇关于Rxjava1.x的定时器功能,想了解的同学可以移步
    Rxjava1.x实现定时器

    前言

    由于现在网络层已经升级到RxJava2.x相关的了,所以需要做些调整。虽然RxJava1.x和RxJava2.x同属RxJava系列,但由于RxJava2.x部分代码的重写,导致RxJava2.x与RxJava1.x已是两个不同的版本,RxJava2.x在性能上更优,尤其在背压支持上。当然,此篇重点不在Rx版本上的区别,有兴趣的同学可以自行研究。当然,2.x之于1.x的区别之一是2.x中已经没有 Subscription mSubscription, Observable.create()等方法也不再返回 Subscription对像,取而代之的是在 new Observer()方法中会多返回一个
    onSubscribe(@NonNull Disposable disposable) 方法,而Disposable disposable即时我们取消订阅要用到的对象,其方法有:

    package io.reactivex.disposables;
    
    /**
     * Represents a disposable resource.
     */
    public interface Disposable {
        /**
         * Dispose the resource, the operation should be idempotent.
         */
        void dispose();
    
        /**
         * Returns true if this resource has been disposed.
         * @return true if this resource has been disposed
         */
        boolean isDisposed();
    }
    

    dispose():取消订阅
    isDisposed():判断订阅是否已经取消

    ok,有了这些铺垫之后,我们进入正题。

    第一步,导入RxJava2.x依赖包
        //rxjava2.x
        compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
        compile 'io.reactivex.rxjava2:rxjava:2.1.0'
    
    第二步,直接上定时器类RxTimerUtil代码
    /**
     * Instruction:Rxjava2.x实现定时器
     * <p>
     * Author:pei
     * Date: 2017/6/29
     * Description:
     */
    
    public class RxTimerUtil {
    
        private static Disposable mDisposable;
    
        /** milliseconds毫秒后执行next操作
         *
         * @param milliseconds
         * @param next
         */
        public static void timer(long milliseconds,final IRxNext next) {
            Observable.timer(milliseconds, TimeUnit.MILLISECONDS)
                    .observeOn(AndroidSchedulers.mainThread())
                    .subscribe(new Observer<Long>() {
                        @Override
                        public void onSubscribe(@NonNull Disposable disposable) {
                            mDisposable=disposable;
                        }
    
                        @Override
                        public void onNext(@NonNull Long number) {
                            if(next!=null){
                                next.doNext(number);
                            }
                        }
    
                        @Override
                        public void onError(@NonNull Throwable e) {
                            //取消订阅
                            cancel();
                        }
    
                        @Override
                        public void onComplete() {
                            //取消订阅
                            cancel();
                        }
                    });
        }
    
    
        /** 每隔milliseconds毫秒后执行next操作
         *
         * @param milliseconds
         * @param next
         */
        public static void interval(long milliseconds,final IRxNext next){
            Observable.interval(milliseconds, TimeUnit.MILLISECONDS)
                    .observeOn(AndroidSchedulers.mainThread())
                    .subscribe(new Observer<Long>() {
                        @Override
                        public void onSubscribe(@NonNull Disposable disposable) {
                            mDisposable=disposable;
                        }
    
                        @Override
                        public void onNext(@NonNull Long number) {
                            if(next!=null){
                                next.doNext(number);
                            }
                        }
    
                        @Override
                        public void onError(@NonNull Throwable e) {
    
                        }
    
                        @Override
                        public void onComplete() {
    
                        }
                    });
        }
    
    
        /**
         * 取消订阅
         */
        public static void cancel(){
            if(mDisposable!=null&&!mDisposable.isDisposed()){
                mDisposable.dispose();
                LogUtil.e("====定时器取消======");
            }
        }
    
        public interface IRxNext{
            void doNext(long number);
        }
    }
    

    需要注意的是,若你使用的是延时执行一次的话,在调用 timer 后,不需要 cancel,因为我在代码中已经处理好了,
    如果是调用的重复执行的方法,你需要在activity的ondestroy方法中取消订阅,类似如下:

    @Override
        protected void onDestroy(){
            //取消定时器
            RxTimerUtil.cancel();
            LogUtil.e("====cancel====="+ DateUtil.getNowTime());
            super.onDestroy();
        }   
    

    多么简单粗暴,好了,今天就讲到这里吧,谢谢诶。

    相关文章

      网友评论

      • 9a61b2ee4fe2:是引入的Rxjava2 为什么还是会报observeOn(io.reactivex.Schedulers) in Observable cannot be applied to (rx.Scheduler) 错误呢
      • 之歌:你这个工具 要是多个地方同时调用就GG了
      • zwonb:楼主你这个取消订阅后,上游会不会还在发送事件?
        奔跑的佩恩:会发送,但是下游不会处理
      • txwgoogol:你好 我想问一下这个怎么用 :fearful: RxTimerUtil.timer(3, IRxNext); IRxNext这个参数怎么写 :unamused:
        e6d2e445023a:RxTimerUtil.timer(5, new RxTimerUtil.IRxNext(){
        @Override
        public void doNext(long number) {
        Log.d("TTT","-----" + number);
        }
        }); 这样不行,这里直接打印0,
        txwgoogol:@奔跑的佩恩 谢谢
        奔跑的佩恩:@wampyr 是个接口,你直接 把new IRxNext设为参数就好了

      本文标题:RxJava2.x实现定时器

      本文链接:https://www.haomeiwen.com/subject/fuamcxtx.html