ScheduledThreadPoolExecutor
该线程池继承于ThreadPoolExecutor并且实现了ScheduledExecutorService,用于计划任务执行,如定时周期任务,延迟执行任务等存有计划的任务。
此类的几个特性
- 不管是不是计划任务都会当做计划任务封装为ScheduledFutureTask对象,如果不是计划任务延迟则为0.
- 核心线程数和最大线程数的限制并没有发生改变和ThreadPoolExecutor一样,改变处是workQueue的实现,采用了内部类DelayedWorkQueue,此队列是针对延迟任务的专用队列,从而没有了ThreadPoolExecutor对队列的限制,在ThreadPoolExecutor中如果任务数超过了队列大小那么会拒绝执行,而DelayedWorkQueue的实现会无限制添加具体做法在讲述时详细介绍。
- 重写了OnShutdown方法和任务删除方法
- 添加了decorateTask方法用于监控添加的任务信息。
public class ScheduledThreadPoolExecutor
extends ThreadPoolExecutor
implements ScheduledExecutorService {
//是否在调用shutdown方法的时候继续运行周期任务
//true即使是调用了shutdown 也会继续运行周期任务
//false或者shutdownNow方法都不会继续运行周期任务
private volatile boolean continueExistingPeriodicTasksAfterShutdown;
//上方是周期任务此处是非周期任务即延迟任务是否在shutdown后继续执行默认是true
//true即使shutdown也会继续执行
//为false或者shutdownNow则会在关闭线程池的时候会取消非延迟任务的执行
private volatile boolean executeExistingDelayedTasksAfterShutdown = true;
//ScheduledFutureTask 任务在取消的时候是否从队列中删除,false则不删除true则删除
private volatile boolean removeOnCancel = false;
//用于确保相同的延迟时间任务间的先后顺序,当做id即可
private static final AtomicLong sequencer = new AtomicLong();
//获取当前纳秒级别时间
final long now() {
return System.nanoTime();
}
//计划任务专用的Future任务
//在netty系列文章里有详细介绍过FutureTask所以这里不做更多的介绍
private class ScheduledFutureTask<V>
extends FutureTask<V> implements RunnableScheduledFuture<V> {
//每个任务的为一个系列号根据上方的sequencer获取得来
private final long sequenceNumber;
//任务最终执行的时间,类型为纳秒
private long time;
//任务的执行周期三个值 正数、负数、0
//这三个值的代表请参照1、看!源码之netty线程池结构文章中对schedule、scheduleAtFixedRate、scheduleWithFixedDelay三个方法的讲解
private final long period;
//用于周期任务再次入队列,因为入队列操作是在线程池中所以在任务中是不能入队列的只能调用外部方法this是不可用的所以使用一个变量将this传出供其使用。
RunnableScheduledFuture<V> outerTask = this;
//在堆数据中的下标位置方便查找删除等操作,具体后面讲解队列时会详细讲述
int heapIndex;
//创建延迟任务构造 传入一个Runnable 之前介绍是说过Future是需要结果的所以传入result,ns 则延迟执行的时间,此构造创建出的任务是非周期任务,他不会周期循环执行,可以看到他的周期时间设置是0
ScheduledFutureTask(Runnable r, V result, long ns) {
super(r, result);
this.time = ns;
this.period = 0;
this.sequenceNumber = sequencer.getAndIncrement();
}
//创建周期任务 和构造一的差距是 支持传入周期时间单位是纳秒,
//上方有一点没有讲解就是对他的序列号,可以看到构造里都有给序列号赋值的操作,此操作用于任务间的执行顺序选择。
ScheduledFutureTask(Runnable r, V result, long ns, long period) {
super(r, result);
this.time = ns;
this.period = period;
this.sequenceNumber = sequencer.getAndIncrement();
}
//创建存有返回结果的future 不用传入Runnable而是Callable 此类在netty一文中有讲解不明白的读者可以去看看。
//这三个构造都调用了父类的构造器,父类的讲解将会在下方讲解
ScheduledFutureTask(Callable<V> callable, long ns) {
super(callable);
this.time = ns;
this.period = 0;
this.sequenceNumber = sequencer.getAndIncrement();
}
//获取当前时间到执行时间的时长
public long getDelay(TimeUnit unit) {
return unit.convert(time - now(), NANOSECONDS);
}
//比较方法,用于比较两个任务谁优先执行
public int compareTo(Delayed other) {
//优化如果传入比较的任务和当前任务是同一个则执行顺序不变
if (other == this) // compare zero if same object
return 0;
//如果传入的任务类型是当前的类型则进入
if (other instanceof ScheduledFutureTask) {
//将传入的类型转换为当前类型
ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
//获取两个执行时间的差值
long diff = time - x.time;
//如果小于0 则代表当前的执行时间比较靠前返回-1
//比如 2019-02-17 16:00:00 自然小于 2019-02-17 17:00:00
//只不过这里将时间转换为了纳秒
if (diff < 0)
return -1;
//大于0 则代表当前任务的时间需要比传入的任务后执行
else if (diff > 0)
return 1;
//到这里说明diff是等于0的则代表他俩的执行时间是一样的那么就判断序列号的大小如果当前的序列号小于传入的任务序列号则代表当前的任务先添加所以优先执行传入-1 否则 传入的任务是先添加的所以优先执行返回1
else if (sequenceNumber < x.sequenceNumber)
return -1;
else
return 1;
}
//如果传入的类型不是当前的类型那么获取他俩的距离下次执行的时差然后做减法
long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
//如果小于0 则代表当前的时间比传入任务的先执行返回-1
//如果传入的大于0 则代表传入的任务先执行,但是除了大于0 还有一种情况那就是等于0如果等于0 则返回0 执行顺序随便。
return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
}
//如果当前的任务是周期任务则返回true 如果不是周期任务则返回false
public boolean isPeriodic() {
return period != 0;
}
//设置下次的执行时间
private void setNextRunTime() {
//获取周期
long p = period;
//如果周期大于0 则代表根据本次执行时间开始添加周期,如果添加的结果大于当前时间now则会立即执行
//这么做是因为如果p是正数则代表忽略执行任务的时长
if (p > 0)
time += p;
else
//如果为负数则恰恰相反,不忽略执行时长,将从本次执行结束的时间加上周期。
time = triggerTime(-p);
}
//当前任务取消传入的参数代表如果当前任务正在运行是否进行线程中断,true则为取消当前任务并且中断运行,false则相反
public boolean cancel(boolean mayInterruptIfRunning) {
//这里调用了父类的cancel方法
boolean cancelled = super.cancel(mayInterruptIfRunning);
//如果取消成功,并且当前任务是取消并且删除,并且当前的任务还在队列中,之前再说heapIndex时有说过,这个字段是在队列中的下标如果为-1则代表当前任务已经出队列了。
if (cancelled && removeOnCancel && heapIndex >= 0)
//上面条件都满足则进行队列删除
remove(this);
//返回取消结果
return cancelled;
}
//重写了父类的方法,因为是周期任务所以需要重新入队列并且计算下次执行时间,从而需要重新父类的run方法
public void run() {
//获取当前是否是周期任务
boolean periodic = isPeriodic();
//获取当前运行状态看是否满足取消任务
//下面将此方法的时候详细介绍
//理解为当前任务是否因为线程池shutdown而取消
if (!canRunInCurrentRunState(periodic))
cancel(false);
//如果是立即立即执行的任务则调用父级的run方法
else if (!periodic)
ScheduledFutureTask.super.run();
//如果是周期任务则调用父级的runAndReset方法具体方法的实现在讲解父类的时候会进行详细说明。
else if (ScheduledFutureTask.super.runAndReset()) {
//执行成功则设置下一次执行时间
setNextRunTime();
//并且将周期任务再次添加到队列中
//之前在介绍outerTask的有说明 此变量就是this的一个别名
reExecutePeriodic(outerTask);
}
}
}
//查看当前线程池当前的执行状态
boolean canRunInCurrentRunState(boolean periodic) {
//这里判断了是否为周期任务如果是周期任务则传入continueExistingPeriodicTasksAfterShutdown字段,此字段是用来判断如果线程池shutdown是否继续运行周期任务,true则继续,false则取消
//如果不是周期任务则会传入executeExistingDelayedTasksAfterShutdown字段,此字段是用来半段线程池是否在shutdown后是否继续执行延迟任务,true继续执行,false则是取消
return isRunningOrShutdown(periodic ?
continueExistingPeriodicTasksAfterShutdown :
executeExistingDelayedTasksAfterShutdown);
}
//此类的核心方法,任务添加都会调用此方法
private void delayedExecute(RunnableScheduledFuture<?> task) {
//当前线程池是否为shutdown状态,如果是则拒绝任务
if (isShutdown())
reject(task);
else {
//否则添加到队列中
super.getQueue().add(task);
//添加完后再次判断是否为shutdown状态,如果是则判断当前线程池是否设置了shutdown后继续运行的操作如果没有则删除任务并且将任务设置为取消状态
if (isShutdown() &&
!canRunInCurrentRunState(task.isPeriodic()) &&
remove(task))
task.cancel(false);
else
//否则启动运行,之所以最后再启动运行是因为我们不能保任务达到了可执行的状态,所以先添加到队列中为了确保运行从而启动了线程去运行判断队列消息。具体的讲到此方法后详细讲述
ensurePrestart();
}
}
//周期任务在执行结束后会从下加入队列,而此处则是用来处理重新执行的队列任务
void reExecutePeriodic(RunnableScheduledFuture<?> task) {
//判断当前的线程池是否shutdown状态并且是否运行继续运行,因为执行到这里的时候基本都是周期任务所以直接传入了true
if (canRunInCurrentRunState(true)) {
//如果当前线程池没有被shutdown或者说shutdown但是设置的即使是shutdown后也继续运行那么就会进去到此处,将计算好下次执行时间的任务添加到任务队列中
super.getQueue().add(task);
//如果添加完后发现被shutdown并且shutdown不允许继续运行则删除,删除成功后进行任务的取消
if (!canRunInCurrentRunState(true) && remove(task))
task.cancel(false);
else
//上方已经做了解释,此处不再多说
ensurePrestart();
}
}
//如果当前线程池被shutdown了则会调用此方法进行处理队列中的任务
//进行队列中任务清除工作
@Override void onShutdown() {
//获取线程池的执行队列
BlockingQueue<Runnable> q = super.getQueue();
//获取任务类型是延迟任务是否在shutdow后继续运行
boolean keepDelayed =
getExecuteExistingDelayedTasksAfterShutdownPolicy();
//获取任务类型是周期任务是否在shutdow后继续运行
boolean keepPeriodic =
getContinueExistingPeriodicTasksAfterShutdownPolicy();
//如果都是false 则代表不管是什么类型任务都进行取消
if (!keepDelayed && !keepPeriodic) {
//遍历队列任务如果是RunnableScheduledFuture则进行取消
for (Object e : q.toArray())
if (e instanceof RunnableScheduledFuture<?>)
((RunnableScheduledFuture<?>) e).cancel(false);
//取消完成后clear进行清除队列中的所有任务
q.clear();
}
else {
//如果并不是所有任务都取消则进入此处首先遍历队列
for (Object e : q.toArray()) {
//如果是RunnableScheduledFuture任务则进行cancel和remove操作否则不做处理
if (e instanceof RunnableScheduledFuture) {
//将任务类型转换
RunnableScheduledFuture<?> t =
(RunnableScheduledFuture<?>)e;
//根据任务的是否是当前周期任务来判断使用谁来控制
//如果是周期任务则使用keepPeriodic来设置
//否则使用keepDelayed来控制是否删除并且取消
//还有个条件isCancelled用来判断当前的任务是否已经被取消,去过已经取消也进行删除操作
if ((t.isPeriodic() ? !keepPeriodic : !keepDelayed) ||
t.isCancelled()) {
if (q.remove(t))
t.cancel(false);
}
}
}
}
//尝试终止线程池
tryTerminate();
}
//此方法是执行任务的装饰方法,在创建任务的时候不会直接创建而是通过此装饰方法进行创建,如果有特殊需求需要在添加任务前进行处理则重写此类即可,不用去重写schedule等方法,默认实现就是返回当前传入的任务
protected <V> RunnableScheduledFuture<V> decorateTask(
Runnable runnable, RunnableScheduledFuture<V> task) {
return task;
}
//上方方法的重构,Runnable改成了Callable
protected <V> RunnableScheduledFuture<V> decorateTask(
Callable<V> callable, RunnableScheduledFuture<V> task) {
return task;
}
//线程池构造器,传入核心线程数
public ScheduledThreadPoolExecutor(int corePoolSize) {
//此处调用了父级的构造传入的核心线程数是传入的值
//传入的最大线程池数则是MAX_VALUE
//传入的线程存货时间是0,单位是纳秒
//队列是内部类DelayedWorkQueue 延迟任务队列
//这几个参数不了解的可以去参照第一篇文章ThreadPoolExecutor的源码分析
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
//相比上方多了个线程工厂,用于创建线程
public ScheduledThreadPoolExecutor(int corePoolSize,
ThreadFactory threadFactory) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue(), threadFactory);
}
//相比第一个构造器多了个设置拒绝处理,之前有说过如果shutdown后再添加任务会做拒绝处理,此处构造则是支持用户自定义拒绝处理方式
public ScheduledThreadPoolExecutor(int corePoolSize,
RejectedExecutionHandler handler) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue(), handler);
}
//前两个构造器的合体
public ScheduledThreadPoolExecutor(int corePoolSize,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue(), threadFactory, handler);
}
//计算下次执行时间
private long triggerTime(long delay, TimeUnit unit) {
//将传入周期计算为纳秒传入重构方法中
return triggerTime(unit.toNanos((delay < 0) ? 0 : delay));
}
//上方方法的重构
long triggerTime(long delay) {
//当前时间加上计算出的周期纳秒就是下次执行时间的纳秒
//这里做了判断此判断是以防long溢出,如果当前延迟时间小于Long的最大值的一半则返回当前延迟,不会造成溢出,相反则做溢出处理
return now() +
((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
}
//此方法用于避免long溢出
private long overflowFree(long delay) {
//首先获取队列中的下一个弹出元素 peek 仅仅是查看并不会弹出队列
Delayed head = (Delayed) super.getQueue().peek();
//如果等于null则直接返回当前延迟
if (head != null) {
//否则获取当前的任务的延迟时间,执行时间减去当前时间
long headDelay = head.getDelay(NANOSECONDS);
//如果任务的延迟时间小于0 则代表此任务已经达到的执行任务的条件
//第一个任务已经达到了执行的条件但是还没有执行,说明调度有延迟导致任务执行的时候并没有准时完成
//第二个条件代表如果当前设置的延迟时间加上因为调度的延迟时间小于0 则代表着 long类型溢出了,这里的加上是根据第一条件满足小于0 来的,因为小于所以负负得正所以是加上
if (headDelay < 0 && (delay - headDelay < 0))
//进入到这里说明产生了溢出所以重新进行定义使用long的最大值减去因为调度延迟的时间作为他的周期时长。
delay = Long.MAX_VALUE + headDelay;
}
//不管周期设置是否溢出需要注意的是调用方,加上了当前的纳秒时间则代表绝对是一个负数因为发生了溢出,但是在执行时候确实正数因为,在执行的时候获取延迟时是对当前时间做了减法再次出现溢出从而会通过溢出来进行时间转换。
return delay;
}
//添加延迟任务
public ScheduledFuture<?> schedule(Runnable command,
long delay,
TimeUnit unit) {
//简单空指针校验
if (command == null || unit == null)
throw new NullPointerException();
//这里看到他调用的是前面讲述的包装方法来创建任务
RunnableScheduledFuture<?> t = decorateTask(command,
new ScheduledFutureTask<Void>(command, null,
triggerTime(delay, unit)));
//然后使用之前讲的延迟执行进行添加任务到执行队列中
delayedExecute(t);
return t;
}
//与上方方法的差异则是将Runnable改成了Callable进行了调用
public <V> ScheduledFuture<V> schedule(Callable<V> callable,
long delay,
TimeUnit unit) {
if (callable == null || unit == null)
throw new NullPointerException();
RunnableScheduledFuture<V> t = decorateTask(callable,
new ScheduledFutureTask<V>(callable,
triggerTime(delay, unit)));
delayedExecute(t);
return t;
}
//添加周期任务
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit) {
//校验传入的数据信息
if (command == null || unit == null)
throw new NullPointerException();
if (period <= 0)
throw new IllegalArgumentException();
//创建周期任务,比上方多了两个参数,一个是第一次执行时的延迟initialDelay,第二个是周期时间,可以看出在上面的执行时间是当前时间加上延迟时间,而这里是初始化延迟时间加当前时间,并且给了一个周期的字段。
ScheduledFutureTask<Void> sft =
new ScheduledFutureTask<Void>(command,
null,
triggerTime(initialDelay, unit),
unit.toNanos(period));
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
//这里多了赋值outerTask属性一步,此步是为了如果用户重写了outerTask方法对其发生改变则此处为了保证一致性所以做了赋值操作。
sft.outerTask = t;
delayedExecute(t);
return t;
}
//和上方方法唯一区别处就获取当前周期的时候是使用的负数,具体原因上方已经进行讲解了
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
long initialDelay,
long delay,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
if (delay <= 0)
throw new IllegalArgumentException();
ScheduledFutureTask<Void> sft =
new ScheduledFutureTask<Void>(command,
null,
triggerTime(initialDelay, unit),
unit.toNanos(-delay));
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
sft.outerTask = t;
delayedExecute(t);
return t;
}
//立即执行任务,可以看出他调用了延迟方法,传入的延迟时间是0 纳秒
public void execute(Runnable command) {
schedule(command, 0, NANOSECONDS);
}
//与上方方法相同只不过此方法支持返回Future
public Future<?> submit(Runnable task) {
return schedule(task, 0, NANOSECONDS);
}
//此方法将Runnable 和result转成一个Callable
public <T> Future<T> submit(Runnable task, T result) {
return schedule(Executors.callable(task, result), 0, NANOSECONDS);
}
//传参是Callable
public <T> Future<T> submit(Callable<T> task) {
return schedule(task, 0, NANOSECONDS);
}
//设置如果线程池关闭后周期任务的执行条件是继续执行还是取消任务
public void setContinueExistingPeriodicTasksAfterShutdownPolicy(boolean value) {
//给之前讲述的字段赋值,并且判断如果是false则代表立即停止任务,并且判断当前线程池状态是否已经是shutdown状态,如果是则执行onShutdown方法。
continueExistingPeriodicTasksAfterShutdown = value;
if (!value && isShutdown())
onShutdown();
}
//上面方法是set 此方法是get
public boolean getContinueExistingPeriodicTasksAfterShutdownPolicy() {
return continueExistingPeriodicTasksAfterShutdown;
}
//与上方set相同只不过此处是设置任务类型为延迟任务的设置字段
public void setExecuteExistingDelayedTasksAfterShutdownPolicy(boolean value) {
executeExistingDelayedTasksAfterShutdown = value;
if (!value && isShutdown())
onShutdown();
}
//上方是set 此处是get
public boolean getExecuteExistingDelayedTasksAfterShutdownPolicy() {
return executeExistingDelayedTasksAfterShutdown;
}
//设置取消任务的时候是否从队列中移除
public void setRemoveOnCancelPolicy(boolean value) {
removeOnCancel = value;
}
//获取取消是否移除的设置
public boolean getRemoveOnCancelPolicy() {
return removeOnCancel;
}
//关闭线程池,并没有自己的逻辑直接使用的父级方法
public void shutdown() {
super.shutdown();
}
//立即关闭,采用了父级的方法
public List<Runnable> shutdownNow() {
return super.shutdownNow();
}
//获取当前任务的队列
public BlockingQueue<Runnable> getQueue() {
return super.getQueue();
}
//延迟线程池的核心,也是他专属延迟队列
static class DelayedWorkQueue extends AbstractQueue<Runnable>
implements BlockingQueue<Runnable> {
//初始化队列大小
private static final int INITIAL_CAPACITY = 16;
//可以看出队列就是一个数组,并且创了初始队列大小为上方设置的
private RunnableScheduledFuture<?>[] queue =
new RunnableScheduledFuture<?>[INITIAL_CAPACITY];
//创建了队列锁,添加修改删除等对队列的操作都有可能出现多线程并发问题所以此处创建了内部锁
private final ReentrantLock lock = new ReentrantLock();
//这里需要理解两个概念队列长度和队列任务的个数
//这个size代表队列任务的个数,而长度则是queue.length
private int size = 0;
//管理线程,既然是线程池执行操作那么此队列的操作会是多线程的所以此字段是为了记录进入当前线程池操作的线程。
private Thread leader = null;
//用于等待和唤醒,更深入的讲解需要单独文章,暂时只理解为当前线程的等待和唤醒
private final Condition available = lock.newCondition();
//设置当任务的在队列中的下标索引
private void setIndex(RunnableScheduledFuture<?> f, int idx) {
if (f instanceof ScheduledFutureTask)
((ScheduledFutureTask)f).heapIndex = idx;
}
//在讲解下方法方法前需要了解堆化,堆化的意思就是讲数组转化为堆类型,堆是一个完全二叉树,而下面的两个方法就是对数组的堆化的操作,并且是从小到大的操作。
//k 指定一个向上排序的坐标位置
//key 需要添加到队列中的任务
private void siftUp(int k, RunnableScheduledFuture<?> key) {
//只有k大于0 才代表这没有超过根节点,因为根节点的下标就是0。
while (k > 0) {
//前面讲解了堆是一个完全二叉树那么他不会出现空节点的情况,除了第一个节点其他节点都是有头部的。
//这里第一次进来根据任务的个数减一获取到最后一个任务的下表,并且无符号右移代表着除以二的操作,获取到他的父级下标,这里需要了解两个公式,之前说了是二叉树则代表一个节点会有两个子节点左节点和右节点,而他俩的公式是:左节点=父节点下标*2+1,右节点=父节点下标*2+2。
//知道了这两个公式现在可以理解下面的算法不管是左节点还是右节点只要除以2都会获得父级节点下标
//而此处就是获取队列中最后一个节点的父节点
int parent = (k - 1) >>> 1;
//然后获取父级元素
RunnableScheduledFuture<?> e = queue[parent];
//这里进行了比较,看当前添加的任务先执行还是父级任务先执行,如果是大于等于0 则代表当前父级先执行,则进行break退出循环
if (key.compareTo(e) >= 0)
break;
//如果是当前任务先执行,则将父级任务放到最后一位,这里有一点绕因为k是原先数组中任务的个数但是进入这个方法的都是需要添加任务的,从而这个k就成了添加任务的默认下标,因为我们在获取数组中最后一位数组下标的时候都是length-1是因为数组下标是从0开始计算的。
//所以这里的既可以当做原先数组的任务个数也可以当做当前任务的默认下标,之所以是默认是因为这里做了和父级节点的比较,如果当前添加的新任务先执行与父级任务那么父级任务会被放到数组的最后一位,而当前新任务默认节点会改成为父级任务的节点,因为这是个循环,他会无限制的与父级节点进行比较。
queue[k] = e;
//重新设定获取到的父节点的内部数组下标
setIndex(e, k);
//这里将重写修改新任务的默认下标位置。
//之所以说是默认是因为这是一个循环他会再次执行这一段的逻辑
k = parent;
}
//如果比较成功则最终设置当前任务在队列中的下标位置
queue[k] = key;
//并且设置当前任务的下标为任务内部的属性值
setIndex(key, k);
//到这里对于任务的添加完成了此处的时间复杂度空间为O(logn)
//因为在寻找父级节点的时候是对当前下标的除以二操作,所以他的递进也是存在倍数2的从而他的复杂度是logn。
}
//上方是对数据的从下到上的排序,此方法是从上到下的排序
//k是指定向下排序的坐标位置
//key 进行重拍的任务,一般是使用数组中最后的一个任务
private void siftDown(int k, RunnableScheduledFuture<?> key) {
//获取数组中的中间位置
int half = size >>> 1;
//只有当前指定的下标小于了中间点才代表这此节点有下级
//这里可以假设有6个元素,中间数是3 而需要排序的数也是3,通过画图可以看出3是没有子节点的所以不能进入此循环
while (k < half) {
//之前讲述的左节点的下标计算,就是父级节点*2+1而此处的child就是左节点下标
int child = (k << 1) + 1;
//获取到左节点任务
RunnableScheduledFuture<?> c = queue[child];
//左节点+1就是父节点*2+2,从而获取到了右节点的下标
int right = child + 1;
//判断右节点是否是最后size因为下标是从0开始所以size是没有值的,从而不会进行下面的操作
//如果右节点是小于size则代表这个节点是存在的,然后使用左节点和右节点进行比较,如果右节点先执行则进入if
if (right < size && c.compareTo(queue[right]) > 0)
//此if是将左节点的任务改成右节点的任务,因为需要和key进竞争父级的位置,所以需要最小的那个值,因为是从小到大排序
c = queue[child = right];
//获取到k坐标下的两个节点中最小的那个从而和key进行比较
//如果key小于或者等于0则跳出循环
if (key.compareTo(c) <= 0)
break;
//否则将父级的位置设置为c并且设置index下标
queue[k] = c;
setIndex(c, k);
//然后空出来的位置则作为父级节点k位置继续使用key和k的两个子节点进行竞争
k = child;
}
//直到key竞争成功则将k位置给key
queue[k] = key;
setIndex(key, k);
}
//之前讲过这里的队列使用的试数组,数组是固定大小但是之前有说过,队列是可以自动扩展的而这个方法就是对其进行扩展的操作。
private void grow() {
//获取到当前数组的长度
int oldCapacity = queue.length;
//使用当前长度加上他的百分之五十,可以看出他的增长率是50%。
int newCapacity = oldCapacity + (oldCapacity >> 1);
//如果新的长度小于0 则代表长度溢出了从设置为int的最大值
if (newCapacity < 0) // overflow
newCapacity = Integer.MAX_VALUE;
//此处是对队列拷贝工作将原先的队列任务复制到新的数组空间中
queue = Arrays.copyOf(queue, newCapacity);
}
//查找任务下标
private int indexOf(Object x) {
//判断传入的任务是否为null避免空指针
if (x != null) {
//判断是否为ScheduledFutureTask类型,因为此类型之前说过有个heapIndex字段用于记录他在队列中的位置从而减少了循环的时间复杂度,这个字段的优化点在这个时候体现了出来。
if (x instanceof ScheduledFutureTask) {
//获取当前任务的下标
int i = ((ScheduledFutureTask) x).heapIndex;
//下标大于0代表任务还在队列中,小于size也是为了确保在队列中,再次判断获取的下标中的任务和传入的任务是一个对象,从而返回当前获取i下标
if (i >= 0 && i < size && queue[i] == x)
return i;
} else {
//如果不是ScheduledFutureTask则进行笨办法进行了循环比较校验,比较成功则返回获取的下标
for (int i = 0; i < size; i++)
if (x.equals(queue[i]))
return i;
}
}
//如果查询不到则返回-1
return -1;
}
//判断传入的任务是否在队列中
public boolean contains(Object x) {
//获取当前队列的锁
final ReentrantLock lock = this.lock;
//并且将上锁,是为了在当前这一刻获取的数据,以防在多线程情况下对队列发生了修改从而获取到错误结果。当然也有可能在获取到结果后立马出现了出队列从而和会出现问题。所以在使用的时候需要注意这一点
lock.lock();
try {
//就是调用的indexOf方法进行判断不等于-1则代表任务找到。
return indexOf(x) != -1;
} finally {
//进行解锁,以防死锁
lock.unlock();
}
}
//删除任务
public boolean remove(Object x) {
//获取锁并且上锁防止再删除过程中对任务再次发生改变
final ReentrantLock lock = this.lock;
lock.lock();
try {
//先获取到删除任务的下标
int i = indexOf(x);
//没有找到则返回false删除失败
if (i < 0)
return false;
//找到则先讲其设置为-1
setIndex(queue[i], -1);
//将队列任务个数进行减减操作
int s = --size;
//获取到最后一个任务的下标,并且通过下标获取对象
RunnableScheduledFuture<?> replacement = queue[s];
//设置最后一位对象为null,既然要删除一个任务那么自然会有空的节点,所以需要将空的节点进行填值,那么最后一位自然需要设置为null
queue[s] = null;
//判断删除的数据是否是最后一位,如果不是最后一位那么进入if
if (s != i) {
//首先进行从下到上的排序。
siftDown(i, replacement);
//如果发现i坐标的两个子节点都是大于最后一个节点的那么代表着他的位置没有发生改变就是i所以此处做了处理如果发生位置没有改变,则向上进行比较排序。
if (queue[i] == replacement)
siftUp(i, replacement);
}
//到这里的都是true
return true;
} finally {
//释放锁
lock.unlock();
}
}
//获取当前队列的任务个数,为了保证准确性所以加了锁
public int size() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return size;
} finally {
lock.unlock();
}
}
//判断当前队列是否为空
public boolean isEmpty() {
return size() == 0;
}
//获取队列的最大数,之前在将grow的时候看到过他的最大值就是MAX_VALUE
public int remainingCapacity() {
return Integer.MAX_VALUE;
}
//查看即将执行的队列任务,很简答就是获取数组的0下标
public RunnableScheduledFuture<?> peek() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return queue[0];
} finally {
lock.unlock();
}
}
//添加队列任务
public boolean offer(Runnable x) {
//做了检测防止空指针
if (x == null)
throw new NullPointerException();
//将Runable转为RunnableScheduledFuture类型
RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
//上锁
final ReentrantLock lock = this.lock;
lock.lock();
try {
//获取当前size为i
int i = size;
//如果i大于等于队列长度则进行队列的扩张
if (i >= queue.length)
grow();
//给size加一因为新加了一个任务
size = i + 1;
//判断i是否等于0
if (i == 0) {
//等于则将0下标放成当前任务
queue[0] = e;
setIndex(e, 0);
} else {
//否则进行向上排序比较
siftUp(i, e);
}
//如果当前队列的第一个任务就是添加的新任务
if (queue[0] == e) {
//则唤醒当前等待执行的线程
//因为如果队列中的任务都执行完了会存在线程睡眠等待的情况,此处就是唤醒正在睡眠的线程起来继续完成任务。
leader = null;
available.signal();
}
} finally {
//释放线程锁
lock.unlock();
}
//返回true
return true;
}
//下面三个方法都是对offer的调用没有什么可讲的
public void put(Runnable e) {
offer(e);
}
public boolean add(Runnable e) {
return offer(e);
}
public boolean offer(Runnable e, long timeout, TimeUnit unit) {
return offer(e);
}
//此方法是用于出队列时使用的,队列操作添加修改但是他们永远都只有一个出口那就是下标0,所有要正常出队的都需要经过下标0。而此处就是对于正常出队列的操作进行后续操作。
private RunnableScheduledFuture<?> finishPoll(RunnableScheduledFuture<?> f) {
//如果一个任务出队列那么队列个数进行减一
int s = --size;
//并且获取最后一个任务
RunnableScheduledFuture<?> x = queue[s];
//并且设置最后一个index为null
queue[s] = null;
//s不等0则代表队列还有任务,然后使用父级下标0 和 最后一个任务进行向下排序
if (s != 0)
siftDown(0, x);
//不管是否是最后一个任务都将此任务设置为-1出队下标
setIndex(f, -1);
//返回传入的出队任务f
return f;
}
//任务出队列,如果数组中没有任务或者任务执行时间未到则返回null
public RunnableScheduledFuture<?> poll() {
//对于队列的操作都需要上锁
final ReentrantLock lock = this.lock;
lock.lock();
try {
//获取下标0的任务
RunnableScheduledFuture<?> first = queue[0];
//判断当前任务是否符合条件,如果是null则代表没有任务,返回null
//如果当前时间到执行时间的时长大于0 则也返回null未到执行时间
if (first == null || first.getDelay(NANOSECONDS) > 0)
return null;
else
//得到执行结果后返回获取到的任务,并且对数组进行排序,具体查看方法说明
return finishPoll(first);
} finally {
//释放锁
lock.unlock();
}
}
//任务出队列,与上方方法不同的是此方法内部是死循环,如果获取不到任务将会进行睡眠等待,直到再添加或者其他操作唤醒的时候继续工作。
public RunnableScheduledFuture<?> take() throws InterruptedException {
//获取锁并且使用能够被中断的方式获取,lock()方法如果出现中断是会继续执行的,而lockInterruptibly发生中断异常则会停止执行。
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
//死循环获取队列执行任务
for (;;) {
//获取数组中的0号位的任务
RunnableScheduledFuture<?> first = queue[0];
//如果任务是null则进行等待,在添加任务的时候将会被唤醒,唤醒后将继续循环
if (first == null)
available.await();
else {
//如果任务不是null则获取当时时间到执行时间间的时长
long delay = first.getDelay(NANOSECONDS);
//如果时长小于等于0则代表可以立即执行
if (delay <= 0)
//调用finishPoll做出队列后的操作并且返回first
return finishPoll(first);
//如果没有达到执行时间则会进行等待操作,等待过程不需要引用所以此处设置为了null
first = null;
//上方的等待是因为没有任务,而此处等待是因为任务还未到执行时间,但是first不执行后面的都没办法执行从而导致需要一个领导就是leader线程,如果他不等null说明已经有人负责first的执行了所以其他线程等待即可。
if (leader != null)
available.await();
else {
//如果发现leader是null的说明并没有线程成为领导线程,就获取当前的执行线程并且设置他为领导线程
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
//使他睡眠到执行时间
available.awaitNanos(delay);
} finally {
//时间到后清除leader,因为已经被唤醒了,代表着first已经有线程在执行了,而一个first只能被一个线程执行,所以可以假设所有的线程都在等待只有这个线程在运行,运行到此处将leader清除,代表着这个first已经被指定了执行线程。
//当然前提是leader是thisThread
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
//当定时等待线程被唤醒后会获取到first任务,既然已经获取到了任务那么如果下个任务和first执行时间一样那么需要在唤醒一个线程
//而此处先判断leader是否为null,如果不为null代表下一个first任务已经被指定了执行线程,并且queue不为null代表队列是有任务的
if (leader == null && queue[0] != null)
//满足上面的条件则唤醒一个线程继续获取任务
available.signal();
//释放锁,这里需要注意等待的时候会释放锁,当结束等待的时候会继续获取锁,而此处再次释放锁
lock.unlock();
}
}
//限时获取任务
//timeout 超时时间
//unit 超时时间的类型是秒、分、纳秒等
public RunnableScheduledFuture<?> poll(long timeout, TimeUnit unit)
throws InterruptedException {
//首先根据时间类型将时间转换为纳秒
long nanos = unit.toNanos(timeout);
//获取可中断锁
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
//死循环获取
for (;;) {
//获取队列0下标的first任务
RunnableScheduledFuture<?> first = queue[0];
//如果等于null 则判断纳秒数是否小于等于0,如果小于等于则返回null,否则等待指定的纳秒数,等待返回的时间一般是小于等于0的一个数值,除非出现中断或者唤醒等情况,才会发生正数。
if (first == null) {
if (nanos <= 0)
return null;
else
nanos = available.awaitNanos(nanos);
} else {
//获取first任务当前时间到执行时间的执行时长
long delay = first.getDelay(NANOSECONDS);
//如果满足执行时间则进行返回并且处理后续操作
if (delay <= 0)
return finishPoll(first);
//如果设置的等待纳秒数是小于等于0的则返回null
if (nanos <= 0)
return null;
//等待时清空任务引用
first = null;
//纳秒数如果小于延迟数,或者领导线程不是null,则进行等待
//这里条件需要拆开讲,设置时长如果小于执行时长,则代表即使等待超时也不会执行到此任务,从而设置纳秒数为小于等于0下拨循环返回null。
//第二个条件用来限制是否first任务已经绑定了执行线程,如果绑定了那么进行等待你下一波循环返回null吧,这个处理触发代表着超时时间超过了执行时长
//这里的执行时长代表着执行时间减去当前时间的这个时长
if (nanos < delay || leader != null)
nanos = available.awaitNanos(nanos);
else {
//到这里说明超时时间大于了执行时长,并且当前并没有领导线程绑定了first任务
//获取当前线程作为领导线程绑定当前的first任务
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
//获取等到的结果,如果出现唤醒或者中断等情况则会返回正值,否则为负数
long timeLeft = available.awaitNanos(delay);
//这里timeLeft代表两种情况
//正数,则是等待未到指定时间就结束了,则延迟数减去未完成时间获取了具体的等待时间 使用纳秒数减去等待时间数则就是下一个循环的等到时间,这样就可以确保设置的等待时间的准确性
//负数和0,0不用说了,负数则负负等正,就是等待超过了设置的时间这样与等到时间进行相加获取到具体的执行时长,再去用纳秒数去减去时长获取的还是剩余数,这样还是为了保证设置的具体执行时间
nanos -= delay - timeLeft;
} finally {
//如果等待被唤醒则判断当前线程是否为管理线程,如果是则清除,与take方法的意思是一样的。
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
//具体含义查看take
if (leader == null && queue[0] != null)
available.signal();
lock.unlock();
}
}
//清空队列
public void clear() {
//之前说过个任何对于队列的操作都需要内部锁
final ReentrantLock lock = this.lock;
lock.lock();
try {
//循环设置null并且设置队列的下标状态为出队列-1
for (int i = 0; i < size; i++) {
RunnableScheduledFuture<?> t = queue[i];
if (t != null) {
queue[i] = null;
setIndex(t, -1);
}
}
//设置任务数为0
size = 0;
} finally {
lock.unlock();
}
}
//获取达到执行条件的任务,意味着任务超时或者刚好到执行时间时则返回任务否则返回null
private RunnableScheduledFuture<?> peekExpired() {
//获取下标0 first
RunnableScheduledFuture<?> first = queue[0];
//判断first 不等于null 代表有执行任务
//判断任务的执行时间小于0 则返回null
//两个条件连起来代表 有执行任务并且执行时长小于等于0则返回first任务
return (first == null || first.getDelay(NANOSECONDS) > 0) ?
null : first;
}
//将达到执行条件的任务添加到指定集合并且从队列中移除
public int drainTo(Collection<? super Runnable> c) {
//常规校验
if (c == null)
throw new NullPointerException();
if (c == this)
throw new IllegalArgumentException();
//获取队列操作锁
final ReentrantLock lock = this.lock;
lock.lock();
try {
//声明两个变量 first用于存储获取到的任务
//n 用于记录移除的任务个数
RunnableScheduledFuture<?> first;
int n = 0;
//使用peekExpired筛选任务并且获取达到执行条件的任务
while ((first = peekExpired()) != null) {
//先添加到集合中,这个代码执行顺序是防止传入的集合c发生add异常从而导致任务丢失既不在队列中也在c集合中添加失败的问题
c.add(first);
//添加成功后进行移除等后续操作
finishPoll(first);
//使添加数进行加加操作
++n;
}
//返回获取的任务个数
return n;
} finally {
//释放锁
lock.unlock();
}
}
//与上方一致只不过加了个限制就是指定了获取任务的最大数
public int drainTo(Collection<? super Runnable> c, int maxElements) {
if (c == null)
throw new NullPointerException();
if (c == this)
throw new IllegalArgumentException();
if (maxElements <= 0)
return 0;
final ReentrantLock lock = this.lock;
lock.lock();
try {
RunnableScheduledFuture<?> first;
int n = 0;
while (n < maxElements && (first = peekExpired()) != null) {
c.add(first);
finishPoll(first);
++n;
}
return n;
} finally {
lock.unlock();
}
}
//以数组方式返回队列任务
public Object[] toArray() {
//获取锁
final ReentrantLock lock = this.lock;
lock.lock();
try {
//这里要注意如果直接返回队列数组则会导致引用返回,如果外部修改数组那么队列中的数据也会发生修改,所以此处使用copy,并且copy的个数是队列的个数。
return Arrays.copyOf(queue, size, Object[].class);
} finally {
//释放锁
lock.unlock();
}
}
//与上方方法相同只不过传入指定的数组进行copy,这样copy可以返回指定的类型,上面返回的是Object
@SuppressWarnings("unchecked")
public <T> T[] toArray(T[] a) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
if (a.length < size)
return (T[]) Arrays.copyOf(queue, size, a.getClass());
System.arraycopy(queue, 0, a, 0, size);
if (a.length > size)
a[size] = null;
return a;
} finally {
lock.unlock();
}
}
//获取队列的迭代器
public Iterator<Runnable> iterator() {
return new Itr(Arrays.copyOf(queue, size));
}
//快照迭代器,获取迭代器的时候会将当前队列的状态复制一份进行返回
private class Itr implements Iterator<Runnable> {
final RunnableScheduledFuture<?>[] array;
//迭代器的下标记录光标
int cursor = 0;
//最后一次调用next所产生的光标,用于调用remove的时候进行删除,这样每次迭代获取到迭代器删除都会是正确的
int lastRet = -1;
//构造器需要传入需要迭代的数组,从iterator可以看出,他对队列copy了一份传入到此构造器中
Itr(RunnableScheduledFuture<?>[] array) {
this.array = array;
}
//判断是否还有下一个值,如果当前光标大于等于length则返回false代表没有数据继续迭代
public boolean hasNext() {
return cursor < array.length;
}
//获取下一个数据
public Runnable next() {
//当前的光标大于等于数组长度则抛出异常告诉调用者没有找到元素
if (cursor >= array.length)
throw new NoSuchElementException();
//将本次获取的元素下标进行保存到最后一次获取的记录中
lastRet = cursor;
//之所以这样赋值是因为i++的特性 先返回后执行++从而获取的元素就是光标本身才进行++获取下一次的光标地址
return array[cursor++];
}
//删除
public void remove() {
//如果小于0则代表当前迭代器并没有进行迭代,因为默认值是-1
if (lastRet < 0)
throw new IllegalStateException();
//如果进行了迭代则删除在队列中的元素,这里需要知道为什么他没有删除迭代器中的数组元素,因为迭代器是一次性产物,迭代完就完了没有再次迭代一说,所以此处并没有对迭代器中的数据进行操作
DelayedWorkQueue.this.remove(array[lastRet]);
//并且设置为-1这样就不能再次调用remove从而导致重复删除的问题
lastRet = -1;
}
}
}
}
网友评论