美文网首页JAVAAndroid知识Android技术知识
Java线程池原理分析ScheduledThreadPoolEx

Java线程池原理分析ScheduledThreadPoolEx

作者: Mars_M | 来源:发表于2017-06-14 17:10 被阅读209次

前言

在上一篇线程池的文章《Java线程池原理分析ThreadPoolExecutor篇》中从ThreadPoolExecutor源码分析了其运行机制。限于篇幅,留下了ScheduledThreadPoolExecutor未做分析,因此本文继续从源代码出发分析ScheduledThreadPoolExecutor的内部原理。

ScheduledThreadPoolExecutor

ScheduledThreadPoolExecutor同ThreadPoolExecutor一样也可以从 Executors线程池工厂创建,所不同的是它具有定时执行,以周期或间隔循环执行任务等功能。

public class ScheduledThreadPoolExecutor
        extends ThreadPoolExecutor
        implements ScheduledExecutorService {

    public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue());
    }

}

public interface ScheduledExecutorService extends ExecutorService {
    public <V> ScheduledFuture<V> schedule(Callable<V> callable,
                                           long delay, TimeUnit unit);

    public ScheduledFuture<?> schedule(Runnable command,
                                       long delay, TimeUnit unit);

    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                  long initialDelay,
                                                  long period,
                                                  TimeUnit unit);

    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                     long initialDelay,
                                                     long delay,
                                                     TimeUnit unit);

}

ScheduledThreadPoolExecutor继承自ThreadPoolExecutor,因此它具有ThreadPoolExecutor的所有能力。
通过super方法的参数可知,核心线程的数量即传入的参数,而线程池的线程数为Integer.MAX_VALUE,几乎为无上限。
这里采用了DelayedWorkQueue任务队列,也是定时任务的核心,留在后面分析。

ScheduledThreadPoolExecutor实现了ScheduledExecutorService 中的接口:

public <V> ScheduledFuture<V> schedule(Callable<V> callable,
                                           long delay, TimeUnit unit);

延时执行Callable任务的功能。

 public ScheduledFuture<?> schedule(Runnable command,
                                       long delay, TimeUnit unit);

延时执行Runnable任务的功能。

    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                  long initialDelay,
                                                  long period,
                                                  TimeUnit unit);

可以延时循环执行周期性任务。

假设任务执行时间固定2s,period为1s,因为任务的执行时间大于规定的period,所以任务会每隔2s(任务执行时间)开始执行一次。如果任务执行时间固定为0.5s,period为1s,因为任务执行时间小于period,所以任务会每隔1s(period)开始执行一次。实际任务的执行时间即可能是大于period的,也可能小于period,scheduleAtFixedRate的好处就是每次任务的开始时间间隔必然大于等于period。

假设一项业务需求每天凌晨3点将数据库备份,然而数据库备份的时间小于24H,最适合用scheduleAtFixedRate方法实现。

    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                     long initialDelay,
                                                     long delay,
                                                     TimeUnit unit);

可以延时以相同间隔时间循环执行任务。

假设任务执行的时间固定为2s,delay为1s,那么任务会每隔3s(任务时间+delay)开始执行一次。

如果业务需求本次任务的结束时间与下一个任务的开始时间固定,使用scheduleWithFixedDelay可以方便地实现业务。

ScheduledFuture

四个执行任务的方法都返回了ScheduledFuture对象,它与Future有什么区别?

public interface ScheduledFuture<V> extends Delayed, Future<V> {
}

public interface Delayed extends Comparable<Delayed> {
    long getDelay(TimeUnit unit);
}

public interface Comparable<T> {
    public int compareTo(T o);
}

可以看到ScheduledFuture也继承了Future,并且继承了Delayed,增加了getDelay方法,而Delayed继承自Comparable,所以具有compareTo方法。

四种执行定时任务的方法

schedule(Runnable command,long delay, TimeUnit unit)

    public ScheduledFuture<?> schedule(Runnable command,
                                       long delay,
                                       TimeUnit unit) {
        if (command == null || unit == null)
            throw new NullPointerException();
        RunnableScheduledFuture<?> t = decorateTask(command,
            new ScheduledFutureTask<Void>(command, null,
                                          triggerTime(delay, unit)));
        delayedExecute(t);
        return t;
    }

