基本用法
class Main {
public static void main(String[] args) {
ScheduledThreadPoolExecutor poolExecutor = new ScheduledThreadPoolExecutor(1);
poolExecutor.scheduleAtFixedRate(new task(), 5, 10, TimeUnit.SECONDS);
}
public static class task implements Runnable {
@Override
public void run() {
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
带着问题看源码
实战案例

之前写过的一段原生Java定期执行任务的代码 设想效果是每天早上八点跑一次任务
写上面那段代码时碰到两个问题
一、ScheduledThreadPool好像还挺复杂 比起直接用while(true) sleep 用 ScheduledThreadPool这种方式有什么性能优势吗
二、因为数据量很大可能耗时很久 当设置每天8点执行一次 如果到第二天8点 上一次的任务还没执行完成 会发生什么

类结构

实现了ScheduledExecutorService接口并继承了ThreadPoolExecutor
同样是出自熟悉的狗哥之手
先拖到最后看下本次目标多少 也不多 加上注释以及最后一行留白一共1284行

提交任务
提交任务的三种方式
- pool.schedule 提交一次性任务
- pool.scheduleWithFixedDelay 提交周期任务(固定时间间隔)
- pool.scheduleAtFixedRate 提交周期任务(固定执行时间)

提交任务方式一

提交任务方式二
与上一个方法基本一致 两处不同
1)多了一步outerTask赋值
2)多传了一个参数 delay
执行延迟时间

提交任务方式三
与方式二代码几乎一模一样 (找不同 重要伏笔)
一个想法:这里是不是可以用个方法重载减少代码重复呢

接下来看三种提交方式都有的核心方法:delayedExcute
( ) 将任务加入父类队列



任务最终被加到了父类ThreadPoolExecutor
中的workQueue
工作队列中

提交阶段总结:提交任务方法完成了两件事:
1)把用户传入的runnable任务封装成一个ScheduledFutureTask
内部类对象
2)将该ScheduledFutureTask添加到父类ThreadPoolExecutor
的workQueue
工作队列中
执行任务
用户提交的任务被封装成ScheduledFutureTask
该内部类中有一个run方法用于执行任务
内部类ScheduledFutureTask源码
private class ScheduledFutureTask<V>
extends FutureTask<V> implements RunnableScheduledFuture<V> {
//任务添加到ScheduledThreadPoolExecutor中被分配的唯一序列号
private final long sequenceNumber;
//该任务的执行时间
private long time;
//该任务的循环周期(延迟时间)
private final long period;
//重新入队的实际任务 实际指向当前对象本身
RunnableScheduledFuture<V> outerTask = this;
//当前任务在延迟队列中的索引
//能够更加方便的取消当前任务
int heapIndex;
//构造方法1
ScheduledFutureTask(Runnable r, V result, long ns) {
super(r, result);
this.time = ns;
this.period = 0;
this.sequenceNumber = sequencer.getAndIncrement();
}
//构造方法2
ScheduledFutureTask(Runnable r, V result, long ns, long period) {
super(r, result);
this.time = ns;
this.period = period;
this.sequenceNumber = sequencer.getAndIncrement();
}
//构造方法3
ScheduledFutureTask(Callable<V> callable, long ns) {
super(callable);
this.time = ns;
this.period = 0;
this.sequenceNumber = sequencer.getAndIncrement();
}
//该任务还有多久执行
public long getDelay(TimeUnit unit) {
return unit.convert(time - now(), NANOSECONDS);
}
//比较两个任务谁先执行
public int compareTo(Delayed other) {
if (other == this) // compare zero if same object
return 0;
if (other instanceof ScheduledFutureTask) {
ScheduledFutureTask<?> x = (ScheduledFutureTask<?>) other;
long diff = time - x.time;
if (diff < 0)
return -1;
else if (diff > 0)
return 1;
//如果下次执行任务的时间相同,则会比较任务的sequenceNumber值,
//sequenceNumber值小的任务会排在前面。
else if (sequenceNumber < x.sequenceNumber)
return -1;
else
return 1;
}
long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
}
//是否周期性任务
public boolean isPeriodic() {
return period != 0;
}
// 设置下一次的执行时间
// time 为上一次任务的开始执行时间
// p > 0 说明是提交方式二 固定频率执行 time += p 在上一次开始时间的基础上计算下一次执行时间
// p < 0 说明是提交方式三 固定延迟执行 转回正数之后调用一次triggerTime 上次任务执行完毕now() + 执行周期p 而不是time+p
private void setNextRunTime() {
long p = period;
if (p > 0)
time += p;
else
time = triggerTime(-p);
}
//overflowFree方法 防止溢出 任务运行足够久这个long是有可能溢出
long triggerTime(long delay) {
// 上次任务执行完毕now() + delay
return now() + ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
}
public boolean cancel(boolean mayInterruptIfRunning) {
boolean cancelled = super.cancel(mayInterruptIfRunning);
if (cancelled && removeOnCancel && heapIndex >= 0)
remove(this);
return cancelled;
}
public void run() {
boolean periodic = isPeriodic();
//检查当前状态是否能运行 不能运行则取消任务
if (!canRunInCurrentRunState(periodic))
cancel(false);
//如果能运行 则再判断是否周期性任务
else if (!periodic)
//不是周期性任务则直接调用run方法执行一次就行了
ScheduledFutureTask.super.run();
//如果是需要周期性执行的任务 则调用runAndReset方法
//runAndReset返回true代表执行成功了 然后计算并设置出下一次的执行时间
else if (ScheduledFutureTask.super.runAndReset()) {
// 设置下一次执行时间
setNextRunTime();
// 将该任务重新放入队列
reExecutePeriodic(outerTask);
}
}
}

