美文网首页Android开发程序员技术干货
ScheduledThreadPoolExecutor原理分析

ScheduledThreadPoolExecutor原理分析

作者: wo883721 | 来源:发表于2018-04-16 11:20 被阅读84次

线程池就是维持几个工作线程,然后从任务队列中获取任务执行。所以要实现延时或者定时执行任务,就要做到以下三点:

  1. 任务要能返回它的延时时间和是否为定时任务。
  2. 任务队列要根据任务的延时时间进行排序。这个我们在上一章DelayedWorkQueue原理分析中已经讲解过了。
  3. 如果是定时任务,任务执行完成之后,还可以再次执行它。

所以要分析ScheduledThreadPoolExecutor原理关键就是对它的任务类ScheduledFutureTask的分析。

    private class ScheduledFutureTask<V>
            extends FutureTask<V> implements RunnableScheduledFuture<V> {}

FutureTask类我们已经在以前的文章分析过了,而RunnableScheduledFuture接口的作用是什么呢?

一. RunnableScheduledFuture接口

我们知道要实现延时或者定时执行任务,任务要能返回它的延时时间,任务是否为定时任务,任务能根据延时时间排序。所以可以想象出RunnableScheduledFuture接口中的方法了。

public interface Comparable<T> {
    // 比较两个实例的大小
    public int compareTo(T o);
}
/**
 * 实现Comparable接口,说明Delayed实例可以进行比较
 */
public interface Delayed extends Comparable<Delayed> {

    /**
     * @return 返回剩余的延时时间
     */
    long getDelay(TimeUnit unit);
}

/**
 * 实现了Delayed接口,可以返回延时时间以及能够根据延时时间进行比较
 */
public interface ScheduledFuture<V> extends Delayed, Future<V> {
}

// 继承自Runnable和Future接口。可以当做Runnable实例使用
public interface RunnableFuture<V> extends Runnable, Future<V> {
    // 运行任务
    void run();
}

public interface RunnableScheduledFuture<V> extends 
                 RunnableFuture<V>, ScheduledFuture<V> {

    /**
     * 是否为周期定时任务
     * @return 返回true,表示是定时任务
     */
    boolean isPeriodic();
}

RunnableScheduledFuture接口中,最重要的就是这四个方法:

  1. compareTo(T o): 可以比较任务的延时时间,进行排序用的。
  2. getDelay(TimeUnit unit): 返回任务剩余的延时时间
  3. run(): 运行任务。如果是定时任务,任务完成之后,还可以继续执行。
  4. isPeriodic(): 是否为周期定时任务

二. 何时创建ScheduledFutureTask任务

何时创建ScheduledFutureTask任务,就是ScheduledExecutorService接口四个方法会创建ScheduledFutureTask任务实例。

2.1 创建延时任务

    // 给定的延迟时间delay之后,才会执行任务command
    public ScheduledFuture<?> schedule(Runnable command,
                                       long delay,
                                       TimeUnit unit) {
        if (command == null || unit == null)
            throw new NullPointerException();
        /**
         * 创建一个延时任务ScheduledFutureTask实例。
         * triggerTime方法会根据给定的延时时间delay,返回任务开始的时间。
         * decorateTask方法是让子类能够修饰ScheduledFutureTask任务实例,本类中没做处理。
         */
        RunnableScheduledFuture<?> t = decorateTask(command,
            new ScheduledFutureTask<Void>(command, null,
                                          triggerTime(delay, unit)));
        // 延时执行任务
        delayedExecute(t);
        return t;
    }

    // 给定的延迟时间delay之后,才会执行任务callable
    public <V> ScheduledFuture<V> schedule(Callable<V> callable,
                                           long delay,
                                           TimeUnit unit) {
        if (callable == null || unit == null)
            throw new NullPointerException();
        /**
         * 创建一个延时任务ScheduledFutureTask实例。
         * triggerTime方法会根据给定的延时时间delay,返回任务开始的时间。
         * decorateTask方法是让子类能够修饰ScheduledFutureTask任务实例,本类中没做处理。
         */
        RunnableScheduledFuture<V> t = decorateTask(callable,
            new ScheduledFutureTask<V>(callable,
                                       triggerTime(delay, unit)));
        // 延时执行任务
        delayedExecute(t);
        return t;
    }