这个方法中出现了几个陌生的类,首先是ScheduledFutureTask:

private class ScheduledFutureTask<V>  extends FutureTask<V> implements RunnableScheduledFuture<V> {
            ...
            ScheduledFutureTask(Runnable r, V result, long ns) {
              super(r, result);
              this.time = ns;
              this.period = 0;
              this.sequenceNumber = sequencer.getAndIncrement();
           }
}

public FutureTask(Runnable runnable, V result) {
        this.callable = Executors.callable(runnable, result);
        this.state = NEW;       // ensure visibility of callable
    }

public interface RunnableScheduledFuture<V> extends RunnableFuture<V>, ScheduledFuture<V> {
    boolean isPeriodic();
}

public interface RunnableFuture<V> extends Runnable, Future<V> {
    void run();
}

这个类是ScheduledThreadPoolExecutor的内部类,继承自FutureTask实现了RunnableScheduledFuture接口。RunnableScheduledFuture有些复杂,继承自RunnableFuture和ScheduledFuture接口。可见ScheduledThreadPoolExecutor身兼多职。这个类既可以作为Runnable被线程执行,又可以作为FutureTask用于获取Callable任务call方法返回的结果。

在FutureTask的构造方法中传入Runnable对象会将其转换为返回值为null的Callable对象。

/**
     * Modifies or replaces the task used to execute a runnable.
     * This method can be used to override the concrete
     * class used for managing internal tasks.
     * The default implementation simply returns the given task.
     *
     * @param runnable the submitted Runnable
     * @param task the task created to execute the runnable
     * @param <V> the type of the task's result
     * @return a task that can execute the runnable
     * @since 1.6
     */
    protected <V> RunnableScheduledFuture<V> decorateTask(
        Runnable runnable, RunnableScheduledFuture<V> task) {
        return task;
    }

从decorateTask的字面意义判断它将具体的RunnableScheduledFuture实现类向上转型为RunnableScheduledFuture接口。从它的方法描述和实现看出它只是简单的将ScheduledFutureTask向上转型为RunnableScheduledFuture接口,由protected 修饰符可知设计者希望子类扩展这个方法的实现。

之所以向上转型为RunnableScheduledFuture接口,设计者也是希望将具体与接口分离。

    private void delayedExecute(RunnableScheduledFuture<?> task) {
        if (isShutdown())
            //情况一
            reject(task);
        else {
            super.getQueue().add(task);
            if (isShutdown() &&
                !canRunInCurrentRunState(task.isPeriodic()) &&
                remove(task))
                //情况二
                task.cancel(false);
            else
                //情况三
                ensurePrestart();
        }
    }

    boolean canRunInCurrentRunState(boolean periodic) {
        return isRunningOrShutdown(periodic ?
                                   continueExistingPeriodicTasksAfterShutdown :
                                   executeExistingDelayedTasksAfterShutdown);
    }

    private volatile boolean executeExistingDelayedTasksAfterShutdown = true;

    final boolean isRunningOrShutdown(boolean shutdownOK) {
        int rs = runStateOf(ctl.get());
        return rs == RUNNING || (rs == SHUTDOWN && shutdownOK);
    }

delayedExecute方法负责执行延时任务。
情况一 : 先判断线程池是否关闭,若关闭则拒绝任务。
情况二:线程池未关闭,将任务添加到父类的任务队列,即DelayedWorkQueue中。下面再次判断线程池是否关闭,并且判断canRunInCurrentRunState方法的返回值是否为false。因为传入Runnable参数,task.isPeriodic()为false,所以isRunningOrShutdown返回true。所以这里不会执行到。
情况三:任务成功添加到任务队列,执行ensurePrestart方法。

    void ensurePrestart() {
        int wc = workerCountOf(ctl.get());
        if (wc < corePoolSize)
            addWorker(null, true);
        else if (wc == 0)
            addWorker(null, false);
    }

addWorker已经在ThreadPoolExecutor篇分析过,该方法负责同步将线程池数量+1,并且创建Worker对象添加到HashSet中,最后开启Worker对象中的线程。因为RunnableScheduledFuture对象已经被添加到任务队列,Worker中的线程通过getTask方法自然会取到DelayedWorkQueue中的RunnableScheduledFuture任务并执行它的run方法。