核心方法:FutureTask.runAndReset()
// 执行任务并忽略任务返回值,执行完毕后将该任务重置为初始状态,
// 如果计算遇到异常或被取消,则无法重置回初始状态
// 该方法用于需要重复执行的场景
// 如果成功运行并重置,则返回true
protected boolean runAndReset() {
//如果状态为NEW则接着会通过unsafe类把任务执行线程引用CAS的保存在runner字段中 如果保存失败,则直接返回;
if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread()))
return false;
boolean ran = false;
int s = state;
try {
Callable<V> c = callable;
//当成员变量callable不为null且状态为NEW则可以执行
if (c != null && s == NEW) {
try {
c.call(); // don't set result
//任务正常执行完毕
ran = true;
} catch (Throwable ex) {
setException(ex);
}
}
} finally {
//执行完毕之后将runner执行线程置为null
runner = null;
s = state;
//当执行状态为中断 处理中断
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
return ran && s == NEW;
}
是否是周期性任务的判断依据是看period成员变量是否为0 只有提交方式一没传这个参数 默认值0

计算下一次的执行时间时就用到了period的正负符号了

到这里开头的第二个问题似乎能解决了 由于我使用的是方式二scheduleAtFixedRate
p > 0 走的是 time += p (上一次的开始时间+delay )假如开始时间是前一天早上8点
那么算出的下一次time确实是第二天8点
但是实际上第二天9点上一个任务才执行完毕
所以第二天8点 上一个任务还没完成
此时会重新启动一个任务 两个任务一起跑?
还是说只有前一次执行完毕才会处理下一次任务 然后拿到time发现已经错过了第二天8点 然后略过第二次任务 等第三天8点再执行?
想要回答这个问题 关键还得看看从延迟队列中取出任务并执行的那段逻辑是怎么写的

等不及了 直接剧透 实际测试一把