这两个方法的流程是一样的,只不过一个是Runnable类型的任务,一个Callable类型的任务。

  1. 调用triggerTime方法根据给定的延时时间delay,返回任务开始的时间。
  2. 创建一个延时任务ScheduledFutureTask实例。
  3. 调用decorateTask方法是让子类能够修饰ScheduledFutureTask任务实例。
  4. 调用delayedExecute(t)方法延时执行任务。

2.2 创建延时定时任务

    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                  long initialDelay,
                                                  long period,
                                                  TimeUnit unit) {
        if (command == null || unit == null)
            throw new NullPointerException();
        if (period <= 0)
            throw new IllegalArgumentException();
        /**
         * 创建一个延时定时任务ScheduledFutureTask实例。
         * triggerTime方法会根据给定的延时时间delay,返回任务开始的时间。
         */
        ScheduledFutureTask<Void> sft =
            new ScheduledFutureTask<Void>(command,
                                          null,
                                          triggerTime(initialDelay, unit),
                                          unit.toNanos(period));
        // decorateTask方法是让子类能够修饰ScheduledFutureTask任务实例,本类中没做处理。
        RunnableScheduledFuture<Void> t = decorateTask(command, sft);
        // outerTask作用就是下一次定时执行的任务,在reExecutePeriodic方法中需要
        sft.outerTask = t;
        // 延时执行任务
        delayedExecute(t);
        return t;
    }

    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                     long initialDelay,
                                                     long delay,
                                                     TimeUnit unit) {
        if (command == null || unit == null)
            throw new NullPointerException();
        if (delay <= 0)
            throw new IllegalArgumentException();
        /**
         * 创建一个延时定时任务ScheduledFutureTask实例。
         * triggerTime方法会根据给定的延时时间delay,返回任务开始的时间。
         */
        ScheduledFutureTask<Void> sft =
            new ScheduledFutureTask<Void>(command,
                                          null,
                                          triggerTime(initialDelay, unit),
                                          unit.toNanos(-delay));
        // decorateTask方法是让子类能够修饰ScheduledFutureTask任务实例,本类中没做处理。
        RunnableScheduledFuture<Void> t = decorateTask(command, sft);
        // outerTask作用就是下一次定时执行的任务,在reExecutePeriodic方法中需要
        sft.outerTask = t;
        // 延时执行任务
        delayedExecute(t);
        return t;
    }

我们知道scheduleAtFixedRate方法是固定周期时间去执行任务,而scheduleWithFixedDelay方法是在任务完成之后,延时delay时间再去执行任务。
但是我们发现这两个方法几乎是一模一样的,唯一不同地就是创建延时定时任务ScheduledFutureTask实例时,一个传递的是正数period,一个传递的是负数-delay。

  1. 调用triggerTime方法根据给定的延时时间delay,返回任务开始的时间。
  2. 创建一个延时定时任务ScheduledFutureTask实例。
  3. 调用decorateTask方法是让子类能够修饰ScheduledFutureTask任务实例。
  4. 设置outerTask,作用就是下一次定时执行的任务,在reExecutePeriodic方法中需要。
  5. 延时执行任务。

2.3 triggerTime方法

    /**
     * 返回延时任务开始的时间
     */
    private long triggerTime(long delay, TimeUnit unit) {
        return triggerTime(unit.toNanos((delay < 0) ? 0 : delay));
    }

    /**
     * 返回延时任务开始的时间。利用当前时间加上给定的延时时间
     */
    long triggerTime(long delay) {
        return now() +
            ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
    }

2.4 decorateTask方法

    /**
     * 让子类能够修饰或者替换任务task。
     * 这里只是简单地返回task
     */
    protected <V> RunnableScheduledFuture<V> decorateTask(
        Runnable runnable, RunnableScheduledFuture<V> task) {
        return task;
    }

