美文网首页
ScheduledThreadPoolExecutor详解

ScheduledThreadPoolExecutor详解

作者: 永远的太阳0123 | 来源:发表于2018-12-21 13:36 被阅读0次

    1 ScheduledThreadPoolExecutor.ScheduledFutureTask内部类
    ScheduledThreadPoolExecutor.ScheduledFutureTask继承了FutureTask,实现了RunnableScheduledFuture接口。
    1.1 ScheduledThreadPoolExecutor.ScheduledFutureTask中的字段
    (1)sequenceNumber:任务序号。
    (2)time:何时执行本次任务。
    (3)period:period大于0,period代表前一次开始执行任务到下一次开始执行任务的时间;period等于0,说明这个任务是一次性任务;period小于0,period的绝对值代表前一次执行任务完毕到下一次开始执行任务的时间。
    (4)outerTask:本次任务执行完毕,下一周期需要执行的任务。
    (5)heapIndex:任务在阻塞队列中的索引。

            private final long sequenceNumber;
            private long time;
            private final long period;
            RunnableScheduledFuture<V> outerTask = this;
            int heapIndex;
    

    1.2 ScheduledThreadPoolExecutor.ScheduledFutureTask中的构造方法

            // 创建一个一次性任务
            ScheduledFutureTask(Runnable r, V result, long ns) {
                // 将r和result包装成callable
                super(r, result);
                this.time = ns;
                this.period = 0;
                this.sequenceNumber = sequencer.getAndIncrement();
            }
            // 创建一个周期性任务
            ScheduledFutureTask(Runnable r, V result, long ns, long period) {
                // 将r和result包装成callable
                super(r, result);
                this.time = ns;
                this.period = period;
                this.sequenceNumber = sequencer.getAndIncrement();
            }
            // 创建一个一次性任务
            ScheduledFutureTask(Callable<V> callable, long ns) {
                super(callable);
                this.time = ns;
                this.period = 0;
                this.sequenceNumber = sequencer.getAndIncrement();
            }
    

    1.3 ScheduledThreadPoolExecutor.ScheduledFutureTask中的isPeriodic方法

            // 判断任务是不是周期性任务
            public boolean isPeriodic() {
                return period != 0;
            }
    

    1.4 ScheduledThreadPoolExecutor.ScheduledFutureTask中的cancel方法

            public boolean cancel(boolean mayInterruptIfRunning) {
                // 调用父类FutureTask中的cancel方法
                boolean cancelled = super.cancel(mayInterruptIfRunning);
                // 如果(成功取消这个任务)并且(调用本方法时可以从阻塞队列中移除这个任务)并且(这个任务在阻塞队列中的索引大于0)
                if (cancelled && removeOnCancel && heapIndex >= 0)
                    // 从阻塞队列中移除这个任务
                    remove(this);
                return cancelled;
            }
    

    1.5 ScheduledThreadPoolExecutor.ScheduledFutureTask中的run方法

            public void run() {
                boolean periodic = isPeriodic();
                // 如果当前线程池状态下不执行当前任务
                if (!canRunInCurrentRunState(periodic))
                    // 取消当前任务
                    cancel(false);
                // 如果当前任务是一次性任务
                else if (!periodic)
                    // 执行FutureTask中的run方法
                    ScheduledFutureTask.super.run();
                // 运行到这里,说明当前任务是周期性任务
                // 执行FutureTask中的runAndReset方法,如果callable中的call方法正常结束并且任务运行状态处于NEW状态
                else if (ScheduledFutureTask.super.runAndReset()) {
                    // 设置下一周期何时执行任务
                    setNextRunTime();
                    // 尝试将outerTask添加到阻塞队列
                    reExecutePeriodic(outerTask);
                }
            }
    

    1.5.1 ScheduledThreadPoolExecutor.ScheduledFutureTask中的setNextRunTime方法

            private void setNextRunTime() {
                long p = period;
                // p代表前一次开始执行任务到下一次开始执行任务的时间
                if (p > 0)
                    time += p;
                // p的绝对值代表前一次执行任务完毕到下一次开始执行任务的时间
                else
                    time = triggerTime(-p);
            }
    

    1.5.2 ScheduledThreadPoolExecutor中的reExecutePeriodic方法

        // 尝试将给定任务添加到阻塞队列
        void reExecutePeriodic(RunnableScheduledFuture<?> task) {
            // 如果当前线程池状态下执行周期性任务
            if (canRunInCurrentRunState(true)) {
                // 将给定任务添加到阻塞队列
                super.getQueue().add(task);
                // 如果当前线程池状态下不执行周期性任务并且成功从阻塞队列中移除给定任务
                if (!canRunInCurrentRunState(true) && remove(task))
                    // 取消给定任务
                    task.cancel(false);
                else
                    ensurePrestart();
            }
        }
    

    1.6 ScheduledThreadPoolExecutor.ScheduledFutureTask中的getDelay方法

            // 距离执行本次任务还有多长时间
            public long getDelay(TimeUnit unit) {
                return unit.convert(time - now(), NANOSECONDS);
            }
    

    1.7 ScheduledThreadPoolExecutor.ScheduledFutureTask中的compareTo方法

            public int compareTo(Delayed other) {
                // 如果两个对象相同
                if (other == this)
                    return 0;
                if (other instanceof ScheduledFutureTask) {
                    ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
                    long diff = time - x.time;
                    // 先比较time
                    if (diff < 0)
                        return -1;
                    else if (diff > 0)
                        return 1;
                    // 再比较sequenceNumber
                    else if (sequenceNumber < x.sequenceNumber)
                        return -1;
                    else
                        return 1;
                }
                long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
                return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
            }
    

    2 ScheduledThreadPoolExecutor.DelayedWorkQueue内部类
    2.1 字段

            private static final int INITIAL_CAPACITY = 16;
            // 堆数组
            private RunnableScheduledFuture<?>[] queue =
                new RunnableScheduledFuture<?>[INITIAL_CAPACITY];
            private final ReentrantLock lock = new ReentrantLock();
            private int size = 0;
            private Thread leader = null;
            private final Condition available = lock.newCondition();
    

    2.2 添加元素
    (1)add方法

            public boolean add(Runnable e) {
                return offer(e);
            }
    

    (2)offer方法

            public boolean offer(Runnable x) {
                if (x == null)
                    throw new NullPointerException();
                // 强制转换
                RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
                final ReentrantLock lock = this.lock;
                lock.lock();
                try {
                    int i = size;
                    // 如果size大于数组的长度
                    if (i >= queue.length)
                        // 调正数组大小
                        grow();
                    size = i + 1;
                    // 如果之前
                    if (i == 0) {
                        queue[0] = e;
                        setIndex(e, 0);
                    } else {
                        siftUp(i, e);
                    }
                    if (queue[0] == e) {
                        leader = null;
                        available.signal();
                    }
                } finally {
                    lock.unlock();
                }
                return true;
            }
    

    (4)grow方法
    只有拿到锁,才能调用grow方法。

            // 堆数组扩容
            private void grow() {
                int oldCapacity = queue.length;
                // 扩容50%
                int newCapacity = oldCapacity + (oldCapacity >> 1);
                // 如果溢出
                if (newCapacity < 0)
                    newCapacity = Integer.MAX_VALUE;
                // 将元素复制到新数组中
                queue = Arrays.copyOf(queue, newCapacity);
            }
    

    (5)setIndex方法

            private void setIndex(RunnableScheduledFuture<?> f, int idx) {
                if (f instanceof ScheduledFutureTask)
                    ((ScheduledFutureTask)f).heapIndex = idx;
            }
    

    (6)siftUp方法

            private void siftUp(int k, RunnableScheduledFuture<?> key) {
                while (k > 0) {
                    int parent = (k - 1) >>> 1;
                    RunnableScheduledFuture<?> e = queue[parent];
                    if (key.compareTo(e) >= 0)
                        break;
                    queue[k] = e;
                    setIndex(e, k);
                    k = parent;
                }
                queue[k] = key;
                setIndex(key, k);
            }
    

    3 ScheduledThreadPoolExecutor
    ScheduledThreadPoolExecutor继承了ThreadPoolExecutor,实现了ScheduledExecutorService接口。
    3.1 ScheduledThreadPoolExecutor中的字段
    (1)continueExistingPeriodicTasksAfterShutdown:线程池处于SHUTDOWN状态,是否继续执行现有的周期性任务。
    (2)executeExistingDelayedTasksAfterShutdown:线程池处于SHUTDOWN状态,是否执行现有的一次性任务。
    (3)removeOnCancel:调用ScheduledThreadPoolExecutor.ScheduledFutureTask中的cancel方法,是否从阻塞队列中移除这个任务。
    (4)sequencer:可以生成ScheduledThreadPoolExecutor.ScheduledFutureTask中的sequenceNumber,即任务序号。

        private volatile boolean continueExistingPeriodicTasksAfterShutdown;
        private volatile boolean executeExistingDelayedTasksAfterShutdown = true;
        private volatile boolean removeOnCancel = false;
        private static final AtomicLong sequencer = new AtomicLong();
    

    3.2 ScheduledThreadPoolExecutor中的schedule方法

        // 执行一个一次性任务
        // command:command中的run方法代表需要执行的任务
        // delay:执行任务的延迟
        public ScheduledFuture<?> schedule(Runnable command,
                                           long delay,
                                           TimeUnit unit) {
            if (command == null || unit == null)
                throw new NullPointerException();
            // 将任务包装成ScheduledThreadPoolExecutor.ScheduledFutureTask对象
            // decorateTask方法直接返回这个ScheduledThreadPoolExecutor.ScheduledFutureTask对象
            // 子类中可以重写decorateTask方法
            RunnableScheduledFuture<?> t = decorateTask(command,
                new ScheduledFutureTask<Void>(command, null,
                                              triggerTime(delay, unit)));
            delayedExecute(t);
            return t;
        }
    
        // 执行一个一次性任务
        // callable:callable中的call方法代表需要执行的任务
        // delay:执行任务的延迟
        public <V> ScheduledFuture<V> schedule(Callable<V> callable,
                                               long delay,
                                               TimeUnit unit) {
            if (callable == null || unit == null)
                throw new NullPointerException();
            // 将任务包装成ScheduledThreadPoolExecutor.ScheduledFutureTask对象
            // decorateTask方法直接返回这个ScheduledThreadPoolExecutor.ScheduledFutureTask对象
            // 子类中可以重写decorateTask方法
            RunnableScheduledFuture<V> t = decorateTask(callable,
                new ScheduledFutureTask<V>(callable,
                                           triggerTime(delay, unit)));
            delayedExecute(t);
            return t;
        }
    

    3.3 ScheduledThreadPoolExecutor中的scheduleAtFixedRate方法

        // 执行一个周期性任务
        // 计划中开始执行任务的时间:initialDelay、initialDelay+period、initialDelay+2*period……
        // 如果某一次执行任务抛出异常,则禁止后续的执行。否则,只能通过取消任务或关闭线程池的方式结束任务
        // 如果某一次执行任务的时间超过了period,则推迟后续的执行。一个任务不会被两个线程同时执行
        // command:command中的run方法代表需要执行的任务
        // initialDelay:第一次执行任务的延迟
        // period:前一次开始执行任务到下一次开始执行任务的时间
        public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                      long initialDelay,
                                                      long period,
                                                      TimeUnit unit) {
            if (command == null || unit == null)
                throw new NullPointerException();
            if (period <= 0)
                throw new IllegalArgumentException();
            // 将任务包装成ScheduledThreadPoolExecutor.ScheduledFutureTask对象
            ScheduledFutureTask<Void> sft =
                new ScheduledFutureTask<Void>(command,
                                              null,
                                              triggerTime(initialDelay, unit),
                                              unit.toNanos(period));
            // decorateTask方法直接返回传入的ScheduledThreadPoolExecutor.ScheduledFutureTask对象
            // 子类中可以重写decorateTask方法
            RunnableScheduledFuture<Void> t = decorateTask(command, sft);
            sft.outerTask = t;
            delayedExecute(t);
            return t;
        }
    

    3.4 ScheduledThreadPoolExecutor中的scheduleWithFixedDelay方法

        // 执行一个周期性任务
        // 如果某一次执行任务抛出异常,则禁止后续的执行。否则,只能通过取消任务或关闭线程池的方式结束任务
        // command:command中的run方法代表需要执行的任务
        // initialDelay:第一次执行任务的延迟
        // delay:前一次执行任务完毕到下一次开始执行任务的时间
        public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                         long initialDelay,
                                                         long delay,
                                                         TimeUnit unit) {
            if (command == null || unit == null)
                throw new NullPointerException();
            if (delay <= 0)
                throw new IllegalArgumentException();
            // 将任务包装成ScheduledThreadPoolExecutor.ScheduledFutureTask对象
            ScheduledFutureTask<Void> sft =
                new ScheduledFutureTask<Void>(command,
                                              null,
                                              triggerTime(initialDelay, unit),
                                              unit.toNanos(-delay));
            // decorateTask方法直接返回传入的ScheduledThreadPoolExecutor.ScheduledFutureTask对象
            // 子类中可以重写decorateTask方法
            RunnableScheduledFuture<Void> t = decorateTask(command, sft);
            sft.outerTask = t;
            delayedExecute(t);
            return t;
        }
    

    3.5 ScheduledThreadPoolExecutor中的delayedExecute方法
    schedule方法、scheduleAtFixedRate方法和scheduleWithFixedDelay方法都调用了delayedExecute方法。

        private void delayedExecute(RunnableScheduledFuture<?> task) {
            // 如果线程池不处于RUNNING状态
            if (isShutdown())
                // 执行拒绝策略
                reject(task);
            // 如果线程池处于RUNNING状态
            else {
                // 将当前任务task添加到阻塞队列
                super.getQueue().add(task);
                // 如果(线程池已不处于RUNNING状态)并且(当前线程池状态下不执行task)并且(成功从阻塞队列中移除task)
                if (isShutdown() &&
                    !canRunInCurrentRunState(task.isPeriodic()) &&
                    remove(task))
                    // 取消当前任务task
                    task.cancel(false);
                else
                    ensurePrestart();
            }
        }
    

    3.5.1 ThreadPoolExecutor中的isShutdown方法

        // 判断线程池是否处于RUNNING状态。
        public boolean isShutdown() {
            return ! isRunning(ctl.get());
        }
    

    3.5.2 ScheduledThreadPoolExecutor中的canRunInCurrentRunState方法

        // 判断当前线程池状态下是否执行一次性任务或周期性任务。
        boolean canRunInCurrentRunState(boolean periodic) {
            return isRunningOrShutdown(periodic ?
                                       continueExistingPeriodicTasksAfterShutdown :
                                       executeExistingDelayedTasksAfterShutdown);
        }
    

    3.5.2.1 ThreadPoolExecutor中的isRunningOrShutdown方法

        final boolean isRunningOrShutdown(boolean shutdownOK) {
            int rs = runStateOf(ctl.get());
            // 如果(线程池处于RUNNING状态)或(线程池处于SHUTDOWN状态并且参数shutdownOK为true),返回true
            return rs == RUNNING || (rs == SHUTDOWN && shutdownOK);
        }
    

    3.5.3 ThreadPoolExecutor中的ensurePrestart方法

        void ensurePrestart() {
            int wc = workerCountOf(ctl.get());
            // 如果workerCount小于核心线程数
            if (wc < corePoolSize)
                // 尝试启动一个新线程,并且将这个线程的第一个任务设置为null
                addWorker(null, true);
            // 如果workerCount和核心线程数都等于0
            else if (wc == 0)
                // 尝试启动一个新线程,并且将这个线程的第一个任务设置为null
                addWorker(null, false);
        }
    

    相关文章

      网友评论

          本文标题:ScheduledThreadPoolExecutor详解

          本文链接:https://www.haomeiwen.com/subject/pzumkqtx.html