测试结果显示 问题二场景 最后结果既不是错过第二天8点直接第三天8点执行 也不是第二天8点两个任务并行执行 而是的等到第一个任务9点执行完毕之后立即执行第二次任务
执行阶段总结:当定时任务属于周期性任务 那么执行任务的run方法会在任务执行完毕后 计算出下一次的执行时间 并将该任务重新放回DelayWorkQueue
延迟队列
任务调度
上面我们看了任务的run方法 那么是谁去调用这个run方法 又是什么时候去调用呢
延迟阻塞队列DelayedWorkQueue
中放的元素是ScheduledFutureTask
,提交的任务被包装成ScheduledFutureTask
放进工作队列,Woker
工作线程消费工作队列中的任务,即调用ScheduledFutureTask.run()
,ScheduledFutureTask
又调用任务的run()
,这点和ThreadPoolExecutor
差不多,而ScheduledThreadPoolExecutor
是如何实现按时间调度的呢?
猜测可能是有一个专门负责调度的额外线程在监听队列中存储的任务的执行时间是否到了 到了的话就取出任务调用run方法(后面发现猜错了 狗哥使用了一种更加优秀的设计)
DelayedWorkQueue
源码

DelayedWorkQueue
实现自BlockingQueue
是高度定制化的阻塞队列+优先队列 其核心数据结构是二叉最小堆的优先队列,队列满时会自动扩容,所以offer操作永远不会阻塞,maximumPoolSize也就用不上了,所以线程池中永远会保持至多有corePoolSize个工作线程正在运行。
类结构
//位于ScheduledThreadPoolExecutor中的一个内部类
//一种定制化的延迟队列+优先队列 只支持放入RunnableScheduledFutures
static class DelayedWorkQueue extends AbstractQueue<Runnable>
implements BlockingQueue<Runnable> {
//DelayedWorkQueue 基于基于堆的数据结构
//类似于 DelayQueue 和 PriorityQueue 中的数据结构,
//除了每个 ScheduledFutureTask 还将其索引记录到堆数组中。
//这消除了在取消时查找任务的需要,大大加快了删除速度(从 O(n)下降到 O(log n)),
//并减少了垃圾保留,否则会因等待元素上升到顶部而发生清除前。
//但是因为队列也可能包含 RunnableScheduledFutures 不是 ScheduledFutureTasks,
//我们不能保证有这样的索引可用,在这种情况下我们回退到线性搜索。
//(我们希望大多数任务不会被修饰,并且速度更快的情况会更常见。)
//所有堆操作都必须记录索引更改——主要是在 siftUp 和 siftDown 中。
//删除后,任务的heapIndex 设置为 -1。请注意,ScheduledFutureTasks
//最多可以在队列中出现一次(对于其他类型的任务或工作队列不需要如此),
//因此由 heapIndex 唯一标识。
// 初始容量
private static final int INITIAL_CAPACITY = 16;
// 队列
private RunnableScheduledFuture<?>[] queue = new RunnableScheduledFuture<?>[INITIAL_CAPACITY];
// 可重入锁 用于队列元素操作时加锁
private final ReentrantLock lock = new ReentrantLock();
//队列容量
private int size = 0;
//leader线程
private Thread leader = null;
//线程状态控制器
private final Condition available = lock.newCondition();
private void siftUp(int k, RunnableScheduledFuture<?> key) {...}
private void siftDown(int k, RunnableScheduledFuture<?> key) {...}
public boolean offer(Runnable x) {...}
public RunnableScheduledFuture<?> take() throws InterruptedException {...}
public RunnableScheduledFuture<?> poll(long timeout, TimeUnit unit){...}
...
}
核心成员变量:Condition available
关键字synchronize可以与wait()和nitify()方法相结合实现实现等待/通知模式 类ReentrantLock也可以实现同样的功能,但需要借助condition对象 condition类是在JDK5中出现的技术,使用他有更好的灵活性,比如可以实现多路通知功能
,也就是在一个Lock对象里可以创建多个condition实例,线程对象可以注册在指定的condition中从而选择性的进行线程通知,在调度线程上更加灵活。
- 调用Condition的await()和signal()方法,都必须在lock保护之内,就是说必须在lock.lock()和lock.unlock之间才可以使用
- Conditon中的await()对应Object的wait();
- Condition中的signal()对应Object的notify();
核心成员变量:Thread leader
指定等待队列头部任务的线程。 Leader-Follower 模式的变体。用于减少不必要的定时等待。当一个线程成为领导者时,它只等待下一个任务的延迟时间是否到达,而其他线程则无限期地等待。领导线程必须在从 take() 或 poll(...) 返回之前向某个其他线程发出信号,除非某个其他线程在此期间成为领导。 每当队列的头部被具有较早到期时间的任务替换时,leader 字段将通过重置为空而无效,并且一些等待线程(但不一定是当前的leader)被发出信号。因此,等待线程必须准备好在等待期间获得和失去领导权
核心方法:offer添加元素
ScheduledThreadPoolExecutor
提交任务时调用的是DelayedWorkQueue.add
,而add
、put
等一些对外提供的添加元素的方法都调用了offer
,其基本流程如下:
- 其作为生产者的入口,首先获取锁。
- 判断队列是否要满了(
size >= queue.length
),满了就扩容grow()
。 - 队列未满,size+1。
- 判断添加的元素是否是第一个,是则不需要堆化。
- 添加的元素不是第一个,则需要堆化
siftUp
。 - 如果堆顶元素刚好是此时被添加的元素,则唤醒take线程消费。
- 最终释放锁。
public boolean offer(Runnable x) {
if (x == null)
throw new NullPointerException();
RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
final ReentrantLock lock = this.lock;
lock.lock();
try {
int i = size;
if (i >= queue.length)
//扩容
grow();
size = i + 1;
if (i == 0) {
//如果是入的是第一个元素,不需要堆化
queue[0] = e;
setIndex(e, 0);
} else {
//堆化
siftUp(i, e);
}
if (queue[0] == e) {
//如果堆顶元素刚好是入队列的元素,则唤醒take
leader = null;
available.signal();
}
} finally {
lock.unlock();
}
return true;
}