这里需要注意的是addWorker方法只在核心线程数未达上限或者没有线程的情况下执行,并不像ThreadPoolExecutor那样可以同时存在多个非核心线程,ScheduledThreadPoolExecutor最多只支持一个非核心线程,除非它终止了不会再创建新的非核心线程。

schedule(Callable<V> callable, long delay, TimeUnit unit)

    public <V> ScheduledFuture<V> schedule(Callable<V> callable,
                                           long delay,
                                           TimeUnit unit) {
        if (callable == null || unit == null)
            throw new NullPointerException();
        RunnableScheduledFuture<V> t = decorateTask(callable,
            new ScheduledFutureTask<V>(callable,
                                       triggerTime(delay, unit)));
        delayedExecute(t);
        return t;
    }

    protected <V> RunnableScheduledFuture<V> decorateTask(
        Callable<V> callable, RunnableScheduledFuture<V> task) {
        return task;
    }

     ScheduledFutureTask(Callable<V> callable, long ns) {
            super(callable);
            this.time = ns;
            this.period = 0;
            this.sequenceNumber = sequencer.getAndIncrement();
        }

    public FutureTask(Callable<V> callable) {
        if (callable == null)
            throw new NullPointerException();
        this.callable = callable;
        this.state = NEW;       // ensure visibility of callable
    }

与schedule(Runnable command,long delay,TimeUnit unit)相比除了可以通过ScheduledFutureTask的get方法得到返回值外没有区别。

scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit)

    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                  long initialDelay,
                                                  long period,
                                                  TimeUnit unit) {
        if (command == null || unit == null)
            throw new NullPointerException();
        if (period <= 0)
            throw new IllegalArgumentException();
        ScheduledFutureTask<Void> sft =
            new ScheduledFutureTask<Void>(command,
                                          null,
                                          triggerTime(initialDelay, unit),
                                          unit.toNanos(period));
        RunnableScheduledFuture<Void> t = decorateTask(command, sft);
        sft.outerTask = t;
        delayedExecute(t);
        return t;
    }

与上述两个方法的区别在于ScheduledFutureTask的构造函数多了参数period,即任务执行的最小周期:

        ScheduledFutureTask(Runnable r, V result, long ns, long period) {
            super(r, result);
            this.time = ns;
            this.period = period;
            this.sequenceNumber = sequencer.getAndIncrement();
        }

scheduleWithFixedDelay(Runnable command,long initialDelay,long delay,TimeUnit unit)

    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                     long initialDelay,
                                                     long delay,
                                                     TimeUnit unit) {
        if (command == null || unit == null)
            throw new NullPointerException();
        if (delay <= 0)
            throw new IllegalArgumentException();
        ScheduledFutureTask<Void> sft =
            new ScheduledFutureTask<Void>(command,
                                          null,
                                          triggerTime(initialDelay, unit),
                                          unit.toNanos(-delay));
        RunnableScheduledFuture<Void> t = decorateTask(command, sft);
        sft.outerTask = t;
        delayedExecute(t);
        return t;
    }

与scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit)的区别是参数delay传入到ScheduledFutureTask的构造方法中是以负数的形式。

小结

四种延时启动任务的方法除了构造ScheduledFutureTask的参数不同外,运行机制是相同的。先将任务添加到DelayedWorkQueue 中,然后创建Worker对象,启动内部线程轮询DelayedWorkQueue 中的任务。

那么DelayedWorkQueue的add方法是如何实现的,线程轮询DelayedWorkQueue 调用的poll和take方法又如何实现?

回顾getTask方法获取任务时的代码片段:

boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();

如果我们设置ScheduledThreadPoolExecutor的核心线程数量为0,则执行poll方法。而对于核心线程则执行take方法。

下面分析DelayedWorkQueue 的具体实现。

DelayedWorkQueue

static class DelayedWorkQueue extends AbstractQueue<Runnable>
        implements BlockingQueue<Runnable> {
        private static final int INITIAL_CAPACITY = 16;
        private RunnableScheduledFuture<?>[] queue =
            new RunnableScheduledFuture<?>[INITIAL_CAPACITY];
        private final ReentrantLock lock = new ReentrantLock();
        private int size = 0;
}

