美文网首页Android开发程序员技术干货
ScheduledThreadPoolExecutor原理分析

ScheduledThreadPoolExecutor原理分析

作者: wo883721 | 来源:发表于2018-04-16 11:20 被阅读84次

    线程池就是维持几个工作线程,然后从任务队列中获取任务执行。所以要实现延时或者定时执行任务,就要做到以下三点:

    1. 任务要能返回它的延时时间和是否为定时任务。
    2. 任务队列要根据任务的延时时间进行排序。这个我们在上一章DelayedWorkQueue原理分析中已经讲解过了。
    3. 如果是定时任务,任务执行完成之后,还可以再次执行它。

    所以要分析ScheduledThreadPoolExecutor原理关键就是对它的任务类ScheduledFutureTask的分析。

        private class ScheduledFutureTask<V>
                extends FutureTask<V> implements RunnableScheduledFuture<V> {}
    

    FutureTask类我们已经在以前的文章分析过了,而RunnableScheduledFuture接口的作用是什么呢?

    一. RunnableScheduledFuture接口

    我们知道要实现延时或者定时执行任务,任务要能返回它的延时时间,任务是否为定时任务,任务能根据延时时间排序。所以可以想象出RunnableScheduledFuture接口中的方法了。

    public interface Comparable<T> {
        // 比较两个实例的大小
        public int compareTo(T o);
    }
    /**
     * 实现Comparable接口,说明Delayed实例可以进行比较
     */
    public interface Delayed extends Comparable<Delayed> {
    
        /**
         * @return 返回剩余的延时时间
         */
        long getDelay(TimeUnit unit);
    }
    
    /**
     * 实现了Delayed接口,可以返回延时时间以及能够根据延时时间进行比较
     */
    public interface ScheduledFuture<V> extends Delayed, Future<V> {
    }
    
    // 继承自Runnable和Future接口。可以当做Runnable实例使用
    public interface RunnableFuture<V> extends Runnable, Future<V> {
        // 运行任务
        void run();
    }
    
    public interface RunnableScheduledFuture<V> extends 
                     RunnableFuture<V>, ScheduledFuture<V> {
    
        /**
         * 是否为周期定时任务
         * @return 返回true,表示是定时任务
         */
        boolean isPeriodic();
    }
    

    RunnableScheduledFuture接口中,最重要的就是这四个方法:

    1. compareTo(T o): 可以比较任务的延时时间,进行排序用的。
    2. getDelay(TimeUnit unit): 返回任务剩余的延时时间
    3. run(): 运行任务。如果是定时任务,任务完成之后,还可以继续执行。
    4. isPeriodic(): 是否为周期定时任务

    二. 何时创建ScheduledFutureTask任务

    何时创建ScheduledFutureTask任务,就是ScheduledExecutorService接口四个方法会创建ScheduledFutureTask任务实例。

    2.1 创建延时任务

        // 给定的延迟时间delay之后,才会执行任务command
        public ScheduledFuture<?> schedule(Runnable command,
                                           long delay,
                                           TimeUnit unit) {
            if (command == null || unit == null)
                throw new NullPointerException();
            /**
             * 创建一个延时任务ScheduledFutureTask实例。
             * triggerTime方法会根据给定的延时时间delay,返回任务开始的时间。
             * decorateTask方法是让子类能够修饰ScheduledFutureTask任务实例,本类中没做处理。
             */
            RunnableScheduledFuture<?> t = decorateTask(command,
                new ScheduledFutureTask<Void>(command, null,
                                              triggerTime(delay, unit)));
            // 延时执行任务
            delayedExecute(t);
            return t;
        }
    
        // 给定的延迟时间delay之后,才会执行任务callable
        public <V> ScheduledFuture<V> schedule(Callable<V> callable,
                                               long delay,
                                               TimeUnit unit) {
            if (callable == null || unit == null)
                throw new NullPointerException();
            /**
             * 创建一个延时任务ScheduledFutureTask实例。
             * triggerTime方法会根据给定的延时时间delay,返回任务开始的时间。
             * decorateTask方法是让子类能够修饰ScheduledFutureTask任务实例,本类中没做处理。
             */
            RunnableScheduledFuture<V> t = decorateTask(callable,
                new ScheduledFutureTask<V>(callable,
                                           triggerTime(delay, unit)));
            // 延时执行任务
            delayedExecute(t);
            return t;
        }
    

    这两个方法的流程是一样的,只不过一个是Runnable类型的任务,一个Callable类型的任务。

    1. 调用triggerTime方法根据给定的延时时间delay,返回任务开始的时间。
    2. 创建一个延时任务ScheduledFutureTask实例。
    3. 调用decorateTask方法是让子类能够修饰ScheduledFutureTask任务实例。
    4. 调用delayedExecute(t)方法延时执行任务。

    2.2 创建延时定时任务

        public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                      long initialDelay,
                                                      long period,
                                                      TimeUnit unit) {
            if (command == null || unit == null)
                throw new NullPointerException();
            if (period <= 0)
                throw new IllegalArgumentException();
            /**
             * 创建一个延时定时任务ScheduledFutureTask实例。
             * triggerTime方法会根据给定的延时时间delay,返回任务开始的时间。
             */
            ScheduledFutureTask<Void> sft =
                new ScheduledFutureTask<Void>(command,
                                              null,
                                              triggerTime(initialDelay, unit),
                                              unit.toNanos(period));
            // decorateTask方法是让子类能够修饰ScheduledFutureTask任务实例,本类中没做处理。
            RunnableScheduledFuture<Void> t = decorateTask(command, sft);
            // outerTask作用就是下一次定时执行的任务,在reExecutePeriodic方法中需要
            sft.outerTask = t;
            // 延时执行任务
            delayedExecute(t);
            return t;
        }
    
        public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                         long initialDelay,
                                                         long delay,
                                                         TimeUnit unit) {
            if (command == null || unit == null)
                throw new NullPointerException();
            if (delay <= 0)
                throw new IllegalArgumentException();
            /**
             * 创建一个延时定时任务ScheduledFutureTask实例。
             * triggerTime方法会根据给定的延时时间delay,返回任务开始的时间。
             */
            ScheduledFutureTask<Void> sft =
                new ScheduledFutureTask<Void>(command,
                                              null,
                                              triggerTime(initialDelay, unit),
                                              unit.toNanos(-delay));
            // decorateTask方法是让子类能够修饰ScheduledFutureTask任务实例,本类中没做处理。
            RunnableScheduledFuture<Void> t = decorateTask(command, sft);
            // outerTask作用就是下一次定时执行的任务,在reExecutePeriodic方法中需要
            sft.outerTask = t;
            // 延时执行任务
            delayedExecute(t);
            return t;
        }
    

    我们知道scheduleAtFixedRate方法是固定周期时间去执行任务,而scheduleWithFixedDelay方法是在任务完成之后,延时delay时间再去执行任务。
    但是我们发现这两个方法几乎是一模一样的,唯一不同地就是创建延时定时任务ScheduledFutureTask实例时,一个传递的是正数period,一个传递的是负数-delay。

    1. 调用triggerTime方法根据给定的延时时间delay,返回任务开始的时间。
    2. 创建一个延时定时任务ScheduledFutureTask实例。
    3. 调用decorateTask方法是让子类能够修饰ScheduledFutureTask任务实例。
    4. 设置outerTask,作用就是下一次定时执行的任务,在reExecutePeriodic方法中需要。
    5. 延时执行任务。

    2.3 triggerTime方法

        /**
         * 返回延时任务开始的时间
         */
        private long triggerTime(long delay, TimeUnit unit) {
            return triggerTime(unit.toNanos((delay < 0) ? 0 : delay));
        }
    
        /**
         * 返回延时任务开始的时间。利用当前时间加上给定的延时时间
         */
        long triggerTime(long delay) {
            return now() +
                ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
        }
    

    2.4 decorateTask方法

        /**
         * 让子类能够修饰或者替换任务task。
         * 这里只是简单地返回task
         */
        protected <V> RunnableScheduledFuture<V> decorateTask(
            Runnable runnable, RunnableScheduledFuture<V> task) {
            return task;
        }
    

    2.5 delayedExecute方法

        private void delayedExecute(RunnableScheduledFuture<?> task) {
            // 如果线程池不是RUNNING状态,那么调用reject(task)方法,
            // 拒绝执行任务task
            if (isShutdown())
                reject(task);
            else {
                // 将任务添加到任务队列中,会根据任务的延时时间进行排序
                super.getQueue().add(task);
                /**
                 * 如果线程池不是RUNNING状态,那么就判断能不能在当前状态下运行,
                 * 主要就是能不能在SHUTDOWN状态下运行。
                 * 如果不能在当前状态下运行,那么就调用remove方法,
                 * 从任务队列中移除刚刚添加的任务task。
                 *
                 * 只有移除成功了,才可以调用task.cancel(false)方法取消任务,
                 * 否则这个延时任务task都还要执行。
                 */
                if (isShutdown() &&
                    !canRunInCurrentRunState(task.isPeriodic()) &&
                    remove(task))
                    task.cancel(false);
                else
                    // 预先启动工作线程,确保线程池中有工作线程。
                    ensurePrestart();
            }
        }
    

    这个方法的主要作用就是将任务添加到任务队列中,因为这里任务队列是优先级队列DelayedWorkQueue,它会根据任务的延时时间进行排序。

    1. 如果线程池不是RUNNING状态,不能执行延时任务task,那么调用reject(task)方法,拒绝执行任务task。
    2. 将任务添加到任务队列中,会根据任务的延时时间进行排序。
    3. 因为是多线程并发环境,就必须判断在添加任务的过程中,线程池状态是否被别的线程更改了,那么就可能要取消任务了。
    4. 将任务添加到任务队列后,还要确保线程池中有工作线程,不然任务也不为执行。所以ensurePrestart()方法预先启动工作线程,确保线程池中有工作线程。

    与ThreadPoolExecutor类中execute方法执行任务方法不同:

    1. 因为是延时任务task,所以不能将任务当成工作线程第一个任务,只能将任务添加到任务队列中,等待着工作线程来执行。
    2. 任务队列DelayedWorkQueue容量几乎是无限的,所以也不需要最大池。除非核心池数量是0,那么必须创建一个工作线程来运行任务,否则线程池的线程数不可能超过核心池数量。
    3. 当线程池状态被别的线程改变时,取消任务的判断条件不同。当线程池状态不是RUNNING,而且不能在当前状态下运行,那么就调用remove方法,移除刚刚添加的任务task。只有移除任务成功,才可以调用task.cancel(false)方法取消任务,否则这个延时任务task都还要执行。

    2.6 canRunInCurrentRunState方法

        // 判断能不能在当前状态下运行
        boolean canRunInCurrentRunState(boolean periodic) {
            // 调用父类ThreadPoolExecutor中的isRunningOrShutdown方法
            return isRunningOrShutdown(periodic ?
                                       continueExistingPeriodicTasksAfterShutdown :
                                       executeExistingDelayedTasksAfterShutdown);
        }
    
    
        /**
         * 是ThreadPoolExecutor中的方法
         * @param shutdownOK 表示线程池能在SHUTDOWN状态下运行
         */
        final boolean isRunningOrShutdown(boolean shutdownOK) {
            // 线程状态
            int rs = runStateOf(ctl.get());
            // 当线程是RUNNING状态,或者是SHUTDOWN状态且shutdownOK也为true
            return rs == RUNNING || (rs == SHUTDOWN && shutdownOK);
        }
    

    canRunInCurrentRunState方法就是返回线程池能不能在当前状态下运行。

    注: continueExistingPeriodicTasksAfterShutdown与executeExistingDelayedTasksAfterShutdown都是提供给外部设置。

    2.7 ensurePrestart方法

        void ensurePrestart() {
            // 线程池中的线程数量
            int wc = workerCountOf(ctl.get());
            // 如果小于核心池数量,就创建新的工作线程
            if (wc < corePoolSize)
                addWorker(null, true);
            // 说明corePoolSize数量是0,必须创建一个工作线程来执行任务
            else if (wc == 0)
                addWorker(null, false);
        }
    

    ensurePrestart方法作用:预先启动工作线程,确保线程池中有工作线程。

    三. ScheduledFutureTask类

    ScheduledFutureTask是一个延时定时任务,它可以返回任务剩余延时时间,可以被周期性地执行。

    3.1 重要成员属性

            /** 是一个序列,每次创建任务的时候,都会自增。 */
            private final long sequenceNumber;
    
            /** 任务能够开始执行的时间 */
            private long time;
    
            /**
             * 任务周期执行的时间
             * 0表示不是一个周期定时任务
             * 正数表示固定周期时间去执行任务
             * 负数表示任务完成之后,延时period时间再去执行任务
             */
            private final long period;
    
            /** 表示再次执行的任务,在reExecutePeriodic中调用 */
            RunnableScheduledFuture<V> outerTask = this;
    
            /**
             * 表示在任务队列中的索引位置,用来支持快速从队列中删除任务。
             */
            int heapIndex;
    

    属性说明:

    1. sequenceNumber: 是一个序列,每次创建任务的时候,都会自增。
    2. time: 任务能够开始执行的时间。
    3. period: 任务周期执行的时间。0表示不是一个周期定时任务。
    4. outerTask: 表示再次执行的任务,在reExecutePeriodic中调用
    5. heapIndex: 表示在任务队列中的索引位置,用来支持快速从队列中删除任务。

    3.2 构造函数

    3.2.1 创建延时任务

            /**
             * 创建延时任务
             */
            ScheduledFutureTask(Runnable r, V result, long ns) {
                // 调用父类的方法
                super(r, result);
                // 任务开始的时间
                this.time = ns;
                // period是0,不是一个周期定时任务
                this.period = 0;
                // 每次创建任务的时候,sequenceNumber都会自增
                this.sequenceNumber = sequencer.getAndIncrement();
            }
    
             /**
             * 创建延时任务
             */
            ScheduledFutureTask(Callable<V> callable, long ns) {
                // 调用父类的方法
                super(callable);
                // 任务开始的时间
                this.time = ns;
                // period是0,不是一个周期定时任务
                this.period = 0;
                // 每次创建任务的时候,sequenceNumber都会自增
                this.sequenceNumber = sequencer.getAndIncrement();
            }
    

    3.2.2 创建延时定时任务

            /**
             * 创建延时定时任务
             */
            ScheduledFutureTask(Runnable r, V result, long ns, long period) {
                // 调用父类的方法
                super(r, result);
                // 任务开始的时间
                this.time = ns;
                // 周期定时时间
                this.period = period;
                // 每次创建任务的时候,sequenceNumber都会自增
                this.sequenceNumber = sequencer.getAndIncrement();
            }
    

    3.3 运行任务run方法

            public void run() {
                // 是否是周期任务
                boolean periodic = isPeriodic();
                // 如果不能在当前状态下运行,那么就要取消任务
                if (!canRunInCurrentRunState(periodic))
                    cancel(false);
                // 如果只是延时任务,那么就调用run方法,运行任务。
                else if (!periodic)
                    ScheduledFutureTask.super.run();
                // 如果是周期定时任务,调用runAndReset方法,运行任务。
                // 这个方法不会改变任务的状态,所以可以反复执行。
                else if (ScheduledFutureTask.super.runAndReset()) {
                    // 设置周期任务下一次执行的开始时间time
                    setNextRunTime();
                    // 重新执行任务outerTask
                    reExecutePeriodic(outerTask);
                }
            }
    

    这个方法会在ThreadPoolExecutor的runWorker方法中调用,而且这个方法调用,说明肯定已经到了任务的开始时间time了。

    1. 先判断当前线程状态能不能运行任务,如果不能,就调用cancel()方法取消本任务。
    2. 如果任务只是一个延时任务,那么调用父类的run()运行任务,改变任务的状态,表示任务已经运行完成了。
    3. 如果任务只是一个周期定时任务,那么就任务必须能够反复执行,那么就不能调用run()方法,它会改变任务的状态。而是调用runAndReset()方法,只是简单地运行任务,而不会改变任务状态。
    4. 设置周期任务下一次执行的开始时间time,并重新执行任务。

    setNextRunTime设置任务下一次执行的开始时间time的方法:

            /**
             * 设置任务下一次执行的开始时间time
             */
            private void setNextRunTime() {
                // 周期时间
                long p = period;
                if (p > 0)
                    time += p;
                else
                    time = triggerTime(-p);
            }
    

    我们发现当p大于0的时候,调用time += p,那么就可能出现计算后的time可能小于当前时间,因为任务执行时间超过了周期时间p,所以将这个任务添加到任务队列中,它就会立即执行。
    而当p小于0的时候,调用time = triggerTime(-p)方法,就是在当前时间上,再加上-p的延时时间,所以这个任务添加到任务队列中,必须延时-p时间后,才能执行。

    reExecutePeriodic重新执行任务的方法:

        /**
         * 重新执行任务task
         */
        void reExecutePeriodic(RunnableScheduledFuture<?> task) {
            // 判断当前线程池状态能不能运行任务
            if (canRunInCurrentRunState(true)) {
                // 将任务添加到任务队列,会根据任务延时时间进行排序
                super.getQueue().add(task);
                // 如果线程池状态改变了,当前状态不能运行任务,那么就尝试移除任务,
                // 移除成功,就取消任务。
                if (!canRunInCurrentRunState(true) && remove(task))
                    task.cancel(false);
                else
                    // 预先启动工作线程,确保线程池中有工作线程。
                    ensurePrestart();
            }
        }
    

    这个方法与delayedExecute方法很像,都是将任务添加到任务队列中。

    1. 如果当前线程池状态能够运行任务,那么任务添加到任务队列。
    2. 如果在在添加任务的过程中,线程池状态是否被别的线程更改了,那么就要进行判断,是否需要取消任务。
    3. 调用ensurePrestart()方法,预先启动工作线程,确保线程池中有工作线程。

    3.4 其他重要方法

    3.4.1 取消任务

          public boolean cancel(boolean mayInterruptIfRunning) {
                // 调用父类的cancel方法取消任务
                boolean cancelled = super.cancel(mayInterruptIfRunning);
                // 将任务从任务队列中移除
                if (cancelled && removeOnCancel && heapIndex >= 0)
                    remove(this);
                return cancelled;
            }
    

    3.4.2 getDelay方法

            // 返回任务剩余延时时间
            public long getDelay(TimeUnit unit) {
                return unit.convert(time - now(), NANOSECONDS);
            }
    

    3.4.3 compareTo方法

            public int compareTo(Delayed other) {
                if (other == this)
                    return 0;
                // 如果是ScheduledFutureTask实例
                if (other instanceof ScheduledFutureTask) {
                    ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
                    // 比较任务开始时间
                    long diff = time - x.time;
                    if (diff < 0)
                        return -1;
                    else if (diff > 0)
                        return 1;
                    // 比较那么任务是新创建的,因为新创建的sequenceNumber小一点。
                    else if (sequenceNumber < x.sequenceNumber)
                        return -1;
                    else
                        return 1;
                }
                // 比较任务的剩余延时时间
                long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
                return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
            }
    

    3.4.4 isPeriodic方法

           /**
             * 是否是周期定时任务
             */
            public boolean isPeriodic() {
                return period != 0;
            }
    

    四. 创建延时定时线程池

    4.1 构造函数

        public ScheduledThreadPoolExecutor(int corePoolSize) {
            super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
                  new DelayedWorkQueue());
        }
    
        public ScheduledThreadPoolExecutor(int corePoolSize,
                                           ThreadFactory threadFactory) {
            super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
                  new DelayedWorkQueue(), threadFactory);
        }
    
        public ScheduledThreadPoolExecutor(int corePoolSize,
                                           RejectedExecutionHandler handler) {
            super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
                  new DelayedWorkQueue(), handler);
        }
    
        public ScheduledThreadPoolExecutor(int corePoolSize,
                                           ThreadFactory threadFactory,
                                           RejectedExecutionHandler handler) {
            super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
                  new DelayedWorkQueue(), threadFactory, handler);
        }
    

    都是调用父类ThreadPoolExecutor构造函数的方法,唯一要注意的地方就是任务队列只能是DelayedWorkQueue实例,用户没有办法更换ScheduledThreadPoolExecutor的任务队列属性。

    4.2 创建单个线程的定时线程池

        public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
            return new DelegatedScheduledExecutorService
                (new ScheduledThreadPoolExecutor(1));
        }
    
        public static ScheduledExecutorService newSingleThreadScheduledExecutor(
                     ThreadFactory threadFactory) {
            return new DelegatedScheduledExecutorService
                (new ScheduledThreadPoolExecutor(1, threadFactory));
        }
    

    4.3 创建固定数量的定时线程池

        public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
            return new ScheduledThreadPoolExecutor(corePoolSize);
        }
    
        public static ScheduledExecutorService newScheduledThreadPool(
                int corePoolSize, ThreadFactory threadFactory) {
            return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
        }
    

    相关文章

      网友评论

        本文标题:ScheduledThreadPoolExecutor原理分析

        本文链接:https://www.haomeiwen.com/subject/behskftx.html