核心方法:siftUp向上堆化
新添加的元素先会加到堆底,然后一步步和上面的父亲节点比较,若小于父亲节点则和父亲节点互换位置,循环比较直至大于父亲节点才结束循环。
private void siftUp(int k, RunnableScheduledFuture<?> key) {
while (k > 0) {
//找到父亲节点
int parent = (k - 1) >>> 1;
RunnableScheduledFuture<?> e = queue[parent];
if (key.compareTo(e) >= 0)
// 添加的元素 大于父亲节点,则结束循环
break;
//添加的元素小于父亲节点,则位置互换
queue[k] = e;
setIndex(e, k);
k = parent;
}
queue[k] = key;
setIndex(key, k);
}

核心方法:take消费元素
Worker工作线程启动后就会调用take方法循环消费工作队列中的元素
take基本流程如下:
- 首先获取可中断锁,判断堆顶元素是否是空,空的则阻塞等待
available.await()
。 - 堆顶元素不为空,则获取其延迟执行时间
delay
,delay <= 0
说明到了执行时间,出队列finishPoll
。 -
delay > 0
还没到执行时间,判断leader
线程是否为空,不为空则说明有其他take线程也在等待,当前take将无限期阻塞等待。 -
leader
线程为空,当前take线程设置为leader
,并阻塞等待delay
时长。 - 当前leader线程等待delay时长自动唤醒护着被其他take线程唤醒,则最终将
leader
设置为null
。 - 再循环一次判断
delay <= 0
出队列。 - 跳出循环后判断leader为空并且堆顶元素不为空,则唤醒其他take线程,最后是否锁。
public RunnableScheduledFuture<?> take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
RunnableScheduledFuture<?> first = queue[0]; //取出堆顶元素
if (first == null)
//堆为空,等待
available.await();
else {
long delay = first.getDelay(NANOSECONDS);
if (delay <= 0)
//到了执行时间,出队列
return finishPoll(first);
first = null; // don't retain ref while waiting
//还没到执行时间
if (leader != null)
//此时若有其他take线程在等待,当前take将无限期等待
available.await();
else {
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
available.awaitNanos(delay);
} finally {
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
if (leader == null && queue[0] != null)
available.signal();
lock.unlock();
}
}


