前言
在上一篇线程池的文章《Java线程池原理分析ThreadPoolExecutor篇》中从ThreadPoolExecutor源码分析了其运行机制。限于篇幅,留下了ScheduledThreadPoolExecutor未做分析,因此本文继续从源代码出发分析ScheduledThreadPoolExecutor的内部原理。
ScheduledThreadPoolExecutor
ScheduledThreadPoolExecutor同ThreadPoolExecutor一样也可以从 Executors线程池工厂创建,所不同的是它具有定时执行,以周期或间隔循环执行任务等功能。
public class ScheduledThreadPoolExecutor
extends ThreadPoolExecutor
implements ScheduledExecutorService {
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
}
public interface ScheduledExecutorService extends ExecutorService {
public <V> ScheduledFuture<V> schedule(Callable<V> callable,
long delay, TimeUnit unit);
public ScheduledFuture<?> schedule(Runnable command,
long delay, TimeUnit unit);
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit);
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
long initialDelay,
long delay,
TimeUnit unit);
}
ScheduledThreadPoolExecutor继承自ThreadPoolExecutor,因此它具有ThreadPoolExecutor的所有能力。
通过super方法的参数可知,核心线程的数量即传入的参数,而线程池的线程数为Integer.MAX_VALUE,几乎为无上限。
这里采用了DelayedWorkQueue任务队列,也是定时任务的核心,留在后面分析。
ScheduledThreadPoolExecutor实现了ScheduledExecutorService 中的接口:
public <V> ScheduledFuture<V> schedule(Callable<V> callable,
long delay, TimeUnit unit);
延时执行Callable任务的功能。
public ScheduledFuture<?> schedule(Runnable command,
long delay, TimeUnit unit);
延时执行Runnable任务的功能。
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit);
可以延时循环执行周期性任务。
假设任务执行时间固定2s,period为1s,因为任务的执行时间大于规定的period,所以任务会每隔2s(任务执行时间)开始执行一次。如果任务执行时间固定为0.5s,period为1s,因为任务执行时间小于period,所以任务会每隔1s(period)开始执行一次。实际任务的执行时间即可能是大于period的,也可能小于period,scheduleAtFixedRate的好处就是每次任务的开始时间间隔必然大于等于period。
假设一项业务需求每天凌晨3点将数据库备份,然而数据库备份的时间小于24H,最适合用scheduleAtFixedRate方法实现。
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
long initialDelay,
long delay,
TimeUnit unit);
可以延时以相同间隔时间循环执行任务。
假设任务执行的时间固定为2s,delay为1s,那么任务会每隔3s(任务时间+delay)开始执行一次。
如果业务需求本次任务的结束时间与下一个任务的开始时间固定,使用scheduleWithFixedDelay可以方便地实现业务。
ScheduledFuture
四个执行任务的方法都返回了ScheduledFuture对象,它与Future有什么区别?
public interface ScheduledFuture<V> extends Delayed, Future<V> {
}
public interface Delayed extends Comparable<Delayed> {
long getDelay(TimeUnit unit);
}
public interface Comparable<T> {
public int compareTo(T o);
}
可以看到ScheduledFuture也继承了Future,并且继承了Delayed,增加了getDelay方法,而Delayed继承自Comparable,所以具有compareTo方法。
四种执行定时任务的方法
schedule(Runnable command,long delay, TimeUnit unit)
public ScheduledFuture<?> schedule(Runnable command,
long delay,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
RunnableScheduledFuture<?> t = decorateTask(command,
new ScheduledFutureTask<Void>(command, null,
triggerTime(delay, unit)));
delayedExecute(t);
return t;
}
这个方法中出现了几个陌生的类,首先是ScheduledFutureTask:
private class ScheduledFutureTask<V> extends FutureTask<V> implements RunnableScheduledFuture<V> {
...
ScheduledFutureTask(Runnable r, V result, long ns) {
super(r, result);
this.time = ns;
this.period = 0;
this.sequenceNumber = sequencer.getAndIncrement();
}
}
public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);
this.state = NEW; // ensure visibility of callable
}
public interface RunnableScheduledFuture<V> extends RunnableFuture<V>, ScheduledFuture<V> {
boolean isPeriodic();
}
public interface RunnableFuture<V> extends Runnable, Future<V> {
void run();
}
这个类是ScheduledThreadPoolExecutor的内部类,继承自FutureTask实现了RunnableScheduledFuture接口。RunnableScheduledFuture有些复杂,继承自RunnableFuture和ScheduledFuture接口。可见ScheduledThreadPoolExecutor身兼多职。这个类既可以作为Runnable被线程执行,又可以作为FutureTask用于获取Callable任务call方法返回的结果。
在FutureTask的构造方法中传入Runnable对象会将其转换为返回值为null的Callable对象。
/**
* Modifies or replaces the task used to execute a runnable.
* This method can be used to override the concrete
* class used for managing internal tasks.
* The default implementation simply returns the given task.
*
* @param runnable the submitted Runnable
* @param task the task created to execute the runnable
* @param <V> the type of the task's result
* @return a task that can execute the runnable
* @since 1.6
*/
protected <V> RunnableScheduledFuture<V> decorateTask(
Runnable runnable, RunnableScheduledFuture<V> task) {
return task;
}
从decorateTask的字面意义判断它将具体的RunnableScheduledFuture实现类向上转型为RunnableScheduledFuture接口。从它的方法描述和实现看出它只是简单的将ScheduledFutureTask向上转型为RunnableScheduledFuture接口,由protected 修饰符可知设计者希望子类扩展这个方法的实现。
之所以向上转型为RunnableScheduledFuture接口,设计者也是希望将具体与接口分离。
private void delayedExecute(RunnableScheduledFuture<?> task) {
if (isShutdown())
//情况一
reject(task);
else {
super.getQueue().add(task);
if (isShutdown() &&
!canRunInCurrentRunState(task.isPeriodic()) &&
remove(task))
//情况二
task.cancel(false);
else
//情况三
ensurePrestart();
}
}
boolean canRunInCurrentRunState(boolean periodic) {
return isRunningOrShutdown(periodic ?
continueExistingPeriodicTasksAfterShutdown :
executeExistingDelayedTasksAfterShutdown);
}
private volatile boolean executeExistingDelayedTasksAfterShutdown = true;
final boolean isRunningOrShutdown(boolean shutdownOK) {
int rs = runStateOf(ctl.get());
return rs == RUNNING || (rs == SHUTDOWN && shutdownOK);
}
delayedExecute方法负责执行延时任务。
情况一 : 先判断线程池是否关闭,若关闭则拒绝任务。
情况二:线程池未关闭,将任务添加到父类的任务队列,即DelayedWorkQueue中。下面再次判断线程池是否关闭,并且判断canRunInCurrentRunState方法的返回值是否为false。因为传入Runnable参数,task.isPeriodic()为false,所以isRunningOrShutdown返回true。所以这里不会执行到。
情况三:任务成功添加到任务队列,执行ensurePrestart方法。
void ensurePrestart() {
int wc = workerCountOf(ctl.get());
if (wc < corePoolSize)
addWorker(null, true);
else if (wc == 0)
addWorker(null, false);
}
addWorker已经在ThreadPoolExecutor篇分析过,该方法负责同步将线程池数量+1,并且创建Worker对象添加到HashSet中,最后开启Worker对象中的线程。因为RunnableScheduledFuture对象已经被添加到任务队列,Worker中的线程通过getTask方法自然会取到DelayedWorkQueue中的RunnableScheduledFuture任务并执行它的run方法。
这里需要注意的是addWorker方法只在核心线程数未达上限或者没有线程的情况下执行,并不像ThreadPoolExecutor那样可以同时存在多个非核心线程,ScheduledThreadPoolExecutor最多只支持一个非核心线程,除非它终止了不会再创建新的非核心线程。
schedule(Callable<V> callable, long delay, TimeUnit unit)
public <V> ScheduledFuture<V> schedule(Callable<V> callable,
long delay,
TimeUnit unit) {
if (callable == null || unit == null)
throw new NullPointerException();
RunnableScheduledFuture<V> t = decorateTask(callable,
new ScheduledFutureTask<V>(callable,
triggerTime(delay, unit)));
delayedExecute(t);
return t;
}
protected <V> RunnableScheduledFuture<V> decorateTask(
Callable<V> callable, RunnableScheduledFuture<V> task) {
return task;
}
ScheduledFutureTask(Callable<V> callable, long ns) {
super(callable);
this.time = ns;
this.period = 0;
this.sequenceNumber = sequencer.getAndIncrement();
}
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW; // ensure visibility of callable
}
与schedule(Runnable command,long delay,TimeUnit unit)相比除了可以通过ScheduledFutureTask的get方法得到返回值外没有区别。
scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit)
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
if (period <= 0)
throw new IllegalArgumentException();
ScheduledFutureTask<Void> sft =
new ScheduledFutureTask<Void>(command,
null,
triggerTime(initialDelay, unit),
unit.toNanos(period));
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
sft.outerTask = t;
delayedExecute(t);
return t;
}
与上述两个方法的区别在于ScheduledFutureTask的构造函数多了参数period,即任务执行的最小周期:
ScheduledFutureTask(Runnable r, V result, long ns, long period) {
super(r, result);
this.time = ns;
this.period = period;
this.sequenceNumber = sequencer.getAndIncrement();
}
scheduleWithFixedDelay(Runnable command,long initialDelay,long delay,TimeUnit unit)
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
long initialDelay,
long delay,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
if (delay <= 0)
throw new IllegalArgumentException();
ScheduledFutureTask<Void> sft =
new ScheduledFutureTask<Void>(command,
null,
triggerTime(initialDelay, unit),
unit.toNanos(-delay));
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
sft.outerTask = t;
delayedExecute(t);
return t;
}
与scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit)的区别是参数delay传入到ScheduledFutureTask的构造方法中是以负数的形式。
小结
四种延时启动任务的方法除了构造ScheduledFutureTask的参数不同外,运行机制是相同的。先将任务添加到DelayedWorkQueue 中,然后创建Worker对象,启动内部线程轮询DelayedWorkQueue 中的任务。
那么DelayedWorkQueue的add方法是如何实现的,线程轮询DelayedWorkQueue 调用的poll和take方法又如何实现?
回顾getTask方法获取任务时的代码片段:
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
如果我们设置ScheduledThreadPoolExecutor的核心线程数量为0,则执行poll方法。而对于核心线程则执行take方法。
下面分析DelayedWorkQueue 的具体实现。
DelayedWorkQueue
static class DelayedWorkQueue extends AbstractQueue<Runnable>
implements BlockingQueue<Runnable> {
private static final int INITIAL_CAPACITY = 16;
private RunnableScheduledFuture<?>[] queue =
new RunnableScheduledFuture<?>[INITIAL_CAPACITY];
private final ReentrantLock lock = new ReentrantLock();
private int size = 0;
}
首先DelayedWorkQueue 是ScheduledThreadPoolExecutor的静态内部类。它的内部有一个RunnableScheduledFuture数组,且初始容量为16.这里提前说明下,queue 数组储存的其实是二叉树结构的索引,这个二叉树其实就是最小堆。
add方法
public boolean add(Runnable e) {
return offer(e);
}
public boolean offer(Runnable x) {
if (x == null)
throw new NullPointerException();
RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
final ReentrantLock lock = this.lock;
lock.lock();
try {
int i = size;
if (i >= queue.length)
grow();
size = i + 1;
if (i == 0) {
queue[0] = e;
setIndex(e, 0);
} else {
siftUp(i, e);
}
if (queue[0] == e) {
leader = null;
available.signal();
}
} finally {
lock.unlock();
}
return true;
}
private void grow() {
int oldCapacity = queue.length;
int newCapacity = oldCapacity + (oldCapacity >> 1); // grow 50%
if (newCapacity < 0) // overflow
newCapacity = Integer.MAX_VALUE;
queue = Arrays.copyOf(queue, newCapacity);
}
private void setIndex(RunnableScheduledFuture<?> f, int idx) {
if (f instanceof ScheduledFutureTask)
((ScheduledFutureTask)f).heapIndex = idx;
}
private void siftUp(int k, RunnableScheduledFuture<?> key) {
while (k > 0) {
int parent = (k - 1) >>> 1;
RunnableScheduledFuture<?> e = queue[parent];
if (key.compareTo(e) >= 0)
break;
queue[k] = e;
setIndex(e, k);
k = parent;
}
queue[k] = key;
setIndex(key, k);
}
在执行add方法时内部执行的是offer方法,添加RunnableScheduledFuture任务到队列时先通过内部的ReentrantLock加锁,因此在多线程调用schedule(Runnable command,long delay, TimeUnit unit)添加任务时也能保证同步。
接下来先判断队列是否已满,若已满就先通过grow方法扩容。扩容算法是将现有容量*1.5,然后将旧的数组复制到新的数组。(左移一位等于除以2)。
然后判断插入的是否为第一个任务,如果是就将RunnableScheduledFuture向下转型为ScheduledFutureTask,并将其heapIndex 属性设置为0.
如果不是第一个任务,则执行siftUp方法。该方法先找到父亲RunnableScheduledFuture对象节点,将要插入的RunnableScheduledFuture节点与之compareTo比较,若父亲RunnableScheduledFuture对象的启动时间小于当前要插入的节点的启动时间,则将节点插入到末尾。反之会对二叉树以启动时间升序重新排序RunnableScheduledFuture接口的实现其实是ScheduledFutureTask类:
new ScheduledFutureTask<Void>(command, null,triggerTime(delay, unit);
private long triggerTime(long delay, TimeUnit unit) {
return triggerTime(unit.toNanos((delay < 0) ? 0 : delay));
}
/**
* Returns the trigger time of a delayed action.
*/
long triggerTime(long delay) {
return now() +
((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
}
final long now() {
return System.nanoTime();
}
第三个参数triggerTime方法返回的就是任务延时的时间加上当前时间。
在ScheduledFutureTask内部实现了compareTo方法:
public int compareTo(Delayed other) {
if (other == this) // compare zero if same object
return 0;
if (other instanceof ScheduledFutureTask) {
ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
long diff = time - x.time;
if (diff < 0)
return -1;
else if (diff > 0)
return 1;
else if (sequenceNumber < x.sequenceNumber)
return -1;
else
return 1;
}
long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
}
比较的两个任务的启动时间。所以DelayedWorkQueue内部的二叉树是以启动时间早晚排序的。
poll方法
public RunnableScheduledFuture<?> poll(long timeout, TimeUnit unit)
throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
RunnableScheduledFuture<?> first = queue[0];
if (first == null) {
//情况一 空队列
if (nanos <= 0)
return null;
else
nanos = available.awaitNanos(nanos);
} else {
long delay = first.getDelay(NANOSECONDS);
if (delay <= 0)
//情况二 已到启动时间
return finishPoll(first);
if (nanos <= 0)
//情况三 未到启动时间,但是线程等待超时
return null;
first = null; // don't retain ref while waiting
if (nanos < delay || leader != null)
nanos = available.awaitNanos(nanos);
else {
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
long timeLeft = available.awaitNanos(delay);
nanos -= delay - timeLeft;
} finally {
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
if (leader == null && queue[0] != null)
available.signal();
lock.unlock();
}
}
非核心线程会通过poll方法同步获取任务队列中的RunnableScheduledFuture,如果队列为空或者在timeout内还等不到任务的启动时间,都将返回null。如果任务队列不为空,并且首个任务已到启动时间线程就能够获取RunnableScheduledFuture任务并执行run方法。
take方法
public RunnableScheduledFuture<?> take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
RunnableScheduledFuture<?> first = queue[0];
if (first == null)
available.await();
else {
long delay = first.getDelay(NANOSECONDS);
if (delay <= 0)
return finishPoll(first);
first = null; // don't retain ref while waiting
if (leader != null)
available.await();
else {
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
available.awaitNanos(delay);
} finally {
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
if (leader == null && queue[0] != null)
available.signal();
lock.unlock();
}
}
与非核心线程执行的poll方法相比,核心线程执行的take方法并不会超时,在获取到首个将要启动的任务前,核心线程会一直阻塞。
finishPoll方法
private RunnableScheduledFuture<?> finishPoll(RunnableScheduledFuture<?> f) {
int s = --size;
RunnableScheduledFuture<?> x = queue[s];
queue[s] = null;
if (s != 0)
siftDown(0, x);
setIndex(f, -1);
return f;
}
在成功获取任务后,DelayedWorkQueue的finishPoll方法会将任务移除队列,并以启动时间升序重排二叉树。
小结
DelayedWorkQueue内部维持了一个以任务启动时间升序排序的二叉树数组,启动时间最靠前的任务即数组的首个位置上的任务。核心线程通过take方法一直阻塞直到获取首个要启动的任务。非核心线程通过poll方法会在timeout时间内阻塞尝试获取首个要启动的任务,如果超过timeout未得到任务不会继续阻塞。
这里要特别说明要启动的任务指的是RunnableScheduledFuture内部的time减去当前时间小于等于0,未满足条件的任务不会被take或poll方法返回,这也就保证了未到指定时间任务不会执行。
执行ScheduledFutureTask
前面已经分析了schedule方法如何将RunnableScheduledFuture插入到DelayedWorkQueue,Worker内的线程如何获取定时任务。下面分析任务的执行过程,即ScheduledFutureTask的run方法:
public void run() {
boolean periodic = isPeriodic();
if (!canRunInCurrentRunState(periodic))
cancel(false);
else if (!periodic)
ScheduledFutureTask.super.run();
else if (ScheduledFutureTask.super.runAndReset()) {
setNextRunTime();
reExecutePeriodic(outerTask);
}
}
如果执行的是非周期型任务,调用ScheduledFutureTask.super.run()方法,即ScheduledFutureTask的父类FutureTask的run方法。FutureTask的run方法已经在ThreadPoolExecutor篇分析过,这里不再多说。
如果执行的是周期型任务,则执行ScheduledFutureTask.super.runAndReset():
protected boolean runAndReset() {
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return false;
boolean ran = false;
int s = state;
try {
Callable<V> c = callable;
if (c != null && s == NEW) {
try {
c.call(); // don't set result
ran = true;
} catch (Throwable ex) {
setException(ex);
}
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
return ran && s == NEW;
}
这个方法同run方法比较的区别是call方法执行后不设置结果,因为周期型任务会多次执行,所以为了让FutureTask支持这个特性除了发生异常不设置结果。
执行完任务后通过setNextRunTime方法计算下一次启动时间:
private void setNextRunTime() {
long p = period;
if (p > 0)
//情况一
time += p;
else
//情况二
time = triggerTime(-p);
}
long triggerTime(long delay) {
return now() +
((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
}
还记得ScheduledThreadPoolExecutor执行定时任务的后两种scheduleAtFixedRate和scheduleWithFixedDelay。
scheduleAtFixedRate会执行到情况一,下一次任务的启动时间最早为上一次任务的启动时间加period。
scheduleWithFixedDelay会执行到情况二,这里很巧妙的将period参数设置为负数到达这段代码块,在此又将负的period转为正数。情况二将下一次任务的启动时间设置为当前时间加period。
然后将任务再次添加到任务队列:
void reExecutePeriodic(RunnableScheduledFuture<?> task) {
if (canRunInCurrentRunState(true)) {
super.getQueue().add(task);
if (!canRunInCurrentRunState(true) && remove(task))
task.cancel(false);
else
ensurePrestart();
}
}
ScheduledFuture的get方法
既然ScheduledFuture的实现是ScheduledFutureTask,而ScheduledFutureTask继承自FutureTask,所以ScheduledFuture的get方法的实现就是FutureTask的get方法的实现,FutureTask的get方法的实现分析在ThreadPoolExecutor篇已经写过,这里不再叙述。要注意的是ScheduledFuture的get方法对于非周期任务才是有效的。
ScheduledThreadPoolExecutor总结
- ScheduledThreadPoolExecutor是实现自ThreadPoolExecutor的线程池,构造方法中传入参数n,则最多会有n个核心线程工作,空闲的核心线程不会被自动终止,而是一直阻塞在DelayedWorkQueue的take方法尝试获取任务。构造方法传入的参数为0,ScheduledThreadPoolExecutor将以非核心线程工作,并且最多只会创建一个非核心线程,参考上文中ensurePrestart方法的执行过程。而这个非核心线程以poll方法获取定时任务之所以不会因为超时就被回收,是因为任务队列并不为空,只有在任务队列为空时才会将空闲线程回收,详见ThreadPoolExecutor篇的runWorker方法,之前我以为空闲的非核心线程超时就会被回收是不正确的,还要具备任务队列为空这个条件。
- ScheduledThreadPoolExecutor的定时执行任务依赖于DelayedWorkQueue,其内部用可扩容的数组实现以启动时间升序的二叉树。
- 工作线程尝试获取DelayedWorkQueue的任务只有在任务到达指定时间才会成功,否则非核心线程会超时返回null,核心线程一直阻塞。
- 对于非周期型任务只会执行一次并且可以通过ScheduledFuture的get方法阻塞得到结果,其内部实现依赖于FutureTask的get方法。
- 周期型任务通过get方法无法获取有效结果,因为FutureTask对于周期型任务执行的是runAndReset方法,并不会设置结果。周期型任务执行完毕后会重新计算下一次启动时间并且再次添加到DelayedWorkQueue中。
在源代码的分析过程中发现分析DelayedWorkQueue还需要有二叉树的升序插入算法的知识,一开始也没有认出来这种数据结构,后来又看了别人的文章才了解。这里比较难理解,有兴趣的同学可以参考《深度解析Java8 – ScheduledThreadPoolExecutor源码解析》。
网友评论