基于RxJava-1源码扩展的轮询器

作者: 木子v大可 | 来源:发表于2017-09-16 10:58 被阅读157次

    本文章分两大章:

    1:基于RxJava-1源码扩展的轮询器

    2:NoHttpRxUtils框架与扩展轮询的结合

    基于RxJava-1源码扩展的轮询器

    由于最近NoHttpRxUtils框架使用者反馈NoHttpRxUtils框架怎么没有轮询请求的功能呢?正好这几天有空闲,我就想去实现这个功能。前期我想要不要就基于Thread+Runnable+Handler或者Handler+Timer+TimerTask去实现呢?沉思一会....就把这两种方式给否决了。因为这两种方式实现的轮询"对于我来说"可控性不高而且产生的代码逻辑繁琐。更主要的一点是,NoHttpRxUtils框架是基于RxJava-1的封装。那么肯定要用RxJava-1的轮询去实现。所有我就去看看RxJava-1的轮询。

    RxJava-1的原生轮询器

        //创建轮询器
            Observable.interval(3000, 3000, TimeUnit.MILLISECONDS)
                    //数据处理行动监听器--->>此处不可以线程切换
                    .map(new Func1<Long, String>() {
                        @Override
                        public String call(Long aLong) {
                            rxJavaNumber++;
                            //在此方法里面做数据处理操作。此方法是子线程执行的。
                            return "处理完毕的数据";
                        }
                    })
                    //设置被订阅者事件执行线程-->对.map()"数据处理行动监听器"线程无影响
                    .subscribeOn(AndroidSchedulers.mainThread())
                    //轮询拦截器
                    .takeUntil(new Func1<String, Boolean>() {
                        @Override
                        public Boolean call(String s) {
                            //执行10次自动停止轮询
                            if (rxJavaNumber >= 10) {
                                return true;
                            } else {
                                return false;
                            }
                        }
                    })
                    //订阅者事件处理器线程切换
                    .observeOn(AndroidSchedulers.mainThread())
                    //订阅者事件处理监听器
                    .subscribe(new Action1<String>() {
                        @Override
                        public void call(String transferValue) {
                            String toString = mRxjavaPollText.getText().toString();
                            mRxjavaPollText.setText(transferValue + "\n\n" + toString);
                        }
                    });
    

    运行证实.subscribeOn(AndroidSchedulers.mainThread())的方法设置无法对.map()方法里面的实现类的线程控制,而.map()方法里面实现类的call()方法却执行在非UI线程中。所以导致RxJava的 Observable.interval(3000, 3000, TimeUnit.MILLISECONDS)无法像Observable.create(new Observable.OnSubscribe<T>())那样对"被观察者行为监听"和"观察者事件处理"随意切换线程,而且在业务层次上面好像也有点混乱(以上观点也许会由于我对rxjava不够深入的了解产生误差,请大家谅解)。 所以我就对rxjava-1轮询源码进行研究(发现rxjava的线程调度器真心的牛逼),然后我根据rxjava轮询源码去扩展出对应的轮询

    基于RxJava-1源码扩展的轮询器

    rxjava-1轮询源码参考类

    public final class OnSubscribeTimerPeriodically implements OnSubscribe<Long> {
        final long initialDelay;
        final long period;
        final TimeUnit unit;
        final Scheduler scheduler;
    
        public OnSubscribeTimerPeriodically(long initialDelay, long period, TimeUnit unit, Scheduler scheduler) {
            this.initialDelay = initialDelay;
            this.period = period;
            this.unit = unit;
            this.scheduler = scheduler;
        }
    
        @Override
        public void call(final Subscriber<? super Long> child) {
          //由传入的线程调度控制Action0实现类call()方法执行在那个线程中
            final Worker worker = scheduler.createWorker();
            child.add(worker);
            worker.schedulePeriodically(new Action0() {
                long counter;
                @Override
                public void call() {
                  //轮询时,此方法会触发
                    try {
                        child.onNext(counter++);
                    } catch (Throwable e) {
                        try {
                            worker.unsubscribe();
                        } finally {
                            Exceptions.throwOrReport(e, child);
                        }
                    }
                }
                
            }, initialDelay, period, unit);
        }
    }
    

    基于参考RxJava-1"OnSubscribeTimerPeriodically"的源码实现的轮询类

    public final class OnSubscribeTimerPeriodically<V, T> implements Observable.OnSubscribe<T> {
        /**
         * 初始化加载延迟
         */
        final long initialDelay;
        /**
         * 轮询间隔时间
         */
        final long period;
        /**
         * 时间单位
         */
        final TimeUnit unit;
        /**
         * 订阅者线程线路
         */
        final Scheduler scheduler;
        /**
         * 可观察者事件监听器
         */
        private OnObserverEventListener<V, T> eventListener;
        /**
         * 可观察者线程线路-事件处理默认在子线程
         */
        private Scheduler eventScheduler = Schedulers.io();
        /**
         * 传输给被观察者接受的对象
         */
        private V transferValue;
    
        public OnSubscribeTimerPeriodically(long initialDelay, long period, TimeUnit unit, Scheduler scheduler) {
            this.initialDelay = initialDelay;
            this.period = period;
            this.unit = unit;
            this.scheduler = scheduler;
        }
    
        /**
         * 赋值传输给被观察者接受的对象
         *
         * @param transferValue 被观察者接受的对象
         */
        public void setTransferValue(V transferValue) {
            this.transferValue = transferValue;
        }
    
        /**
         * 设置事件执行线程线路
         *
         * @param eventScheduler 线程线路
         */
        public void setEventScheduler(Scheduler eventScheduler) {
            if (null != eventScheduler) {
                this.eventScheduler = eventScheduler;
            }
        }
    
        /**
         * 设置可观察者事件监听器
         *
         * @param eventListener 可观察者事件监听器
         */
        public void setOnObserverEventListener(OnObserverEventListener<V, T> eventListener) {
            this.eventListener = eventListener;
        }
    
        @Override
        public void call(final Subscriber<? super T> subscriber) {
            //子线程调度器执行
            final Scheduler.Worker worker = Schedulers.io().createWorker();
            subscriber.add(worker);
            //开启子线程调度器定时器执行
            worker.schedulePeriodically(new Action0() {
                @Override
                public void call() {
                    try {
                        synchronized (OnSubscribeTimerPeriodically.this) {
                            //被观察事件处理的调度器
                            final Scheduler.Worker workerEvent = eventScheduler.createWorker();
                            workerEvent.schedule(new Action0() {
                                @Override
                                public void call() {
                                    if (null != eventListener) {
                                        try {
                                            final T observerEvent = eventListener.onObserverEvent(transferValue);
    
                                            //观察者事件处理的调度器
                                            final Scheduler.Worker workerDispose = scheduler.createWorker();
                                            workerDispose.schedule(new Action0() {
                                                @Override
                                                public void call() {
                                                    try {
                                                        subscriber.onNext(observerEvent);
                                                        awake();
                                                    } catch (Throwable e) {
                                                        try {
                                                            worker.unsubscribe();
                                                            workerEvent.unsubscribe();
                                                            workerDispose.unsubscribe();
                                                            awake();
                                                        } finally {
                                                            Exceptions.throwOrReport(e, subscriber);
                                                        }
                                                    }
                                                }
                                            });
                                        } catch (Throwable e) {
                                            try {
                                                worker.unsubscribe();
                                                workerEvent.unsubscribe();
                                                awake();
                                            } finally {
                                                Exceptions.throwOrReport(e, subscriber);
                                            }
                                        }
                                    }
                                }
                            });
                            //当前线程休眠,等待"被观察者"事件逻辑处理完毕
                            OnSubscribeTimerPeriodically.this.wait();
                        }
                    } catch (Throwable e) {
                        try {
                            worker.unsubscribe();
                        } finally {
                            Exceptions.throwOrReport(e, subscriber);
                        }
                    }
                }
    
            }, initialDelay, period, unit);
        }
    
        /**
         * 观察者已经根据被观察者的动作做出相应处理后唤醒调度器定时器继续往下走
         */
        private void awake() {
            synchronized (OnSubscribeTimerPeriodically.this) {
                OnSubscribeTimerPeriodically.this.notify();
            }
        }
    }
    

    OnSubscribeTimerPeriodically执行在非UI线程中,能够通过设置来决定"被观察者"和"观察者"行为事件执行线程

    通过继承RxJava-1"Observable<T>"来实现调用类

    public class ObservableExpand<V, T> extends Observable<T> {
    
        final OnSubscribeTimerPeriodically<V, T> onSubscribeTimerPeriodically;
    
        /**
         * Creates an Observable with a Function to execute when it is subscribed to.
         * <p>
         * <em>Note:</em> Use {@link #create(OnSubscribe)} to create an Observable, instead of this constructor,
         * unless you specifically have a need for inheritance.
         *
         * @param f {@link OnSubscribe} to be executed when {@link #subscribe(Subscriber)} is called
         */
        protected ObservableExpand(OnSubscribe<T> f, OnSubscribeTimerPeriodically<V, T> onSubscribe1) {
            super(f);
            onSubscribeTimerPeriodically = onSubscribe1;
        }
    
    
        /**
         * 轮询间隔方法
         *
         * @param initialDelay          初始化加载延迟
         * @param period                轮询间隔时间
         * @param unit                  时间单位
         * @param observerEventListener 可观察者事件监听器
         * @param <T>
         * @return 订阅间隔计时Builder
         */
        public static <V, T> Builder<V, T> intervalPolling(long initialDelay, long period, TimeUnit unit, OnObserverEventListener<V, T> observerEventListener) {
    
            return new Builder<>(initialDelay, period, unit, observerEventListener);
        }
    
        /**
         * 订阅间隔计时Builder
         *
         * @param <T>
         */
        public static class Builder<V, T> {
            /**
             * 初始化加载延迟
             */
            private long initialDelay;
            /**
             * 轮询间隔时间
             */
            private long period;
            /**
             * 时间单位
             */
            private TimeUnit unit;
            /**
             * 可观察者事件监听器
             */
            private OnObserverEventListener<V, T> observerEventListener;
    
            public Builder(long initialDelay, long period, TimeUnit unit, OnObserverEventListener<V, T> observerEventListener) {
                this.initialDelay = initialDelay;
                this.period = period;
                this.unit = unit;
                this.observerEventListener = observerEventListener;
            }
    
            /**
             * 设置可观察者监听器线程线路
             *
             * @param eventScheduler 线程线路
             * @param transferValue  待处理或者待传输的对象
             * @return
             */
            public ObservableExpand<V, T> subscribeOn(Scheduler eventScheduler, V transferValue) {
    
                OnSubscribeTimerPeriodically<V, T> timerPeriodically = new OnSubscribeTimerPeriodically<>(initialDelay, period, unit, Schedulers.computation());
                timerPeriodically.setOnObserverEventListener(observerEventListener);
                timerPeriodically.setTransferValue(transferValue);
                timerPeriodically.setEventScheduler(eventScheduler);
                return new ObservableExpand<>(RxJavaHooks.onCreate(timerPeriodically), timerPeriodically);
            }
        }
    }
    

    ObservableExpand继承与Observable<T>,并采用建筑模式去创建OnSubscribeTimerPeriodically

    如何调用扩展的轮询器?

     ObservableExpand.intervalPolling(3000, 3000, TimeUnit.MILLISECONDS,
                    //被观察者行为监听器 ->正在处理
                    new OnObserverEventListener<String, String>() {
                        @Override
                        public String onObserverEvent(String transferValue) {
                            try {
                                //模拟耗时操作
                                Thread.sleep(3 * 1000);
                                expandNumber++;
                                transferValue = "扩展轮询次数:" + expandNumber + "\n传输进来的值:" + transferValue;
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                            //此处可以放置你要处理的数据或者逻辑。
                            return transferValue;
                        }
                    })
                    //指定被观察者行为监听器执行线程。传输对象进被观察者行为监听器
                    .subscribeOn(Schedulers.io(), transitionList)
                    //设置拦截器
                    .takeUntil(new Func1<String, Boolean>() {
                        @Override
                        public Boolean call(String untilData) {
                            //执行10次自动停止轮询,也可根据untilData对象值去判断是否停止轮询
                            if (expandNumber >= 10) {
                                return true;
                            } else {
                                return false;
                            }
                        }
                    })
                    //指定观察者触发监听器执行线程
                    .observeOn(AndroidSchedulers.mainThread())
                    //观察者触发监听器
                    .subscribe(new Action1<String>() {
                        @Override
                        public void call(String s) {
                            String toString = mExpandPollText.getText().toString();
                            mExpandPollText.setText(s + "\n\n" + toString);
                        }
                    });
    

    扩展的轮询器的调用就是这么rxjava

    点击进入扩展轮询github链接

    NoHttpRxUtils框架与扩展轮询的结合

    NoHttpRxUtils框架是what?点击查看NoHttpRxUtils框架博客点击进入NoHttpRxUtils框架github

    NoHttpRxUtils轮询请求,采用链式调用

    //获取请求对象
    RxNoHttpUtils.rxNoHttpRequest()
    
    //此处省略NoHttp网络请求设置参数的方法
    ...
    
    //设置当前轮询请求Sign
    .setSign(new Object())
    
    //创建轮询请求对象,并指定响应转换类型和请求成功或者失败回调接口
    .builderPoll(Objects.class,new OnIsRequestListener<T>)
    
    //设置初始化加载延迟
    .setInitialDelay(3 * 1000)
    
    //设置轮询间隔时间-默认3秒
    .setPeriod(5 * 1000)
    
    //设置被观察者产生的行为事件监听器-
    //(如果此处实现被观察者产生的行为事件监听器,那么框架内部就不去维护此轮询请求,必须实现轮询拦截器接口去维护此轮询什么时候停止。)
    .setOnObserverEventListener(new OnObserverEventListener<RestRequest<T>, RxInformationModel<T>>(){
         @Override
      public RxInformationModel<T> onObserverEvent(RestRequest<T> transferValue) {
        // RxInformationModel对象方法介绍
        //getData()=获取请求数据
        //setData(T data)=赋值请求数据
        //setException(boolean exception)=赋值是否是异常状态
        //isException()=获取是否异常状态
        //setThrowable(Throwable throwable)=赋值异常类
        //getThrowable()=获取异常类
        //setStop(boolean stop)=赋值是否停止轮询状态
        //isStop()=获取是否轮询状态
    
        //RxInformationModel 此对象需要new 出来.
        //在此方法中可以换成自己钟意的网络框架去请求,如果上面设置网络请求参数,除了body其它的都能从RestRequest里面取得。
       return informationModel;
      }
    })
    
    // 设置设置数据拦截监听对象
    .setBooleanFunc1(new Func1<RxInformationModel<T>, Boolean>() {
        @Override 
       public Boolean call(RxInformationModel<T> stringRxInformationModel) {
      //在此方法里面可以根据RxInformationModel.getData()获取请求的数据,然后根据请求的数据来决定是否停止轮询
        return stringRxInformationModel.isStop();
       } })
    
    //设置观察者根据被观察产生的行为做出相应处理监听器
    //如果实现了此接口,那么builderPoll中实现的OnIsRequestListener将无效。
    .setRxInformationModelAction1(new Action1<RxInformationModel<T>>() {
        @Override 
       public void call(RxInformationModel<T> stringRxInformationModel) {
        //在此方法里面根据RxInformationModel中的数据做出相应动作
       }
    })
    
    //转换成轮询请求类
    .switchPoll()
    
    //开始请求
    .requestRxNoHttp();
    

    NoHttpRxUtils轮询请求调用中所有的泛型<T>都相互关联的。指定一个泛型类型,其它泛型都必须是此类型

    如何取消轮询请求

    //单个取消Sign对应的轮询请求
    RxNoHttpUtils.cancelPoll(Sign));
    
    //取消批量Sign对应的轮询请求
    RxNoHttpUtils.cancelPoll(Sign[]);
    
    //取消所有的轮询请求
    // RxNoHttpUtils.cancelPollAll(); 
    

    取消轮询请求必须要求调用此方法.setSign(new Object())设置方可有效。

    相关文章

      网友评论

        本文标题:基于RxJava-1源码扩展的轮询器

        本文链接:https://www.haomeiwen.com/subject/buassxtx.html