核心方法:finishPoll出队列
堆顶元素delay<=0,执行时间到,出队列就是一个向下堆化的过程siftDown
private RunnableScheduledFuture<?> finishPoll(RunnableScheduledFuture<?> f) {
int s = --size;
RunnableScheduledFuture<?> x = queue[s];
queue[s] = null;
if (s != 0)
siftDown(0, x);
setIndex(f, -1);
return f;
}
核心方法:siftDown向下堆化
由于堆顶元素出队列后,就破坏了堆的结构,需要组织整理下,将堆尾元素移到堆顶,然后向下堆化:
- 从堆顶开始,父亲节点与左右子节点中较小的孩子节点比较(左孩子不一定小于右孩子)。
- 父亲节点小于等于较小孩子节点,则结束循环,不需要交换位置。
- 若父亲节点大于较小孩子节点,则交换位置。
- 继续向下循环判断父亲节点和孩子节点的关系,直到父亲节点小于等于较小孩子节点才结束循环。
private void siftDown(int k, RunnableScheduledFuture<?> key) {
//k = 0, key = queue[size-1]
//无符号右移,相当于size/2
int half = size >>> 1;
while (k < half) {
//只需要比较一半
//找到左孩子节点
// child = 2k + 1
int child = (k << 1) + 1;
RunnableScheduledFuture<?> c = queue[child];
//右孩子节点
int right = child + 1;
//比较左右孩子大小
if (right < size && c.compareTo(queue[right]) > 0)
//c左孩子大于右孩子,则将有孩子赋值给左孩子
c = queue[child = right];
//比较key和孩子c
if (key.compareTo(c) <= 0)
//key小于等于c,则结束循环
break;
//key大于孩子c,则key与孩子交换位置
queue[k] = c;
setIndex(c, k);
k = child;
}
queue[k] = key;
setIndex(key, k);
}