首先DelayedWorkQueue 是ScheduledThreadPoolExecutor的静态内部类。它的内部有一个RunnableScheduledFuture数组,且初始容量为16.这里提前说明下,queue 数组储存的其实是二叉树结构的索引,这个二叉树其实就是最小堆。

add方法

        public boolean add(Runnable e) {
            return offer(e);
        }

        public boolean offer(Runnable x) {
            if (x == null)
                throw new NullPointerException();
            RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                int i = size;
                if (i >= queue.length)
                    grow();
                size = i + 1;
                if (i == 0) {
                    queue[0] = e;
                    setIndex(e, 0);
                } else {
                    siftUp(i, e);
                }
                if (queue[0] == e) {
                    leader = null;
                    available.signal();
                }
            } finally {
                lock.unlock();
            }
            return true;
        }

        private void grow() {
            int oldCapacity = queue.length;
            int newCapacity = oldCapacity + (oldCapacity >> 1); // grow 50%
            if (newCapacity < 0) // overflow
                newCapacity = Integer.MAX_VALUE;
            queue = Arrays.copyOf(queue, newCapacity);
        }

        private void setIndex(RunnableScheduledFuture<?> f, int idx) {
            if (f instanceof ScheduledFutureTask)
                ((ScheduledFutureTask)f).heapIndex = idx;
        }

        private void siftUp(int k, RunnableScheduledFuture<?> key) {
            while (k > 0) {
                int parent = (k - 1) >>> 1;
                RunnableScheduledFuture<?> e = queue[parent];
                if (key.compareTo(e) >= 0)
                    break;
                queue[k] = e;
                setIndex(e, k);
                k = parent;
            }
            queue[k] = key;
            setIndex(key, k);
        }

在执行add方法时内部执行的是offer方法,添加RunnableScheduledFuture任务到队列时先通过内部的ReentrantLock加锁,因此在多线程调用schedule(Runnable command,long delay, TimeUnit unit)添加任务时也能保证同步。

接下来先判断队列是否已满,若已满就先通过grow方法扩容。扩容算法是将现有容量*1.5,然后将旧的数组复制到新的数组。(左移一位等于除以2)。

然后判断插入的是否为第一个任务,如果是就将RunnableScheduledFuture向下转型为ScheduledFutureTask,并将其heapIndex 属性设置为0.

