下面我们来说一下ScheduledThreadPoolExecutor。我们之前说过,定时任务的线程池不是直接用ThreadPoolExecutor实现的,其实是用ScheduledThreadPoolExecutor实现的,下面我们就来分析一下定时任务的线程池的原理。
我们来看一下ScheduledThreadPoolExecutor的构造方法
public ScheduledThreadPoolExecutor(int corePoolSize,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue(), threadFactory, handler);
}
其实也是用的ThreadPoolExecutor实现的,只不过阻塞队列用的是DelayedWorkQueue,这个队列有一点像DelayQueue,是一个带优先级的延时队列。在队列里面放的是RunnableScheduledFuture,在ScheduledThreadPoolExecutor有一个内部类ScheduledFutureTask,这个类实现了RunnableScheduledFuture接口,我们来看一下这个类
ScheduledFutureTask(Callable<V> callable, long ns) {
super(callable);
this.time = ns;
this.period = 0;
this.sequenceNumber = sequencer.getAndIncrement();
}
这个类继承了FutureTask,所以可以提交一个Callable对象,看一下compareTo方法
public int compareTo(Delayed other) {
if (other == this) // compare zero if same object
return 0;
if (other instanceof ScheduledFutureTask) {
ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
long diff = time - x.time;
if (diff < 0)
return -1;
else if (diff > 0)
return 1;
else if (sequenceNumber < x.sequenceNumber)
return -1;
else
return 1;
}
long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
}
其实就是按队列的执行时间进行排序,DelayedWorkQueue会执行时间最前的任务。再来看一下run方法
public void run() {
boolean periodic = isPeriodic();
if (!canRunInCurrentRunState(periodic))
cancel(false);
else if (!periodic)
ScheduledFutureTask.super.run();
else if (ScheduledFutureTask.super.runAndReset()) {
setNextRunTime();
reExecutePeriodic(outerTask);
}
}
判断线程池的执行状态,然后执行任务。
再看一下delayedExecute方法
private void delayedExecute(RunnableScheduledFuture<?> task) {
if (isShutdown())
reject(task);
else {
super.getQueue().add(task);
if (isShutdown() &&
!canRunInCurrentRunState(task.isPeriodic()) &&
remove(task))
task.cancel(false);
else
ensurePrestart();
}
}
先判断线程池的执行状态,然后去任务队列里取出任务执行,这个任务就是当前时间最前面的那个任务,然后从队列中删除任务,然后取消执行任务。在ScheduledFutureTask的getDelay方法会计算下次任务执行的时间,当队列中的第一个任务取出时,按任务执行时间执行任务。
public long getDelay(TimeUnit unit) {
return unit.convert(time - now(), NANOSECONDS);
}
再来看一下schedule方法
public <V> ScheduledFuture<V> schedule(Callable<V> callable,
long delay,
TimeUnit unit) {
if (callable == null || unit == null)
throw new NullPointerException();
RunnableScheduledFuture<V> t = decorateTask(callable,
new ScheduledFutureTask<V>(callable,
triggerTime(delay, unit)));
delayedExecute(t);
return t;
}
其实也是一样,创建ScheduledFutureTask对象执行任务。
ScheduledThreadPoolExecutor就分析到这里了。
网友评论