- ScheduledThreadPoolExecutor原理分析
- ScheduledThreadPoolExecutor 分析
- 线程池之ScheduledThreadPoolExecutor调
- ScheduledThreadPoolExecutor源码分析
- ScheduledThreadPoolExecutor源码分析
- 33-ScheduledThreadPoolExecutor源码
- ScheduledThreadPoolExecutor原理和源码
- ScheduledThreadPoolExecutor
- ScheduledThreadPoolExecutor
- juc——ScheduledExecutorService、Sc
线程池就是维持几个工作线程,然后从任务队列中获取任务执行。所以要实现延时或者定时执行任务,就要做到以下三点:
- 任务要能返回它的延时时间和是否为定时任务。
- 任务队列要根据任务的延时时间进行排序。这个我们在上一章DelayedWorkQueue原理分析中已经讲解过了。
- 如果是定时任务,任务执行完成之后,还可以再次执行它。
所以要分析ScheduledThreadPoolExecutor原理关键就是对它的任务类ScheduledFutureTask的分析。
private class ScheduledFutureTask<V>
extends FutureTask<V> implements RunnableScheduledFuture<V> {}
FutureTask类我们已经在以前的文章分析过了,而RunnableScheduledFuture接口的作用是什么呢?
一. RunnableScheduledFuture接口
我们知道要实现延时或者定时执行任务,任务要能返回它的延时时间,任务是否为定时任务,任务能根据延时时间排序。所以可以想象出RunnableScheduledFuture接口中的方法了。
public interface Comparable<T> {
// 比较两个实例的大小
public int compareTo(T o);
}
/**
* 实现Comparable接口,说明Delayed实例可以进行比较
*/
public interface Delayed extends Comparable<Delayed> {
/**
* @return 返回剩余的延时时间
*/
long getDelay(TimeUnit unit);
}
/**
* 实现了Delayed接口,可以返回延时时间以及能够根据延时时间进行比较
*/
public interface ScheduledFuture<V> extends Delayed, Future<V> {
}
// 继承自Runnable和Future接口。可以当做Runnable实例使用
public interface RunnableFuture<V> extends Runnable, Future<V> {
// 运行任务
void run();
}
public interface RunnableScheduledFuture<V> extends
RunnableFuture<V>, ScheduledFuture<V> {
/**
* 是否为周期定时任务
* @return 返回true,表示是定时任务
*/
boolean isPeriodic();
}
RunnableScheduledFuture接口中,最重要的就是这四个方法:
- compareTo(T o): 可以比较任务的延时时间,进行排序用的。
- getDelay(TimeUnit unit): 返回任务剩余的延时时间
- run(): 运行任务。如果是定时任务,任务完成之后,还可以继续执行。
- isPeriodic(): 是否为周期定时任务
二. 何时创建ScheduledFutureTask任务
何时创建ScheduledFutureTask任务,就是ScheduledExecutorService接口四个方法会创建ScheduledFutureTask任务实例。
2.1 创建延时任务
// 给定的延迟时间delay之后,才会执行任务command
public ScheduledFuture<?> schedule(Runnable command,
long delay,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
/**
* 创建一个延时任务ScheduledFutureTask实例。
* triggerTime方法会根据给定的延时时间delay,返回任务开始的时间。
* decorateTask方法是让子类能够修饰ScheduledFutureTask任务实例,本类中没做处理。
*/
RunnableScheduledFuture<?> t = decorateTask(command,
new ScheduledFutureTask<Void>(command, null,
triggerTime(delay, unit)));
// 延时执行任务
delayedExecute(t);
return t;
}
// 给定的延迟时间delay之后,才会执行任务callable
public <V> ScheduledFuture<V> schedule(Callable<V> callable,
long delay,
TimeUnit unit) {
if (callable == null || unit == null)
throw new NullPointerException();
/**
* 创建一个延时任务ScheduledFutureTask实例。
* triggerTime方法会根据给定的延时时间delay,返回任务开始的时间。
* decorateTask方法是让子类能够修饰ScheduledFutureTask任务实例,本类中没做处理。
*/
RunnableScheduledFuture<V> t = decorateTask(callable,
new ScheduledFutureTask<V>(callable,
triggerTime(delay, unit)));
// 延时执行任务
delayedExecute(t);
return t;
}
这两个方法的流程是一样的,只不过一个是Runnable类型的任务,一个Callable类型的任务。
- 调用triggerTime方法根据给定的延时时间delay,返回任务开始的时间。
- 创建一个延时任务ScheduledFutureTask实例。
- 调用decorateTask方法是让子类能够修饰ScheduledFutureTask任务实例。
- 调用delayedExecute(t)方法延时执行任务。
2.2 创建延时定时任务
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
if (period <= 0)
throw new IllegalArgumentException();
/**
* 创建一个延时定时任务ScheduledFutureTask实例。
* triggerTime方法会根据给定的延时时间delay,返回任务开始的时间。
*/
ScheduledFutureTask<Void> sft =
new ScheduledFutureTask<Void>(command,
null,
triggerTime(initialDelay, unit),
unit.toNanos(period));
// decorateTask方法是让子类能够修饰ScheduledFutureTask任务实例,本类中没做处理。
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
// outerTask作用就是下一次定时执行的任务,在reExecutePeriodic方法中需要
sft.outerTask = t;
// 延时执行任务
delayedExecute(t);
return t;
}
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
long initialDelay,
long delay,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
if (delay <= 0)
throw new IllegalArgumentException();
/**
* 创建一个延时定时任务ScheduledFutureTask实例。
* triggerTime方法会根据给定的延时时间delay,返回任务开始的时间。
*/
ScheduledFutureTask<Void> sft =
new ScheduledFutureTask<Void>(command,
null,
triggerTime(initialDelay, unit),
unit.toNanos(-delay));
// decorateTask方法是让子类能够修饰ScheduledFutureTask任务实例,本类中没做处理。
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
// outerTask作用就是下一次定时执行的任务,在reExecutePeriodic方法中需要
sft.outerTask = t;
// 延时执行任务
delayedExecute(t);
return t;
}
我们知道scheduleAtFixedRate方法是固定周期时间去执行任务,而scheduleWithFixedDelay方法是在任务完成之后,延时delay时间再去执行任务。
但是我们发现这两个方法几乎是一模一样的,唯一不同地就是创建延时定时任务ScheduledFutureTask实例时,一个传递的是正数period,一个传递的是负数-delay。
- 调用triggerTime方法根据给定的延时时间delay,返回任务开始的时间。
- 创建一个延时定时任务ScheduledFutureTask实例。
- 调用decorateTask方法是让子类能够修饰ScheduledFutureTask任务实例。
- 设置outerTask,作用就是下一次定时执行的任务,在reExecutePeriodic方法中需要。
- 延时执行任务。
2.3 triggerTime方法
/**
* 返回延时任务开始的时间
*/
private long triggerTime(long delay, TimeUnit unit) {
return triggerTime(unit.toNanos((delay < 0) ? 0 : delay));
}
/**
* 返回延时任务开始的时间。利用当前时间加上给定的延时时间
*/
long triggerTime(long delay) {
return now() +
((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
}
2.4 decorateTask方法
/**
* 让子类能够修饰或者替换任务task。
* 这里只是简单地返回task
*/
protected <V> RunnableScheduledFuture<V> decorateTask(
Runnable runnable, RunnableScheduledFuture<V> task) {
return task;
}
2.5 delayedExecute方法
private void delayedExecute(RunnableScheduledFuture<?> task) {
// 如果线程池不是RUNNING状态,那么调用reject(task)方法,
// 拒绝执行任务task
if (isShutdown())
reject(task);
else {
// 将任务添加到任务队列中,会根据任务的延时时间进行排序
super.getQueue().add(task);
/**
* 如果线程池不是RUNNING状态,那么就判断能不能在当前状态下运行,
* 主要就是能不能在SHUTDOWN状态下运行。
* 如果不能在当前状态下运行,那么就调用remove方法,
* 从任务队列中移除刚刚添加的任务task。
*
* 只有移除成功了,才可以调用task.cancel(false)方法取消任务,
* 否则这个延时任务task都还要执行。
*/
if (isShutdown() &&
!canRunInCurrentRunState(task.isPeriodic()) &&
remove(task))
task.cancel(false);
else
// 预先启动工作线程,确保线程池中有工作线程。
ensurePrestart();
}
}
这个方法的主要作用就是将任务添加到任务队列中,因为这里任务队列是优先级队列DelayedWorkQueue,它会根据任务的延时时间进行排序。
- 如果线程池不是RUNNING状态,不能执行延时任务task,那么调用reject(task)方法,拒绝执行任务task。
- 将任务添加到任务队列中,会根据任务的延时时间进行排序。
- 因为是多线程并发环境,就必须判断在添加任务的过程中,线程池状态是否被别的线程更改了,那么就可能要取消任务了。
- 将任务添加到任务队列后,还要确保线程池中有工作线程,不然任务也不为执行。所以ensurePrestart()方法预先启动工作线程,确保线程池中有工作线程。
与ThreadPoolExecutor类中execute方法执行任务方法不同:
- 因为是延时任务task,所以不能将任务当成工作线程第一个任务,只能将任务添加到任务队列中,等待着工作线程来执行。
- 任务队列DelayedWorkQueue容量几乎是无限的,所以也不需要最大池。除非核心池数量是0,那么必须创建一个工作线程来运行任务,否则线程池的线程数不可能超过核心池数量。
- 当线程池状态被别的线程改变时,取消任务的判断条件不同。当线程池状态不是RUNNING,而且不能在当前状态下运行,那么就调用remove方法,移除刚刚添加的任务task。只有移除任务成功,才可以调用task.cancel(false)方法取消任务,否则这个延时任务task都还要执行。
2.6 canRunInCurrentRunState方法
// 判断能不能在当前状态下运行
boolean canRunInCurrentRunState(boolean periodic) {
// 调用父类ThreadPoolExecutor中的isRunningOrShutdown方法
return isRunningOrShutdown(periodic ?
continueExistingPeriodicTasksAfterShutdown :
executeExistingDelayedTasksAfterShutdown);
}
/**
* 是ThreadPoolExecutor中的方法
* @param shutdownOK 表示线程池能在SHUTDOWN状态下运行
*/
final boolean isRunningOrShutdown(boolean shutdownOK) {
// 线程状态
int rs = runStateOf(ctl.get());
// 当线程是RUNNING状态,或者是SHUTDOWN状态且shutdownOK也为true
return rs == RUNNING || (rs == SHUTDOWN && shutdownOK);
}
canRunInCurrentRunState方法就是返回线程池能不能在当前状态下运行。
注: continueExistingPeriodicTasksAfterShutdown与executeExistingDelayedTasksAfterShutdown都是提供给外部设置。
2.7 ensurePrestart方法
void ensurePrestart() {
// 线程池中的线程数量
int wc = workerCountOf(ctl.get());
// 如果小于核心池数量,就创建新的工作线程
if (wc < corePoolSize)
addWorker(null, true);
// 说明corePoolSize数量是0,必须创建一个工作线程来执行任务
else if (wc == 0)
addWorker(null, false);
}
ensurePrestart方法作用:预先启动工作线程,确保线程池中有工作线程。
三. ScheduledFutureTask类
ScheduledFutureTask是一个延时定时任务,它可以返回任务剩余延时时间,可以被周期性地执行。
3.1 重要成员属性
/** 是一个序列,每次创建任务的时候,都会自增。 */
private final long sequenceNumber;
/** 任务能够开始执行的时间 */
private long time;
/**
* 任务周期执行的时间
* 0表示不是一个周期定时任务
* 正数表示固定周期时间去执行任务
* 负数表示任务完成之后,延时period时间再去执行任务
*/
private final long period;
/** 表示再次执行的任务,在reExecutePeriodic中调用 */
RunnableScheduledFuture<V> outerTask = this;
/**
* 表示在任务队列中的索引位置,用来支持快速从队列中删除任务。
*/
int heapIndex;
属性说明:
- sequenceNumber: 是一个序列,每次创建任务的时候,都会自增。
- time: 任务能够开始执行的时间。
- period: 任务周期执行的时间。0表示不是一个周期定时任务。
- outerTask: 表示再次执行的任务,在reExecutePeriodic中调用
- heapIndex: 表示在任务队列中的索引位置,用来支持快速从队列中删除任务。
3.2 构造函数
3.2.1 创建延时任务
/**
* 创建延时任务
*/
ScheduledFutureTask(Runnable r, V result, long ns) {
// 调用父类的方法
super(r, result);
// 任务开始的时间
this.time = ns;
// period是0,不是一个周期定时任务
this.period = 0;
// 每次创建任务的时候,sequenceNumber都会自增
this.sequenceNumber = sequencer.getAndIncrement();
}
/**
* 创建延时任务
*/
ScheduledFutureTask(Callable<V> callable, long ns) {
// 调用父类的方法
super(callable);
// 任务开始的时间
this.time = ns;
// period是0,不是一个周期定时任务
this.period = 0;
// 每次创建任务的时候,sequenceNumber都会自增
this.sequenceNumber = sequencer.getAndIncrement();
}
3.2.2 创建延时定时任务
/**
* 创建延时定时任务
*/
ScheduledFutureTask(Runnable r, V result, long ns, long period) {
// 调用父类的方法
super(r, result);
// 任务开始的时间
this.time = ns;
// 周期定时时间
this.period = period;
// 每次创建任务的时候,sequenceNumber都会自增
this.sequenceNumber = sequencer.getAndIncrement();
}
3.3 运行任务run方法
public void run() {
// 是否是周期任务
boolean periodic = isPeriodic();
// 如果不能在当前状态下运行,那么就要取消任务
if (!canRunInCurrentRunState(periodic))
cancel(false);
// 如果只是延时任务,那么就调用run方法,运行任务。
else if (!periodic)
ScheduledFutureTask.super.run();
// 如果是周期定时任务,调用runAndReset方法,运行任务。
// 这个方法不会改变任务的状态,所以可以反复执行。
else if (ScheduledFutureTask.super.runAndReset()) {
// 设置周期任务下一次执行的开始时间time
setNextRunTime();
// 重新执行任务outerTask
reExecutePeriodic(outerTask);
}
}
这个方法会在ThreadPoolExecutor的runWorker方法中调用,而且这个方法调用,说明肯定已经到了任务的开始时间time了。
- 先判断当前线程状态能不能运行任务,如果不能,就调用cancel()方法取消本任务。
- 如果任务只是一个延时任务,那么调用父类的run()运行任务,改变任务的状态,表示任务已经运行完成了。
- 如果任务只是一个周期定时任务,那么就任务必须能够反复执行,那么就不能调用run()方法,它会改变任务的状态。而是调用runAndReset()方法,只是简单地运行任务,而不会改变任务状态。
- 设置周期任务下一次执行的开始时间time,并重新执行任务。
setNextRunTime设置任务下一次执行的开始时间time的方法:
/**
* 设置任务下一次执行的开始时间time
*/
private void setNextRunTime() {
// 周期时间
long p = period;
if (p > 0)
time += p;
else
time = triggerTime(-p);
}
我们发现当p大于0的时候,调用time += p,那么就可能出现计算后的time可能小于当前时间,因为任务执行时间超过了周期时间p,所以将这个任务添加到任务队列中,它就会立即执行。
而当p小于0的时候,调用time = triggerTime(-p)方法,就是在当前时间上,再加上-p的延时时间,所以这个任务添加到任务队列中,必须延时-p时间后,才能执行。
reExecutePeriodic重新执行任务的方法:
/**
* 重新执行任务task
*/
void reExecutePeriodic(RunnableScheduledFuture<?> task) {
// 判断当前线程池状态能不能运行任务
if (canRunInCurrentRunState(true)) {
// 将任务添加到任务队列,会根据任务延时时间进行排序
super.getQueue().add(task);
// 如果线程池状态改变了,当前状态不能运行任务,那么就尝试移除任务,
// 移除成功,就取消任务。
if (!canRunInCurrentRunState(true) && remove(task))
task.cancel(false);
else
// 预先启动工作线程,确保线程池中有工作线程。
ensurePrestart();
}
}
这个方法与delayedExecute方法很像,都是将任务添加到任务队列中。
- 如果当前线程池状态能够运行任务,那么任务添加到任务队列。
- 如果在在添加任务的过程中,线程池状态是否被别的线程更改了,那么就要进行判断,是否需要取消任务。
- 调用ensurePrestart()方法,预先启动工作线程,确保线程池中有工作线程。
3.4 其他重要方法
3.4.1 取消任务
public boolean cancel(boolean mayInterruptIfRunning) {
// 调用父类的cancel方法取消任务
boolean cancelled = super.cancel(mayInterruptIfRunning);
// 将任务从任务队列中移除
if (cancelled && removeOnCancel && heapIndex >= 0)
remove(this);
return cancelled;
}
3.4.2 getDelay方法
// 返回任务剩余延时时间
public long getDelay(TimeUnit unit) {
return unit.convert(time - now(), NANOSECONDS);
}
3.4.3 compareTo方法
public int compareTo(Delayed other) {
if (other == this)
return 0;
// 如果是ScheduledFutureTask实例
if (other instanceof ScheduledFutureTask) {
ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
// 比较任务开始时间
long diff = time - x.time;
if (diff < 0)
return -1;
else if (diff > 0)
return 1;
// 比较那么任务是新创建的,因为新创建的sequenceNumber小一点。
else if (sequenceNumber < x.sequenceNumber)
return -1;
else
return 1;
}
// 比较任务的剩余延时时间
long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
}
3.4.4 isPeriodic方法
/**
* 是否是周期定时任务
*/
public boolean isPeriodic() {
return period != 0;
}
四. 创建延时定时线程池
4.1 构造函数
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
public ScheduledThreadPoolExecutor(int corePoolSize,
ThreadFactory threadFactory) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue(), threadFactory);
}
public ScheduledThreadPoolExecutor(int corePoolSize,
RejectedExecutionHandler handler) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue(), handler);
}
public ScheduledThreadPoolExecutor(int corePoolSize,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue(), threadFactory, handler);
}
都是调用父类ThreadPoolExecutor构造函数的方法,唯一要注意的地方就是任务队列只能是DelayedWorkQueue实例,用户没有办法更换ScheduledThreadPoolExecutor的任务队列属性。
4.2 创建单个线程的定时线程池
public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
return new DelegatedScheduledExecutorService
(new ScheduledThreadPoolExecutor(1));
}
public static ScheduledExecutorService newSingleThreadScheduledExecutor(
ThreadFactory threadFactory) {
return new DelegatedScheduledExecutorService
(new ScheduledThreadPoolExecutor(1, threadFactory));
}
4.3 创建固定数量的定时线程池
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
public static ScheduledExecutorService newScheduledThreadPool(
int corePoolSize, ThreadFactory threadFactory) {
return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
}
网友评论