如果不是第一个任务,则执行siftUp方法。该方法先找到父亲RunnableScheduledFuture对象节点,将要插入的RunnableScheduledFuture节点与之compareTo比较,若父亲RunnableScheduledFuture对象的启动时间小于当前要插入的节点的启动时间,则将节点插入到末尾。反之会对二叉树以启动时间升序重新排序RunnableScheduledFuture接口的实现其实是ScheduledFutureTask类:

    new ScheduledFutureTask<Void>(command, null,triggerTime(delay, unit);

    private long triggerTime(long delay, TimeUnit unit) {
        return triggerTime(unit.toNanos((delay < 0) ? 0 : delay));
    }

    /**
     * Returns the trigger time of a delayed action.
     */
    long triggerTime(long delay) {
        return now() +
            ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
    }

    final long now() {
        return System.nanoTime();
    }

第三个参数triggerTime方法返回的就是任务延时的时间加上当前时间。

在ScheduledFutureTask内部实现了compareTo方法:

public int compareTo(Delayed other) {
            if (other == this) // compare zero if same object
                return 0;
            if (other instanceof ScheduledFutureTask) {
                ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
                long diff = time - x.time;
                if (diff < 0)
                    return -1;
                else if (diff > 0)
                    return 1;
                else if (sequenceNumber < x.sequenceNumber)
                    return -1;
                else
                    return 1;
            }
            long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
            return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
        }

比较的两个任务的启动时间。所以DelayedWorkQueue内部的二叉树是以启动时间早晚排序的。

poll方法

        public RunnableScheduledFuture<?> poll(long timeout, TimeUnit unit)
            throws InterruptedException {
            long nanos = unit.toNanos(timeout);
            final ReentrantLock lock = this.lock;
            lock.lockInterruptibly();
            try {
                for (;;) {
                    RunnableScheduledFuture<?> first = queue[0];
                    if (first == null) {
                        //情况一 空队列
                        if (nanos <= 0)
                            return null;
                        else
                            nanos = available.awaitNanos(nanos);
                    } else {
                        long delay = first.getDelay(NANOSECONDS);
                        if (delay <= 0)
                            //情况二 已到启动时间  
                            return finishPoll(first);
                        if (nanos <= 0)
                            //情况三 未到启动时间,但是线程等待超时
                            return null;
                        first = null; // don't retain ref while waiting
                        if (nanos < delay || leader != null)
                            nanos = available.awaitNanos(nanos);
                        else {
                            Thread thisThread = Thread.currentThread();
                            leader = thisThread;
                            try {
                                long timeLeft = available.awaitNanos(delay);
                                nanos -= delay - timeLeft;
                            } finally {
                                if (leader == thisThread)
                                    leader = null;
                            }
                        }
                    }
                }
            } finally {
                if (leader == null && queue[0] != null)
                    available.signal();
                lock.unlock();
            }
        }

非核心线程会通过poll方法同步获取任务队列中的RunnableScheduledFuture,如果队列为空或者在timeout内还等不到任务的启动时间,都将返回null。如果任务队列不为空,并且首个任务已到启动时间线程就能够获取RunnableScheduledFuture任务并执行run方法。

take方法

        public RunnableScheduledFuture<?> take() throws InterruptedException {
            final ReentrantLock lock = this.lock;
            lock.lockInterruptibly();
            try {
                for (;;) {
                    RunnableScheduledFuture<?> first = queue[0];
                    if (first == null)
                        available.await();
                    else {
                        long delay = first.getDelay(NANOSECONDS);
                        if (delay <= 0)
                            return finishPoll(first);
                        first = null; // don't retain ref while waiting
                        if (leader != null)
                            available.await();
                        else {
                            Thread thisThread = Thread.currentThread();
                            leader = thisThread;
                            try {
                                available.awaitNanos(delay);
                            } finally {
                                if (leader == thisThread)
                                    leader = null;
                            }
                        }
                    }
                }
            } finally {
                if (leader == null && queue[0] != null)
                    available.signal();
                lock.unlock();
            }
        }

与非核心线程执行的poll方法相比,核心线程执行的take方法并不会超时,在获取到首个将要启动的任务前,核心线程会一直阻塞。

finishPoll方法

        private RunnableScheduledFuture<?> finishPoll(RunnableScheduledFuture<?> f) {
            int s = --size;
            RunnableScheduledFuture<?> x = queue[s];
            queue[s] = null;
            if (s != 0)
                siftDown(0, x);
            setIndex(f, -1);
            return f;
        }

在成功获取任务后,DelayedWorkQueue的finishPoll方法会将任务移除队列,并以启动时间升序重排二叉树。

小结

DelayedWorkQueue内部维持了一个以任务启动时间升序排序的二叉树数组,启动时间最靠前的任务即数组的首个位置上的任务。核心线程通过take方法一直阻塞直到获取首个要启动的任务。非核心线程通过poll方法会在timeout时间内阻塞尝试获取首个要启动的任务,如果超过timeout未得到任务不会继续阻塞。

这里要特别说明要启动的任务指的是RunnableScheduledFuture内部的time减去当前时间小于等于0,未满足条件的任务不会被take或poll方法返回,这也就保证了未到指定时间任务不会执行。

执行ScheduledFutureTask

前面已经分析了schedule方法如何将RunnableScheduledFuture插入到DelayedWorkQueue,Worker内的线程如何获取定时任务。下面分析任务的执行过程,即ScheduledFutureTask的run方法:

        public void run() {
            boolean periodic = isPeriodic();
            if (!canRunInCurrentRunState(periodic))
                cancel(false);
            else if (!periodic)
                ScheduledFutureTask.super.run();
            else if (ScheduledFutureTask.super.runAndReset()) {
                setNextRunTime();
                reExecutePeriodic(outerTask);
            }
        }

如果执行的是非周期型任务,调用ScheduledFutureTask.super.run()方法,即ScheduledFutureTask的父类FutureTask的run方法。FutureTask的run方法已经在ThreadPoolExecutor篇分析过,这里不再多说。

如果执行的是周期型任务,则执行ScheduledFutureTask.super.runAndReset():

    protected boolean runAndReset() {
        if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null, Thread.currentThread()))
            return false;
        boolean ran = false;
        int s = state;
        try {
            Callable<V> c = callable;
            if (c != null && s == NEW) {
                try {
                    c.call(); // don't set result
                    ran = true;
                } catch (Throwable ex) {
                    setException(ex);
                }
            }
        } finally {
            // runner must be non-null until state is settled to
            // prevent concurrent calls to run()
            runner = null;
            // state must be re-read after nulling runner to prevent
            // leaked interrupts
            s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
        return ran && s == NEW;
    }

