延迟以及周期性执行线程池。
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
堵塞队列为DelayedWorkQueue,这是ScheduledThreadPoolExecutor的内部类。看其grow扩容方法会发现,这个队列最大大小为Integer.MAX_VALUE。所以队列可以放置很多元素。
看堵塞队列的DelayedWorkQueue的offer方法
可以看出offer的元素是RunnableScheduledFuture类型。详细的流程不看,设计堆的调整等操作,目的是保证一个有优先级级的队列,延迟时间短的优先级高,在队列的最前面。
public boolean offer(Runnable x) {
if (x == null)
throw new NullPointerException();
// offer的元素是RunnableScheduledFuture类型
RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
final ReentrantLock lock = this.lock;
lock.lock();
try {
int i = size;
if (i >= queue.length)
grow();
size = i + 1;
if (i == 0) {
queue[0] = e;
setIndex(e, 0);
} else {
siftUp(i, e);
}
if (queue[0] == e) {
leader = null;
available.signal();
}
} finally {
lock.unlock();
}
return true;
}
看DelayedWorkQueue队列保存的元素类型RunnableScheduledFuture。
RunnableScheduledFuture的唯一一个自己的方法是isPeriodic,判断这个任务时候是周期性任务。
RunnableScheduledFuture的父类是RunnableFuture继承了Runnable接口,所以RunnableScheduledFuture也可以看成一个线程。
还有一个父类是ScheduledFuture,其继承了Delayed接口,Delayed接口唯一的方法就是获取任务的剩余延迟时间,以供延迟队列的延迟获取元素。
所以RunnableScheduledFuture类型元素,即是一个线程,有run方法,也可以判断是否周期性执行,又可以获取任务剩余延迟时间。
public interface RunnableScheduledFuture<V> extends RunnableFuture<V>, ScheduledFuture<V> {
/**
* Returns {@code true} if this task is periodic. A periodic task may
* re-run according to some schedule. A non-periodic task can be
* run only once.
*
* @return {@code true} if this task is periodic
*/
boolean isPeriodic();
}
public interface RunnableFuture<V> extends Runnable, Future<V> {
/**
* Sets this Future to the result of its computation
* unless it has been cancelled.
*/
void run();
}
public interface ScheduledFuture<V> extends Delayed, Future<V> {
}
看堵塞队列的take方法
public RunnableScheduledFuture<?> take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
RunnableScheduledFuture<?> first = queue[0];
if (first == null)
available.await();
else {
// 获取队列顶部元素的剩余延迟时间
long delay = first.getDelay(NANOSECONDS);
// 剩余延迟时间小于0,则finishPoll调整堆,然后直接返回这个元素
if (delay <= 0)
return finishPoll(first);
first = null; // don't retain ref while waiting
// 剩余延迟时间大于0。如果leader不为空,说明已经有线程成为leader并等待堆顶任务
if (leader != null)
available.await();
else {
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
// 堵塞等待delay时间
available.awaitNanos(delay);
} finally {
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
if (leader == null && queue[0] != null)
available.signal();
lock.unlock();
}
}
take方法总结:take方法和普通的延迟队列一样,比如delayQueue。等待队列头部元素,剩余延迟时间过了之后,才能获取到值。
DelayedWorkQueue延迟队列分析完了之后,分析ScheduledThreadPoolExecutor的关键方法。
构造方法:除了堵塞队列以外,和ThreadPoolExcutor差不多,但是由于DelayedWorkQueue堵塞队列是无限大的,所以,不存在最大线程数以及最大线程数空闲时间。
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
execute方法:延迟时间为0,所以正常执行,该启动新线程则启动新线程,该放入队列则放入队列。
public void execute(Runnable command) {
schedule(command, 0, NANOSECONDS);
}
public ScheduledFuture<?> schedule(Runnable command,
long delay,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
// decorateTask方法只是返回最后一个入参,什么都没做。
// ScheduledFutureTask类是RunnableScheduledFuture的实现类,所以ScheduledFutureTask类可以放入DelayWorkQueue堵塞队列
RunnableScheduledFuture<?> t = decorateTask(command,
new ScheduledFutureTask<Void>(command, null,
triggerTime(delay, unit)));
// delayedExecute方法把RunnableScheduledFuture放入堵塞队列,如果线程数小于核心线程,则开启线程从队列中取任务
delayedExecute(t);
return t;
}
// ScheduledFutureTask构造方法,period为0,说明不是周期性任务,time表示延迟时间。
ScheduledFutureTask(Runnable r, V result, long ns) {
super(r, result);
this.time = ns;
this.period = 0;
this.sequenceNumber = sequencer.getAndIncrement();
}
private void delayedExecute(RunnableScheduledFuture<?> task) {
if (isShutdown())
reject(task);
else {
// 放入堵塞队列
super.getQueue().add(task);
if (isShutdown() &&
!canRunInCurrentRunState(task.isPeriodic()) &&
remove(task))
task.cancel(false);
else
// 如果线程数小于核心线程,则开启线程从队列中取任务
ensurePrestart();
}
}
void ensurePrestart() {
int wc = workerCountOf(ctl.get());
// 已经开启线程数小于核心线程数,则开启新线程,
// addWorker传入的第一参数为null,也就是worker的firstTask为null,所以直接从getTask()直接从队列中获取任务,
// 堵塞队列是延迟队列,所以可能会延迟获取到任务。
if (wc < corePoolSize)
addWorker(null, true);
else if (wc == 0)
addWorker(null, false);
}
之后的流程就是,和和普通线程池一样,ThreadPoolExecutor的真正工作的内部类worker,其就是开启的线程类的runnable入参,线程start启动的时候,会运行workker的run方法,workker的run方法会循环从队列中获取任务执行,因为是延迟队列,所以会又延迟的效果,延迟的原理前面分析堵塞队列的时候分析了,condition.awaitNanos方法实现延迟等待。
获取任务后,运行任务的run方法。
public void run() {
// 是否周期执行
boolean periodic = isPeriodic();
if (!canRunInCurrentRunState(periodic))
cancel(false);
else if (!periodic)
// 不周期执行,则直接执行
ScheduledFutureTask.super.run();
// 如果周期执行,则重置任务,再放入队列等待执行
else if (ScheduledFutureTask.super.runAndReset()) {
setNextRunTime();
reExecutePeriodic(outerTask);
}
}
scheduleAtFixedRate:该方法在initialDelay时长后第一次执行任务,以后每隔period时长,再次执行任务。注意,period是从任务开始执行算起的。开始执行任务后,定时器每隔period时长检查该任务是否完成,如果完成则再次启动任务,否则等该任务结束后才再次启动任务,看下图示例。
new ScheduledFutureTask<Void>(command,
null,
triggerTime(initialDelay, unit),
unit.toNanos(period)
scheduleWithFixDelay:该方法在initialDelay时长后第一次执行任务,以后每当任务执行完成后,等待delay时长,再次执行任务,看下图示例。
new ScheduledFutureTask<Void>(command,
null,
triggerTime(initialDelay, unit),
unit.toNanos(-delay)
和scheduledAtFixedRate类似,唯一不同的地方在于在于创建的ScheduledFutureTask不同
网友评论