2.5 delayedExecute方法

    private void delayedExecute(RunnableScheduledFuture<?> task) {
        // 如果线程池不是RUNNING状态,那么调用reject(task)方法,
        // 拒绝执行任务task
        if (isShutdown())
            reject(task);
        else {
            // 将任务添加到任务队列中,会根据任务的延时时间进行排序
            super.getQueue().add(task);
            /**
             * 如果线程池不是RUNNING状态,那么就判断能不能在当前状态下运行,
             * 主要就是能不能在SHUTDOWN状态下运行。
             * 如果不能在当前状态下运行,那么就调用remove方法,
             * 从任务队列中移除刚刚添加的任务task。
             *
             * 只有移除成功了,才可以调用task.cancel(false)方法取消任务,
             * 否则这个延时任务task都还要执行。
             */
            if (isShutdown() &&
                !canRunInCurrentRunState(task.isPeriodic()) &&
                remove(task))
                task.cancel(false);
            else
                // 预先启动工作线程,确保线程池中有工作线程。
                ensurePrestart();
        }
    }

这个方法的主要作用就是将任务添加到任务队列中,因为这里任务队列是优先级队列DelayedWorkQueue,它会根据任务的延时时间进行排序。

  1. 如果线程池不是RUNNING状态,不能执行延时任务task,那么调用reject(task)方法,拒绝执行任务task。
  2. 将任务添加到任务队列中,会根据任务的延时时间进行排序。
  3. 因为是多线程并发环境,就必须判断在添加任务的过程中,线程池状态是否被别的线程更改了,那么就可能要取消任务了。
  4. 将任务添加到任务队列后,还要确保线程池中有工作线程,不然任务也不为执行。所以ensurePrestart()方法预先启动工作线程,确保线程池中有工作线程。

与ThreadPoolExecutor类中execute方法执行任务方法不同:

  1. 因为是延时任务task,所以不能将任务当成工作线程第一个任务,只能将任务添加到任务队列中,等待着工作线程来执行。
  2. 任务队列DelayedWorkQueue容量几乎是无限的,所以也不需要最大池。除非核心池数量是0,那么必须创建一个工作线程来运行任务,否则线程池的线程数不可能超过核心池数量。
  3. 当线程池状态被别的线程改变时,取消任务的判断条件不同。当线程池状态不是RUNNING,而且不能在当前状态下运行,那么就调用remove方法,移除刚刚添加的任务task。只有移除任务成功,才可以调用task.cancel(false)方法取消任务,否则这个延时任务task都还要执行。

2.6 canRunInCurrentRunState方法

    // 判断能不能在当前状态下运行
    boolean canRunInCurrentRunState(boolean periodic) {
        // 调用父类ThreadPoolExecutor中的isRunningOrShutdown方法
        return isRunningOrShutdown(periodic ?
                                   continueExistingPeriodicTasksAfterShutdown :
                                   executeExistingDelayedTasksAfterShutdown);
    }


    /**
     * 是ThreadPoolExecutor中的方法
     * @param shutdownOK 表示线程池能在SHUTDOWN状态下运行
     */
    final boolean isRunningOrShutdown(boolean shutdownOK) {
        // 线程状态
        int rs = runStateOf(ctl.get());
        // 当线程是RUNNING状态,或者是SHUTDOWN状态且shutdownOK也为true
        return rs == RUNNING || (rs == SHUTDOWN && shutdownOK);
    }

canRunInCurrentRunState方法就是返回线程池能不能在当前状态下运行。

注: continueExistingPeriodicTasksAfterShutdown与executeExistingDelayedTasksAfterShutdown都是提供给外部设置。

2.7 ensurePrestart方法

    void ensurePrestart() {
        // 线程池中的线程数量
        int wc = workerCountOf(ctl.get());
        // 如果小于核心池数量,就创建新的工作线程
        if (wc < corePoolSize)
            addWorker(null, true);
        // 说明corePoolSize数量是0,必须创建一个工作线程来执行任务
        else if (wc == 0)
            addWorker(null, false);
    }

ensurePrestart方法作用:预先启动工作线程,确保线程池中有工作线程。

三. ScheduledFutureTask类

ScheduledFutureTask是一个延时定时任务,它可以返回任务剩余延时时间,可以被周期性地执行。