这个方法同run方法比较的区别是call方法执行后不设置结果,因为周期型任务会多次执行,所以为了让FutureTask支持这个特性除了发生异常不设置结果。

执行完任务后通过setNextRunTime方法计算下一次启动时间:

        private void setNextRunTime() {
            long p = period;
            if (p > 0)
                //情况一
                time += p;
            else
                //情况二
                time = triggerTime(-p);
        }

        long triggerTime(long delay) {
            return now() +
                ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
        }

还记得ScheduledThreadPoolExecutor执行定时任务的后两种scheduleAtFixedRate和scheduleWithFixedDelay。
scheduleAtFixedRate会执行到情况一,下一次任务的启动时间最早为上一次任务的启动时间加period。
scheduleWithFixedDelay会执行到情况二,这里很巧妙的将period参数设置为负数到达这段代码块,在此又将负的period转为正数。情况二将下一次任务的启动时间设置为当前时间加period。

然后将任务再次添加到任务队列:

    void reExecutePeriodic(RunnableScheduledFuture<?> task) {
        if (canRunInCurrentRunState(true)) {
            super.getQueue().add(task);
            if (!canRunInCurrentRunState(true) && remove(task))
                task.cancel(false);
            else
                ensurePrestart();
        }
    }

ScheduledFuture的get方法

既然ScheduledFuture的实现是ScheduledFutureTask,而ScheduledFutureTask继承自FutureTask,所以ScheduledFuture的get方法的实现就是FutureTask的get方法的实现,FutureTask的get方法的实现分析在ThreadPoolExecutor篇已经写过,这里不再叙述。要注意的是ScheduledFuture的get方法对于非周期任务才是有效的。

ScheduledThreadPoolExecutor总结

  • ScheduledThreadPoolExecutor是实现自ThreadPoolExecutor的线程池,构造方法中传入参数n,则最多会有n个核心线程工作,空闲的核心线程不会被自动终止,而是一直阻塞在DelayedWorkQueue的take方法尝试获取任务。构造方法传入的参数为0,ScheduledThreadPoolExecutor将以非核心线程工作,并且最多只会创建一个非核心线程,参考上文中ensurePrestart方法的执行过程。而这个非核心线程以poll方法获取定时任务之所以不会因为超时就被回收,是因为任务队列并不为空,只有在任务队列为空时才会将空闲线程回收,详见ThreadPoolExecutor篇的runWorker方法,之前我以为空闲的非核心线程超时就会被回收是不正确的,还要具备任务队列为空这个条件。
  • ScheduledThreadPoolExecutor的定时执行任务依赖于DelayedWorkQueue,其内部用可扩容的数组实现以启动时间升序的二叉树。
  • 工作线程尝试获取DelayedWorkQueue的任务只有在任务到达指定时间才会成功,否则非核心线程会超时返回null,核心线程一直阻塞。
  • 对于非周期型任务只会执行一次并且可以通过ScheduledFuture的get方法阻塞得到结果,其内部实现依赖于FutureTask的get方法。
  • 周期型任务通过get方法无法获取有效结果,因为FutureTask对于周期型任务执行的是runAndReset方法,并不会设置结果。周期型任务执行完毕后会重新计算下一次启动时间并且再次添加到DelayedWorkQueue中。

在源代码的分析过程中发现分析DelayedWorkQueue还需要有二叉树的升序插入算法的知识,一开始也没有认出来这种数据结构,后来又看了别人的文章才了解。这里比较难理解,有兴趣的同学可以参考《深度解析Java8 – ScheduledThreadPoolExecutor源码解析》

相关文章

网友评论

    本文标题:Java线程池原理分析ScheduledThreadPoolEx

    本文链接:https://www.haomeiwen.com/subject/cmksqxtx.html