美文网首页
ScheduledThreadPoolExecutor原理和源码

ScheduledThreadPoolExecutor原理和源码

作者: 无聊之园 | 来源:发表于2019-05-01 18:46 被阅读0次

    延迟以及周期性执行线程池。

     public ScheduledThreadPoolExecutor(int corePoolSize) {
            super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
                  new DelayedWorkQueue());
        }
    

    堵塞队列为DelayedWorkQueue,这是ScheduledThreadPoolExecutor的内部类。看其grow扩容方法会发现,这个队列最大大小为Integer.MAX_VALUE。所以队列可以放置很多元素。

    看堵塞队列的DelayedWorkQueue的offer方法
    可以看出offer的元素是RunnableScheduledFuture类型。详细的流程不看,设计堆的调整等操作,目的是保证一个有优先级级的队列,延迟时间短的优先级高,在队列的最前面。

    public boolean offer(Runnable x) {
                if (x == null)
                    throw new NullPointerException();
                // offer的元素是RunnableScheduledFuture类型
                RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
                final ReentrantLock lock = this.lock;
                lock.lock();
                try {
                    int i = size;
                    if (i >= queue.length)
                        grow();
                    size = i + 1;
                    if (i == 0) {
                        queue[0] = e;
                        setIndex(e, 0);
                    } else {
                        siftUp(i, e);
                    }
                    if (queue[0] == e) {
                        leader = null;
                        available.signal();
                    }
                } finally {
                    lock.unlock();
                }
                return true;
            }
    

    看DelayedWorkQueue队列保存的元素类型RunnableScheduledFuture。
    RunnableScheduledFuture的唯一一个自己的方法是isPeriodic,判断这个任务时候是周期性任务。
    RunnableScheduledFuture的父类是RunnableFuture继承了Runnable接口,所以RunnableScheduledFuture也可以看成一个线程。
    还有一个父类是ScheduledFuture,其继承了Delayed接口,Delayed接口唯一的方法就是获取任务的剩余延迟时间,以供延迟队列的延迟获取元素。
    所以RunnableScheduledFuture类型元素,即是一个线程,有run方法,也可以判断是否周期性执行,又可以获取任务剩余延迟时间。

    public interface RunnableScheduledFuture<V> extends RunnableFuture<V>, ScheduledFuture<V> {
    
        /**
         * Returns {@code true} if this task is periodic. A periodic task may
         * re-run according to some schedule. A non-periodic task can be
         * run only once.
         *
         * @return {@code true} if this task is periodic
         */
        boolean isPeriodic();
    }
    public interface RunnableFuture<V> extends Runnable, Future<V> {
        /**
         * Sets this Future to the result of its computation
         * unless it has been cancelled.
         */
        void run();
    }
    public interface ScheduledFuture<V> extends Delayed, Future<V> {
    }
    

    看堵塞队列的take方法

    public RunnableScheduledFuture<?> take() throws InterruptedException {
                final ReentrantLock lock = this.lock;
                lock.lockInterruptibly();
                try {
                    for (;;) {
                        RunnableScheduledFuture<?> first = queue[0];
                        if (first == null)
                            available.await();
                        else {
                            // 获取队列顶部元素的剩余延迟时间
                            long delay = first.getDelay(NANOSECONDS);
                            // 剩余延迟时间小于0,则finishPoll调整堆,然后直接返回这个元素
                            if (delay <= 0)
                                return finishPoll(first);
                            first = null; // don't retain ref while waiting
                            // 剩余延迟时间大于0。如果leader不为空,说明已经有线程成为leader并等待堆顶任务
                            if (leader != null)
                                available.await();
                            else {
                                Thread thisThread = Thread.currentThread();
                                leader = thisThread;
                                try {
                                    // 堵塞等待delay时间
                                    available.awaitNanos(delay);
                                } finally {
                                    if (leader == thisThread)
                                        leader = null;
                                }
                            }
                        }
                    }
                } finally {
                    if (leader == null && queue[0] != null)
                        available.signal();
                    lock.unlock();
                }
            }
    

    take方法总结:take方法和普通的延迟队列一样,比如delayQueue。等待队列头部元素,剩余延迟时间过了之后,才能获取到值。

    DelayedWorkQueue延迟队列分析完了之后,分析ScheduledThreadPoolExecutor的关键方法。

    构造方法:除了堵塞队列以外,和ThreadPoolExcutor差不多,但是由于DelayedWorkQueue堵塞队列是无限大的,所以,不存在最大线程数以及最大线程数空闲时间。

    public ScheduledThreadPoolExecutor(int corePoolSize) {
            super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
                  new DelayedWorkQueue());
        }
    

    execute方法:延迟时间为0,所以正常执行,该启动新线程则启动新线程,该放入队列则放入队列。

     public void execute(Runnable command) {
            schedule(command, 0, NANOSECONDS);
        }
    
    public ScheduledFuture<?> schedule(Runnable command,
                                           long delay,
                                           TimeUnit unit) {
            if (command == null || unit == null)
                throw new NullPointerException();
            // decorateTask方法只是返回最后一个入参,什么都没做。
            // ScheduledFutureTask类是RunnableScheduledFuture的实现类,所以ScheduledFutureTask类可以放入DelayWorkQueue堵塞队列
            RunnableScheduledFuture<?> t = decorateTask(command,
                new ScheduledFutureTask<Void>(command, null,
                                              triggerTime(delay, unit)));
             // delayedExecute方法把RunnableScheduledFuture放入堵塞队列,如果线程数小于核心线程,则开启线程从队列中取任务
            delayedExecute(t);
            return t;
        }
    // ScheduledFutureTask构造方法,period为0,说明不是周期性任务,time表示延迟时间。
    ScheduledFutureTask(Runnable r, V result, long ns) {
                super(r, result);
                this.time = ns;
                this.period = 0;
                this.sequenceNumber = sequencer.getAndIncrement();
            }
    
    private void delayedExecute(RunnableScheduledFuture<?> task) {
            if (isShutdown())
                reject(task);
            else {
                 // 放入堵塞队列
                super.getQueue().add(task);
                if (isShutdown() &&
                    !canRunInCurrentRunState(task.isPeriodic()) &&
                    remove(task))
                    task.cancel(false);
                else
                    // 如果线程数小于核心线程,则开启线程从队列中取任务
                    ensurePrestart();
            }
        }
    
    void ensurePrestart() {
            int wc = workerCountOf(ctl.get());
          // 已经开启线程数小于核心线程数,则开启新线程,
    // addWorker传入的第一参数为null,也就是worker的firstTask为null,所以直接从getTask()直接从队列中获取任务,
    // 堵塞队列是延迟队列,所以可能会延迟获取到任务。
            if (wc < corePoolSize)
                addWorker(null, true);
            else if (wc == 0)
                addWorker(null, false);
        }
    

    之后的流程就是,和和普通线程池一样,ThreadPoolExecutor的真正工作的内部类worker,其就是开启的线程类的runnable入参,线程start启动的时候,会运行workker的run方法,workker的run方法会循环从队列中获取任务执行,因为是延迟队列,所以会又延迟的效果,延迟的原理前面分析堵塞队列的时候分析了,condition.awaitNanos方法实现延迟等待。

    获取任务后,运行任务的run方法。

    public void run() {
                // 是否周期执行
                boolean periodic = isPeriodic();
                if (!canRunInCurrentRunState(periodic))
                    cancel(false);
                else if (!periodic)
                    // 不周期执行,则直接执行
                    ScheduledFutureTask.super.run();
                // 如果周期执行,则重置任务,再放入队列等待执行
              else if (ScheduledFutureTask.super.runAndReset()) {
                    setNextRunTime();
                    reExecutePeriodic(outerTask);
                }
            }
    

    scheduleAtFixedRate:该方法在initialDelay时长后第一次执行任务,以后每隔period时长,再次执行任务。注意,period是从任务开始执行算起的。开始执行任务后,定时器每隔period时长检查该任务是否完成,如果完成则再次启动任务,否则等该任务结束后才再次启动任务,看下图示例。

     new ScheduledFutureTask<Void>(command,
                                              null,
                                              triggerTime(initialDelay, unit),
                                              unit.toNanos(period)
    

    scheduleWithFixDelay:该方法在initialDelay时长后第一次执行任务,以后每当任务执行完成后,等待delay时长,再次执行任务,看下图示例。

    new ScheduledFutureTask<Void>(command,
                                             null,
                                             triggerTime(initialDelay, unit),
                                             unit.toNanos(-delay)
    

    和scheduledAtFixedRate类似,唯一不同的地方在于在于创建的ScheduledFutureTask不同

    相关文章

      网友评论

          本文标题:ScheduledThreadPoolExecutor原理和源码

          本文链接:https://www.haomeiwen.com/subject/hxjwnqtx.html