3.1 重要成员属性

        /** 是一个序列,每次创建任务的时候,都会自增。 */
        private final long sequenceNumber;

        /** 任务能够开始执行的时间 */
        private long time;

        /**
         * 任务周期执行的时间
         * 0表示不是一个周期定时任务
         * 正数表示固定周期时间去执行任务
         * 负数表示任务完成之后,延时period时间再去执行任务
         */
        private final long period;

        /** 表示再次执行的任务,在reExecutePeriodic中调用 */
        RunnableScheduledFuture<V> outerTask = this;

        /**
         * 表示在任务队列中的索引位置,用来支持快速从队列中删除任务。
         */
        int heapIndex;

属性说明:

  1. sequenceNumber: 是一个序列,每次创建任务的时候,都会自增。
  2. time: 任务能够开始执行的时间。
  3. period: 任务周期执行的时间。0表示不是一个周期定时任务。
  4. outerTask: 表示再次执行的任务,在reExecutePeriodic中调用
  5. heapIndex: 表示在任务队列中的索引位置,用来支持快速从队列中删除任务。

3.2 构造函数

3.2.1 创建延时任务

        /**
         * 创建延时任务
         */
        ScheduledFutureTask(Runnable r, V result, long ns) {
            // 调用父类的方法
            super(r, result);
            // 任务开始的时间
            this.time = ns;
            // period是0,不是一个周期定时任务
            this.period = 0;
            // 每次创建任务的时候,sequenceNumber都会自增
            this.sequenceNumber = sequencer.getAndIncrement();
        }

         /**
         * 创建延时任务
         */
        ScheduledFutureTask(Callable<V> callable, long ns) {
            // 调用父类的方法
            super(callable);
            // 任务开始的时间
            this.time = ns;
            // period是0,不是一个周期定时任务
            this.period = 0;
            // 每次创建任务的时候,sequenceNumber都会自增
            this.sequenceNumber = sequencer.getAndIncrement();
        }

3.2.2 创建延时定时任务

        /**
         * 创建延时定时任务
         */
        ScheduledFutureTask(Runnable r, V result, long ns, long period) {
            // 调用父类的方法
            super(r, result);
            // 任务开始的时间
            this.time = ns;
            // 周期定时时间
            this.period = period;
            // 每次创建任务的时候,sequenceNumber都会自增
            this.sequenceNumber = sequencer.getAndIncrement();
        }

3.3 运行任务run方法

        public void run() {
            // 是否是周期任务
            boolean periodic = isPeriodic();
            // 如果不能在当前状态下运行,那么就要取消任务
            if (!canRunInCurrentRunState(periodic))
                cancel(false);
            // 如果只是延时任务,那么就调用run方法,运行任务。
            else if (!periodic)
                ScheduledFutureTask.super.run();
            // 如果是周期定时任务,调用runAndReset方法,运行任务。
            // 这个方法不会改变任务的状态,所以可以反复执行。
            else if (ScheduledFutureTask.super.runAndReset()) {
                // 设置周期任务下一次执行的开始时间time
                setNextRunTime();
                // 重新执行任务outerTask
                reExecutePeriodic(outerTask);
            }
        }

这个方法会在ThreadPoolExecutor的runWorker方法中调用,而且这个方法调用,说明肯定已经到了任务的开始时间time了。

  1. 先判断当前线程状态能不能运行任务,如果不能,就调用cancel()方法取消本任务。
  2. 如果任务只是一个延时任务,那么调用父类的run()运行任务,改变任务的状态,表示任务已经运行完成了。
  3. 如果任务只是一个周期定时任务,那么就任务必须能够反复执行,那么就不能调用run()方法,它会改变任务的状态。而是调用runAndReset()方法,只是简单地运行任务,而不会改变任务状态。
  4. 设置周期任务下一次执行的开始时间time,并重新执行任务。

setNextRunTime设置任务下一次执行的开始时间time的方法:

        /**
         * 设置任务下一次执行的开始时间time
         */
        private void setNextRunTime() {
            // 周期时间
            long p = period;
            if (p > 0)
                time += p;
            else
                time = triggerTime(-p);
        }

我们发现当p大于0的时候,调用time += p,那么就可能出现计算后的time可能小于当前时间,因为任务执行时间超过了周期时间p,所以将这个任务添加到任务队列中,它就会立即执行。
而当p小于0的时候,调用time = triggerTime(-p)方法,就是在当前时间上,再加上-p的延时时间,所以这个任务添加到任务队列中,必须延时-p时间后,才能执行。

