ScheduledThreadPoolExecutor定时线程池
ScheduledThreadPoolExecutor底层也是线程池,只是队列使用的是DelayedWorkQueue队列
并且最大线程数为整型最大数
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
先分析一下DelayedWorkQueue队列
DelayedWorkQueue队列底层使用的是堆数据结构,保证堆顶是最小的数,堆顶就是最先要执行任务,这里的任务为RunnableScheduledFuture,获取任务的时候,直接获取下角标为0的任务即可.
l 堆都是满二叉树.因为满二叉树会充分利用数组的内存空间;
l 最小堆是指父节点比左节点和右节点都小的结构,所以整个最小堆中,根节点是最小的节点;
l 最大堆是指父节点比左节点和右节点都大的结构,所以整个最大堆中,根节点是最大的节点;
l 最大堆和最小堆的左节点和右节点没有关系,只能判断父节点和左右两节点的大小关系;
基于堆的这些属性,堆适用于找到集合中的最大或者最小值;另外,堆结构记录任务及其索引的关系,便于插入数据或者删除数据后重新排序,所以堆适用于优先队列。可以了解一下这个网站
https://www.cs.usfca.edu/~galles/visualization/Heap.html
上面一行是数组,数据结构下标为:堆顶为0 ,左子节点为1,右节点为2,一次往下排
那每一个节点的父节点和左右子节点的怎么算呢
004举例,004的索引为1
父节点002的索引为 (i -1)/2 =0
左子节点008的索引为 2i +1 =3
右子节点 007的索引为 2i +2 =4
添加元素
image.png
添加003到堆的末尾,先左后右,进行比较,判断003和008大小,如果小则交换位置
image.png
再判断与004大小,小于交换位置
image.png
在判断与002大小,大于则位置不动,插入结束
删除元素
image.png
002从数组中移除,
image.png
堆尾008到堆顶,与子节点比较大小
image.png
先比较左节点,008大于003 交换,008再比较与004大小
image.png
删除元素结束
DelayedWorkQueue队列的源码
成员属性
//数组的初始大小16
private static final int INITIAL_CAPACITY = 16;
//锁
private final ReentrantLock lock = new ReentrantLock();
//数组
private RunnableScheduledFuture<?>[] queue = new RunnableScheduledFuture<?>[INITIAL_CAPACITY];
//队列大小
private int size = 0;
//执行线程,线程池中的线程
private Thread leader = null;
//条件等待队列,在阻塞队列中分析过
private final Condition available = lock.newCondition();
Offer添加元素
public boolean offer(Runnable x) {
if (x == null)
throw new NullPointerException();
//将任务强转为RunnableScheduledFuture
RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
final ReentrantLock lock = this.lock;
//添加的是使用锁,保证线程安全
lock.lock();
try {
int i = size;
//如果超过线程的最大数,则扩容,没有扩容因子,扩容1.5倍
if (i >= queue.length)
grow();
//数组加一
size = i + 1;
//如果数组为空,将任务直接放入队头
if (i == 0) {
queue[0] = e;
//设置索引,赋值heapIndex属性的值
setIndex(e, 0);
} else {
//如果数组不为空,根据上面分析的堆特性,将任务放入到队列
siftUp(i, e);
}
//数组为空的情况下,添加任务到队头,将leader赋值为null,并唤醒等待队列firstWaiter
线程
if (queue[0] == e) {
leader = null;
//唤醒等待队列的firstWaiter线程
available.signal();
}
} finally {
lock.unlock();
}
return true;
}
扩容
private void grow() {
int oldCapacity = queue.length;
int newCapacity = oldCapacity + (oldCapacity >> 1); // grow 50% 1.5倍扩容
if (newCapacity < 0) // overflow
newCapacity = Integer.MAX_VALUE;
queue = Arrays.copyOf(queue, newCapacity);
}
siftUp 插入元素,首先插入队尾,与父节点比较,小于则交换,直到索引值k<=0 跳出,因为索引为0则值第一个元素,负数则没有意义
private void siftUp(int k, RunnableScheduledFuture<?> key) {
while (k > 0) {
//获取父节点索引 >>>1 除2
int parent = (k - 1) >>> 1;
//获取父节点
RunnableScheduledFuture<?> e = queue[parent];
//比较大小,大于则退出,小于则交换
if (key.compareTo(e) >= 0)
break;
//如果小于交换
queue[k] = e;
setIndex(e, k);
k = parent;
}
queue[k] = key;
setIndex(key, k);
}
SiftDown方法,从上而下将元素插入
private void siftDown(int k, RunnableScheduledFuture<?> key) {
//一半是因为父节点和叶子节点比较的时候,是先获取叶子节点小的,在和父节点比较
int half = size >>> 1;
while (k < half) {
//获得左侧叶子几点
int child = (k << 1) + 1;
RunnableScheduledFuture<?> c = queue[child];
//获取右侧叶子节点
int right = child + 1;
//左右叶子节点比较大小,并将小的赋值给c
if (right < size && c.compareTo(queue[right]) > 0)
c = queue[child = right];
//key小于c的话,退出循环,因为是小堆,父节点要小于子节点
if (key.compareTo(c) <= 0)
break;
//否则交换
queue[k] = c;
setIndex(c, k);
k = child;
}
queue[k] = key;
setIndex(key, k);
}
下面看元素的移除,涉及到线程的等待和唤醒
Poll方法
public RunnableScheduledFuture<?> poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
//获取堆顶,就是最小的元素,因为线程要执行的就是延时时间最小的节点,
RunnableScheduledFuture<?> first = queue[0];
//获取队列为空,或者时间还没到,返回null
if (first == null || first.getDelay(NANOSECONDS) > 0)
return null;
else
//first不为空,并且延时时间已经到了,则获取队列的第一个任务
return finishPoll(first);
} finally {
lock.unlock();
}
}
上面分析过如何取元素,将队头元素取出之后,将队尾的元素放到队头,自上而下的排序
private RunnableScheduledFuture<?> finishPoll(RunnableScheduledFuture<?> f) {
//队尾的元素索引,并将队列的长度减一
int s = --size;
//获取队尾任务
RunnableScheduledFuture<?> x = queue[s];
queue[s] = null;
//放到对头 自上而下排序
if (s != 0)
siftDown(0, x);
setIndex(f, -1);
//返回对头任务
return f;
}
take方法
public RunnableScheduledFuture<?> take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
//获取队头
RunnableScheduledFuture<?> first = queue[0];
if (first == null)
//如果null则将当先线程添加入等待队列
available.await();
else {
//如果不为null,则获取任务距离现在的时间差,单位为纳秒
long delay = first.getDelay(NANOSECONDS);
//如果<0则返回队头的任务
if (delay <= 0)
return finishPoll(first);
first = null; // don't retain ref while waiting
//leader是执行任务的线程,如果不为空,则leader执行任务,并将当前线程存放到等待队列
if (leader != null)
available.await();
else {
//如果为空,将当前线程赋值给leader
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
//让leader等待delay时间,等待任务的执行,当时间到了之后,继续执行for循环
available.awaitNanos(delay);
} finally {
//如果leader为当前线程,当前线程执行完任务后,将leader赋值为null
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
// 这段代码一定会执行,如果经过上面的判断,如果leader如果还为null 则唤醒条件等待队列的线程
if (leader == null && queue[0] != null)
available.signal();
lock.unlock();
}
}
image.png
第二次循环 delay<0 比较理想的情况,如果1093行代码, available.awaitNanos(delay);在等待的时候,可能被唤醒,或者中断,那第二次for循环delay可能还大于0
poll(long timeout, TimeUnit unit)方法就不分析了,和take方法基本一致,就是在线程的过期时间和任务距离现在的等待时间的一些判断
DelayedWorkQueue队列就介绍到这
ScheduledThreadPoolExecutor有4个主要的方法,
public ScheduledFuture<?> schedule(Runnable command,long delay,TimeUnit unit)
public <V> ScheduledFuture<V> schedule(Callable<V> callable,long delay,TimeUnit unit)
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,long initialDelay, long period,TimeUnit unit)
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit)
DelayedWorkQueue队列中只存放ScheduledFutureTask
ScheduledFutureTask(Runnable r, V result, long ns, long period) {
super(r, result);
//等待时间
this.time = ns;
//周期时间
this.period = period;
//在堆数据结构在排序的时候,也就是执行compareTo 比较等待时间大小的时候,如果时间等,
//就按照sequenceNumber的大小排序
this.sequenceNumber = sequencer.getAndIncrement();
}
Schedule方法的两个形参一个是Runnable 一个是Callable都是一样的,只是Callable有返回参数,
schedule方法
public <V> ScheduledFuture<V> schedule(Callable<V> callable,
long delay,
TimeUnit unit) {
if (callable == null || unit == null)
throw new NullPointerException();
//只返回了new ScheduledFutureTask<V>(callable,triggerTime(delay, unit))
RunnableScheduledFuture<V> t = decorateTask(callable,
new ScheduledFutureTask<V>(callable,
triggerTime(delay, unit)));
//执行任务
delayedExecute(t);
return t;
}
delayedExecute方法
private void delayedExecute(RunnableScheduledFuture<?> task) {
if (isShutdown())
reject(task);
else {
//添加任务到队列
super.getQueue().add(task);
if (isShutdown() &&
!canRunInCurrentRunState(task.isPeriodic()) &&
remove(task))
task.cancel(false);
else
//执行任务
ensurePrestart();
}
}
ensurePrestart主要就是调用了addWorker方法
void ensurePrestart() {
int wc = workerCountOf(ctl.get());
if (wc < corePoolSize)
addWorker(null, true);
else if (wc == 0)
addWorker(null, false);
}
在线程池章节中分析过主要就是ThreadPoolExecutor的getTask()方法的,从队列中获取任务就是上面分析过的take方法和poll(long timeout, TimeUnit unit)获取任务,返回null则线程执行结束,返回任务则执行任务.
scheduleAtFixedRate和scheduleWithFixedDelay的两个方法的区别
scheduleAtFixedRate如果周期是2秒,但是执行任务是3秒,那两个任务的执行时间为3秒,
scheduleWithFixedDelay如果周期是2秒,但是执行任务是3秒,那两个任务的执行时间为5秒
在调用上面两个方法的时候,都new ScheduledFutureTask,区别在最后一个参数
scheduleAtFixedRate为unit.toNanos(period)
scheduleWithFixedDelay为unit.toNanos(-delay)
当执行到ScheduledFutureTask重写的run方法时候,调用了下面的方法
private void setNextRunTime() {
long p = period;
if (p > 0)
time += p;
else
time = triggerTime(-p);
}
time为下一次任务执行的时间, 在take的first.getDelay(NANOSECONDS);中time-当前时间
可以看到如果p>0,time+=p, first.getDelay(NANOSECONDS);得到的时间不一定为周期时间
举一个例子 这里就不用时间戳了正常time为时间戳
00:00:00秒开始执行 延时0秒(立即执行) 任务执行需要3秒 周期2秒
time 00:00:00
p为2
第一次执行getDelayde 因为延时0秒,返回0,执行任务,但是需要3秒, 00:00:03秒执行结束,但是周期为两秒,下一次执行时间为 00:00:02,此时的时间已经00:00:03,所以getDelayde为 -1 小于0 立即执行,00:00:06执行结束,周期两秒,因为累加所以下一次执行时间为00:00:04
getDelayde就为-2了,如果任务执行的时间一直为3秒,那time和当前时间差的就越来越大
如果p<0的话,用now+p,保证first.getDelay(NANOSECONDS);的结果为周期时间
通过上面的例子,可以推出, p<0的时候,不管任务执行多久,两次的任务间隔都为2秒
ScheduledFutureTask重写的run方法
public void run() {
boolean periodic = isPeriodic();
if (!canRunInCurrentRunState(periodic))
cancel(false);
//如果不是周期任务
else if (!periodic)
ScheduledFutureTask.super.run();
//如果是周期任务
else if (ScheduledFutureTask.super.runAndReset()) {
//下一个任务执行的时间,重新计算
setNextRunTime();
//将任务重新执行, outerTask为当前的任务, 调用scheduleAtFixedRate和scheduleWithFixedDelay方法的时候, 调用的这段代码sft.outerTask = t;保证了任务的周期进行
reExecutePeriodic(outerTask);
}
}
}
定时任务线程池,分析结束.多线程没有办法分析出所有的情况,我也只是分析出大体的流程,做个笔记,方便记忆,也希望能给大家一些帮助.
网友评论