核心方法:compareTo比较器
延迟队列是也是个优先队列 每次元素出队和入队也都需要经过堆化调整顺序
根据任务到期时间从小到大排序 通过调用compareTo
方法完成大小判断
public int compareTo(Delayed other) {
if (other == this) // compare zero if same object
return 0;
if (other instanceof ScheduledFutureTask) {
ScheduledFutureTask<?> x = (ScheduledFutureTask<?>) other;
long diff = time - x.time;
if (diff < 0)
return -1;
else if (diff > 0)
return 1;
else if (sequenceNumber < x.sequenceNumber)
return -1;
else
return 1;
}
long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
}
ScheduledThreadPool
中的Leader-Follower
模式
定时调度线程池与一般线程池的一个重要不同:提交的任务是延迟而非立即执行的,因此worker线程调用队列的take以取出执行任务必定是要阻塞的。考虑到延迟任务必然有一个先后顺序 因此只需要一个线程使用timed waiting等待队首任务(堆顶)即可
ScheduledThreadPool
采用了Leader-Follower
模式,等待第一个线程的任务也称为leader,其调用available.awaitNanos待当前队列头部任务到达调度时间时唤醒。其余线程作为follower只需调用await方法无限阻塞等待,直至被leader唤醒,并重新完成抢锁->尝试执行队列首元素->抢leader->等待的循环。
举个例子来理解Leader-Follower
模式,拿饭店员工来对照理解。
- 单Reactor多线程模式 饭店的员工一般都是分角色的,比如接待员、服务员、厨师等等。
- 假如有一个叫做A的人,固定他作为饭店接待员,来客人了就分给客人一个座位号,然后交给其他服务员,比如B进行后续处理。
- B会根据座位号为客人引路,为客人点菜等等。
- 如果把A、B比作两个线程,客人比作任务,任务由A处理,到交接给B处理,有一次线程上下文切换。
- Leader-Follower模式 这次饭店不分角色了,每个人都是接待员和服务员,统称为员工。
- 每次只能有一个员工在门口等待,比如A先在门口等待,其他员工在屋里歇着。
- 来客人了的话,A会叫一个其他员工,比如B来门口接替自己。
- 然后A开始为客人服务,比如分配座位号,引路,点菜等全流程服务。
- 拿线程来说的话,就是接受任务,处理任务都是由线程A负责,没有线程上下文切换。
- DelayQueue的Leader-Follower模式 这次饭店也不分角色,都是员工,但是改变了经营策略,每个客人必须预约吃饭时间,预约采用APP预约。因为加入了延时,逻辑变得复杂了一些。
- 每次还是只能有一个员工在门口等待,比如A先在门口等待,A看了眼预约登记表,发现离预约最早到店的时间还有30分钟,A就什么都不干了,先休息30分钟。
- 其他员工还是先在屋里歇着,但是因为采用APP预约,客人约几点都有可能,如果此时有客人约的是10分钟后到店,因为A要30分钟后才能醒来干活,所以如果这位客人来了,门口就没有人接待了。
- 对于这个问题,饭店的软件系统在监听到最早到店时间变了的话,会再叫一个员工来门口等待,此员工可能是新员工B,也可能是叫醒了之前在门口休息的员工A。我们叫这位新员工X。
- 如果新员工X发现最早到店时间是现在,或者客人已经来了,就会叫一个员工C来门口接替自己,并立即开始为客人提供全流程服务。
- 如果新员工X发现最早到店时间是10分钟后,新员工X就像A之前一样,什么都不干了,先休息10分钟。
- 如果最早到店时间没有变化,还是30分钟后,软件系统不会叫人,其他员工看到A在门口等待,自己可以安心的在屋里歇着,等着A叫人替换他。
- 员工A在30分钟后醒来,客人也到了,A会叫一个同事比如B接替自己,而A为客人提供全流程服务。
问题解决
所以问题二到这里应该能解释了 DelayedWorkQueue
延迟队列只能有一个Leader线程 当Leader不为null 即有其他任务在等待执行的时候 下一个take线程无法从堆顶获取并启动任务 (这不废话吗 队首的任务都等着呢 后面的任务肯定没到时间啊)所以其实不是这个原因
问题二之所以隔天9点才能开启第二次任务是因为同一个任务执行完毕之后才会把自己重新放回队列 虽然算出来的执行时间是8点 但是9点才被放回任务队列 一放进去立马被取出来了判断一下delay时间 这个delay时间≤0 (小于0就属于上一个任务超时了导致下一个任务延迟了)都统一算作时间到了 立即执行
一个疑问:为什么不设计成支持多任务并列执行呢 答:其实是支持多任务并列执行的 只不过是不支持同一个任务同时并列执行而已
同时问题一也可以解释了 其实DelayWorkQueue是一个基于堆的优先队列 优化了多个定时任务的执行调度(避免无效监听等待和避免线程上下文切换)
但是如果你只有一个定时任务 直接while true sleep也是可以的
总结
- 任务执行方法位于
ScheduledFutureTask
的父类FutureTask.run()
该方法内部捕获异常且未打印异常信息,难以排查问题,同时周期性执行任务会因为任务代码抛异常而不再设置下次执行时间和把自己放回延迟队列的操作,即不会再周期性执行 因此任务代码一定要自行try-catch
做异常处理而不要直接抛出 -
ScheduledFutureTask
通过一个变量就区分了延迟和周期性执行,period=0
延迟执行,即只执行一次;period>0
固定频率周期执行;period<0
固定延迟时间周期执行,两次任务开始执行时间间隔受任务执行耗时影响。 - 如果周期性任务的执行时长大于
period
,且看重执行等间隔,建议使用scheduleWithFixedDelay()
。 - 若周期性任务的执行时长远小于
period
,则可以使用scheduleAtFixedRate()
。 - 如果周期性任务的执行时长大于
period
且使用的是scheduleAtFixedRate
则并不能实现固定频率执行 - 当线程池处于关闭状态(
shutdown
),周期性任务会被取消和阻止执行,非周期性任务会顺利执行完成不会被阻止。 - 同一个周期性定时任务出队执行完之后才会重新入队
-
DelayedWorkQueue
通过全局可重入锁来实现同步 -
DelayedWorkQueue
常用于定时任务 -
DelayedWorkQueue
内部使用优先级队列来存储 -
DelayedWorkQueue
添加元素满了之后会自动扩容原来容量的1/2,即永远不会阻塞,最大扩容可达Integer.MAX_VALUE
,所以线程池中至多有corePoolSize
个工作线程正在运行。。 -
DelayedWorkQueue
消费元素take,在堆顶元素为空和delay >0 时,阻塞等待。 -
DelayedWorkQueue
是一个生产永远不会阻塞,消费可以阻塞的生产者消费者模式。 -
DelayedWorkQueue
有一个leader线程的变量,是Leader-Follower
模式的变种。当一个take
线程变成leader
线程时,只需要等待下一次的延迟时间,不是leader
线程的其他take
线程则需要等leader
线程出队列了才唤醒其他take
线程 减少了不必要的定时等待。同时一个线程完成监听和任务执行 也避免了调度线程和任务执行线程的上下文切换成本
ScheduledThreadPoolExecutor与Timer的区别
JDK1.5开始提供ScheduledThreadPoolExecutor类,ScheduledThreadPoolExecutor类继承ThreadPoolExecutor类重用线程池实现了任务的周期性调度功能。在IDK1.5之前,实现任务的周期性调度主要使用的是Timer类和TimerTask类。
1.1线程角度
Timer是单线程模式,如果某个TimerTask任务的执行时间比较久,会影响到其他任务的调度执行。ScheduledThreadPoolExecutor是多线程模式,并且重用线程池,某个ScheduledFutureTask任务执行的时间比较久,不会影响到其他任务的调度执行。
1.2系统时间敏感度
Timer调度是基于操作系统的绝对时间的,对操作系统的时间敏感,一旦操作系统的时间改变,则Timer的调度不再精确。ScheduledThreadPoolExecutor调度是基于相对时间的,不受操作系统时间改变的影响。
1.3是否捕获异常
Timer不会捕获TimerTask抛出的异常,加上Timer又是单线程的。一旦某个调度任务出现异常则整个线程就会终止,其他需要调度的任务也不再执行。ScheduledThreadPoolExecutor基于线程池来实现调度功能,某个任务抛出异常后,其他任务仍能正常执行。
1.4任务是否具备优先级
Timer中执行的TimerTask任务整体上没有优先级的概念,只是按照系统的绝对时间来执行任务。ScheduledThreadPoolExecutor中执行的ScheduledFutureTask类实现了iavalang.Comparable接口和java.utilconcurrentDelayed接口,这也就说明了ScheduledFutureTask类中实现了两个非常重要的方法,一个是javalangComparable接口的compareTo方法,一个是java.util.concurrentDelayed接口的getDelay方法。在ScheduledFutureTask类中compareTo方法方法实现了任务的比较,距离下次执行的时间间隔短的任务会排在前面,也就是说,距离下次执行的时间间隔短的任务的优先级比较高。而getDelay方法则能够返回距离下次任务执行的时间间隔。
1.5是否支持对任务排序
Timer不支持对任务的排序。ScheduledThreadPoolExecutor类中定义了一个静态内部类DelayedWorkQueue,DelayedWorkQueue类本质上是一个有序队列,为需要调度的每个任务按照距离下次执行时间间隔的大小来排序
1.6能否获取返回的结果
Timer中执行的TimerTask类只是实现了iavaangRunnable接口,无法从TimerTask中获取返回的结果。ScheduledThreadPoolExecutor中执行的ScheduledFutureTask类继承了FutureTask类,能够通过遍Future来获取返回的结果。通过以上对ScheduledThreadPoolExecutor类和Timer类的对比,相信在JDK1.5之后,就没有使用Timer来实现定时任务调度的必要了。
网友评论