reExecutePeriodic重新执行任务的方法:

    /**
     * 重新执行任务task
     */
    void reExecutePeriodic(RunnableScheduledFuture<?> task) {
        // 判断当前线程池状态能不能运行任务
        if (canRunInCurrentRunState(true)) {
            // 将任务添加到任务队列,会根据任务延时时间进行排序
            super.getQueue().add(task);
            // 如果线程池状态改变了,当前状态不能运行任务,那么就尝试移除任务,
            // 移除成功,就取消任务。
            if (!canRunInCurrentRunState(true) && remove(task))
                task.cancel(false);
            else
                // 预先启动工作线程,确保线程池中有工作线程。
                ensurePrestart();
        }
    }

这个方法与delayedExecute方法很像,都是将任务添加到任务队列中。

  1. 如果当前线程池状态能够运行任务,那么任务添加到任务队列。
  2. 如果在在添加任务的过程中,线程池状态是否被别的线程更改了,那么就要进行判断,是否需要取消任务。
  3. 调用ensurePrestart()方法,预先启动工作线程,确保线程池中有工作线程。

3.4 其他重要方法

3.4.1 取消任务

      public boolean cancel(boolean mayInterruptIfRunning) {
            // 调用父类的cancel方法取消任务
            boolean cancelled = super.cancel(mayInterruptIfRunning);
            // 将任务从任务队列中移除
            if (cancelled && removeOnCancel && heapIndex >= 0)
                remove(this);
            return cancelled;
        }

3.4.2 getDelay方法

        // 返回任务剩余延时时间
        public long getDelay(TimeUnit unit) {
            return unit.convert(time - now(), NANOSECONDS);
        }

3.4.3 compareTo方法

        public int compareTo(Delayed other) {
            if (other == this)
                return 0;
            // 如果是ScheduledFutureTask实例
            if (other instanceof ScheduledFutureTask) {
                ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
                // 比较任务开始时间
                long diff = time - x.time;
                if (diff < 0)
                    return -1;
                else if (diff > 0)
                    return 1;
                // 比较那么任务是新创建的,因为新创建的sequenceNumber小一点。
                else if (sequenceNumber < x.sequenceNumber)
                    return -1;
                else
                    return 1;
            }
            // 比较任务的剩余延时时间
            long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
            return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
        }

3.4.4 isPeriodic方法

       /**
         * 是否是周期定时任务
         */
        public boolean isPeriodic() {
            return period != 0;
        }

四. 创建延时定时线程池

4.1 构造函数

    public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue());
    }

    public ScheduledThreadPoolExecutor(int corePoolSize,
                                       ThreadFactory threadFactory) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue(), threadFactory);
    }

    public ScheduledThreadPoolExecutor(int corePoolSize,
                                       RejectedExecutionHandler handler) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue(), handler);
    }

    public ScheduledThreadPoolExecutor(int corePoolSize,
                                       ThreadFactory threadFactory,
                                       RejectedExecutionHandler handler) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue(), threadFactory, handler);
    }

都是调用父类ThreadPoolExecutor构造函数的方法,唯一要注意的地方就是任务队列只能是DelayedWorkQueue实例,用户没有办法更换ScheduledThreadPoolExecutor的任务队列属性。

4.2 创建单个线程的定时线程池

    public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
        return new DelegatedScheduledExecutorService
            (new ScheduledThreadPoolExecutor(1));
    }

    public static ScheduledExecutorService newSingleThreadScheduledExecutor(
                 ThreadFactory threadFactory) {
        return new DelegatedScheduledExecutorService
            (new ScheduledThreadPoolExecutor(1, threadFactory));
    }

4.3 创建固定数量的定时线程池

    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
    }

    public static ScheduledExecutorService newScheduledThreadPool(
            int corePoolSize, ThreadFactory threadFactory) {
        return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
    }

相关文章

网友评论

    本文标题:ScheduledThreadPoolExecutor原理分析

    本文链接:https://www.haomeiwen.com/subject/behskftx.html