上一章我们详细介绍EventExecutor
和EventLoop
接口原理,这一章我们来讲解它们的实现原理。
一. AbstractEventExecutor 类
/**
* EventExecutor 实现的抽象基类。
*/
public abstract class AbstractEventExecutor extends AbstractExecutorService implements EventExecutor {
.....
}
这个类是 EventExecutor
接口实现的抽象基类,它基础自 AbstractExecutorService
抽样类。
AbstractExecutorService
抽样类实现了ExecutorService
接口除了execute(Runnable)
方法外的所有方法,具体请看Executor与ExecutorService原理分析
/**
* EventExecutor 实现的抽象基类。
*/
public abstract class AbstractEventExecutor extends AbstractExecutorService implements EventExecutor {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractEventExecutor.class);
// 调用 shutdownGracefully 方法默认安静期时间
static final long DEFAULT_SHUTDOWN_QUIET_PERIOD = 2;
// 调用 shutdownGracefully 方法默认 shutdown 超时时间
static final long DEFAULT_SHUTDOWN_TIMEOUT = 15;
// 父的 EventExecutorGroup,由构造方法中传递
private final EventExecutorGroup parent;
// 只包含自身的集合 selfCollection
private final Collection<EventExecutor> selfCollection = Collections.<EventExecutor>singleton(this);
protected AbstractEventExecutor() {
this(null);
}
protected AbstractEventExecutor(EventExecutorGroup parent) {
this.parent = parent;
}
@Override
public EventExecutorGroup parent() {
return parent;
}
/**
* @return 返回值就是自身
*/
@Override
public EventExecutor next() {
return this;
}
@Override
public boolean inEventLoop() {
return inEventLoop(Thread.currentThread());
}
/**
*
* @return 只返回包含自身的迭代器
*/
@Override
public Iterator<EventExecutor> iterator() {
return selfCollection.iterator();
}
/**
* 调用 shutdownGracefully(long, long, TimeUnit) 方法, 使用默认值
*/
@Override
public Future<?> shutdownGracefully() {
return shutdownGracefully(DEFAULT_SHUTDOWN_QUIET_PERIOD, DEFAULT_SHUTDOWN_TIMEOUT, TimeUnit.SECONDS);
}
/**
* 被 @deprecated,期望使用者调用 shutdownGracefully(long, long, TimeUnit) 或 shutdownGracefully()
*/
@Override
@Deprecated
public abstract void shutdown();
/**
* 被 @deprecated,期望使用者调用 shutdownGracefully(long, long, TimeUnit) 或 shutdownGracefully()
*/
@Override
@Deprecated
public List<Runnable> shutdownNow() {
// 调用 shutdown 方法
shutdown();
// 返回空集合
return Collections.emptyList();
}
/**
* 直接返回新创建的 DefaultPromise 对象
*/
@Override
public <V> Promise<V> newPromise() {
return new DefaultPromise<V>(this);
}
/**
* 直接返回新创建的 DefaultProgressivePromise 对象
*/
@Override
public <V> ProgressivePromise<V> newProgressivePromise() {
return new DefaultProgressivePromise<V>(this);
}
/**
* 直接返回新创建的 SucceededFuture 对象
*/
@Override
public <V> Future<V> newSucceededFuture(V result) {
return new SucceededFuture<V>(this, result);
}
/**
* 直接返回新创建的 FailedFuture 对象
*/
@Override
public <V> Future<V> newFailedFuture(Throwable cause) {
return new FailedFuture<V>(this, cause);
}
/**
* @param task
* @return 要将返回值变成 netty 的 Future
*/
@Override
public Future<?> submit(Runnable task) {
return (Future<?>) super.submit(task);
}
/**
* @return 要将返回值变成 netty 的 Future
*/
@Override
public <T> Future<T> submit(Runnable task, T result) {
return (Future<T>) super.submit(task, result);
}
/**
* @return 要将返回值变成 netty 的 Future
*/
@Override
public <T> Future<T> submit(Callable<T> task) {
return (Future<T>) super.submit(task);
}
/**
* 必须复写 newTaskFor 方法,改变 AbstractExecutorService 的实现;
* 将返回值的 RunnableFuture 变成 netty 的 RunnableFuture 实现类 PromiseTask
*/
@Override
protected final <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
return new PromiseTask<T>(this, runnable, value);
}
/**
* 必须复写 newTaskFor 方法,改变 AbstractExecutorService 的实现;
* 将返回值的 RunnableFuture 变成 netty 的 RunnableFuture 实现类 PromiseTask
*/
@Override
protected final <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return new PromiseTask<T>(this, callable);
}
/**
* 当前 AbstractEventExecutor 不支持延时和周期性任务
*/
@Override
public ScheduledFuture<?> schedule(Runnable command, long delay,
TimeUnit unit) {
throw new UnsupportedOperationException();
}
/**
* 当前 AbstractEventExecutor 不支持延时和周期性任务
*/
@Override
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
throw new UnsupportedOperationException();
}
/**
* 当前 AbstractEventExecutor 不支持延时和周期性任务
*/
@Override
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
throw new UnsupportedOperationException();
}
/**
* 当前 AbstractEventExecutor 不支持延时和周期性任务
*/
@Override
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
throw new UnsupportedOperationException();
}
/**
* 尝试执行给定的Runnable并记录它是否抛出Throwable。
*/
protected static void safeExecute(Runnable task) {
try {
task.run();
} catch (Throwable t) {
logger.warn("A task raised an exception. Task: {}", task, t);
}
}
/**
* 与execute(Runnable)类似,但不保证任务将运行,直到非惰性任务被执行或执行器关闭。
* 默认实现只是委托执行(Runnable)
*/
@UnstableApi
public void lazyExecute(Runnable task) {
execute(task);
}
/**
* Runnable的标记接口,指示它应该排队等待执行,但不需要立即运行。
*/
@UnstableApi
public interface LazyRunnable extends Runnable { }
}
在这个抽象基类中:
- 实现了一些基础方法,例如创建一些
Future
和Promise
方法等。- 复写了
AbstractExecutorService
的newTaskFor
方法,返回netty
的RunnableFuture
实现类PromiseTask
。- 不支持
schedule
系列方法。
二. AbstractScheduledEventExecutor 类
/**
* 要支持调度的 EventExecutor 的抽象基类。
*/
public abstract class AbstractScheduledEventExecutor extends AbstractEventExecutor {
....
}
AbstractScheduledEventExecutor
继承自 AbstractEventExecutor
类,它来实现 EventExecutor
延迟或周期性任务的功能。
我们首先思考一个问题,如何实现延迟或周期性的任务呢?
如果你看过我的 ScheduledThreadPoolExecutor原理分析 这篇文章,你会发现要实现延迟或周期性的功能,其实也很简单,这里面最重要的就是优先级队列。
优先级队列就是队列中的数据是按照优先级排列的,我们每次从优先级队列中获取数据,一定是优先级最低(或者最高的,这个取决于队列的排列顺序),这样我们就很轻松实现延迟或周期性任务的功能了。
- 将延迟任务的延迟时间和周期性任务这次周期时间作为优先级,这样从优先级队列获取的任务,一定是队列中最快要执行的任务。
- 对于周期性任务,一次执行完成之后,分为两种情况:
- 固定延时,将周期时间作为延时时间,将这个任务再次加入到 优先级队列中。
- 固定周期,比较周期时间减去任务执行时间的结果作为延时时间,将这个任务再次加入到 优先级队列中;如果结果小于
0
,那说明不需要延时,直接执行任务。
我们先来讲解一下 AbstractScheduledEventExecutor
中的 优先级队列 PriorityQueue
。
2.1 优先级队列
在 jdk
的 ScheduledThreadPoolExecutor
实现中,使用 DelayedWorkQueue
作为优先级队列的实现。
但是因为
ScheduledThreadPoolExecutor
需要考虑并发问题,DelayedWorkQueue 是实现了BlockingQueue
接口,里面就使用了并发锁ReentrantLock
。
而EventExecutor
管理的任务,一定只会在EventExecutor
依赖的线程中执行,不存在并发冲突问题,所以它的优先级队列实现是不需要加锁的。
如何实现一个优先级队列,最简单高效的方法就是使用堆排序。
2.1.1 堆排序
在常用排序算法总结 和 DelayedWorkQueue 这两篇文章中,都详细介绍了堆排序,这里简单地说一下。
2.1.1.1 堆的介绍
- 堆是一个完全二叉树,即除了最后一层节点不是满的,其他层节点都是满的,即左右节点都有。
- 堆左和右节点的值都比父节点值小(或者都比父节点值大,取决于从大到小排序,还是从小到大排序)。
- 堆可以实现快速的插入和删除,效率都在(log N)左右,所以它可以实现优先级队列。
因为堆是一个完全二叉树,所以可以使用数组去存储堆,即
// 对于n位置的节点来说:
int left = 2 * n + 1; // 左子节点,一般使用位运算 (n << 1) + 1
int right = 2 * n + 2; // 右子节点 一般使用位运算 (n << 1) + 2
// 父节点,当然n要大于0,根节点是没有父节点的。
// 一般使用位运算 (n - 1) >>> 1
int parent = (n - 1) / 2;
2.1.1.2 堆的插入
必须保证插入后的堆还是完全二叉树;父节点的值不能小于子节点的值(或者不能大于子节点的值)。
public void insert(int value) {
// 第一步将插入的值,直接放在最后一个位置。并将长度加一
store[size++] = value;
// 得到新插入值所在位置。
int index = size - 1;
while(index > 0) {
// 它的父节点位置坐标
int parentIndex = (index - 1) / 2;
// 如果父节点的值小于子节点的值,你不满足堆的条件,那么就交换值
if (store[index] > store[parentIndex]) {
swap(store, index, parentIndex);
index = parentIndex;
} else {
// 否则表示这条路径上的值已经满足降序,跳出循环
break;
}
}
}
2.1.1.3 堆的删除
必须保证删除后的堆还是完全二叉树;父节点的值不能小于子节点的值(或者不能大于子节点的值)。
public int remove() {
// 将根的值记录,最后返回
int result = store[0];
// 将最后位置的值放到根节点位置
store[0] = store[--size];
int index = 0;
// 通过循环,保证父节点的值不能小于子节点。
while(true) {
int leftIndex = 2 * index + 1; // 左子节点
int rightIndex = 2 * index + 2; // 右子节点
// leftIndex >= size 表示这个子节点还没有值。
if (leftIndex >= size) break;
int maxIndex = leftIndex;
if (store[leftIndex] < store[rightIndex]) maxIndex = rightIndex;
if (store[index] < store[maxIndex]) {
swap(store, index, maxIndex);
index = maxIndex;
} else {
break;
}
}
return result;
}
2.1.2 PriorityQueue 接口
public interface PriorityQueue<T> extends Queue<T> {
/**
* 与remove(Object)方法相同,但使用泛型进行类型化。
*/
boolean removeTyped(T node);
/**
* 与contains(Object)方法相同,但使用泛型进行类型化。
*/
boolean containsTyped(T node);
/**
* 通知队列节点的优先级已更改。
* 队列将进行调整,以确保优先队列属性得到维护。
* @param node 队列中的对象,该对象的优先级可能发生了变化。
*/
void priorityChanged(T node);
/**
* 与 clear() 方法相比,本方法只是将size 重新设置成 0;
* clear() 方法还会将原先队列中所有节点的优先级改变,以及清空队列数据。
* 具体可以看 DefaultPriorityQueue 的实现
*
* 只有当确定节点不会被重新插入到这个或任何其他的 PriorityQueue 中,
* 并且知道 PriorityQueue 本身将在调用之后被垃圾回收时,才应该使用这个方法。
*/
void clearIgnoringIndexes();
}
基本上都是队列 Queue
接口的方法,提供了两个泛型的方法,以及节点优先级变化的通知方法。
2.1.3 PriorityQueueNode 接口
/**
* 为 DefaultPriorityQueue 提供方法以维护内部状态。
* 这些方法通常不应在 DefaultPriorityQueue 范围之外使用。
*/
public interface PriorityQueueNode {
/**
* 表示节点不在队列中
*/
int INDEX_NOT_IN_QUEUE = -1;
/**
* 获取由 priorityQueueIndex(DefaultPriorityQueue, int) 为queue对应的值设置的最后一个值。
*/
int priorityQueueIndex(DefaultPriorityQueue<?> queue);
/**
* 由 DefaultPriorityQueue 用于维护队列中元素的状态。
*/
void priorityQueueIndex(DefaultPriorityQueue<?> queue, int i);
}
表示优先级队列中节点的接口类,主要提供两个设置节点索引的方法。
2.1.4 ScheduledFutureTask 类
/**
* 它是一个优先级PriorityQueueNode 节点,也代表一个延时或者周期性任务 PromiseTask
*/
@SuppressWarnings("ComparableImplementedButEqualsNotOverridden")
final class ScheduledFutureTask<V> extends PromiseTask<V> implements ScheduledFuture<V>, PriorityQueueNode {
.......
}
它即使优先级队列中的一个节点 PriorityQueueNode
,也代表一个延时或者周期性任务 PromiseTask
。
2.1.4.1 基准时间 START_TIME
我们不管是延时任务还是周期性任务都有任务执行的截止时间 deadlineNanos
,截止时间到了就会执行任务。
而一般这个时间值会很大,通过 System.nanoTime()
得到。
所以 netty
设置了一个基准时间 START_TIME
/**
* 程序开始时间的纳秒值,因此所有的延时时间都是在这个 START_TIME 基础上定义的。
*/
private static final long START_TIME = System.nanoTime();
netty
中涉及到的延时时间都是在 START_TIME
基础上计算得到的:
- 当前程序的执行时间纳秒值
/** * 程序执行了多少纳秒值。 * 因为是当前纳秒值和程序开始时间纳秒值之差。 */ static long nanoTime() { return System.nanoTime() - START_TIME; }
- 用户定义延时时间对应的截止时间
/** * 将用户自定义的延时时间 转换成 截止时间 deadlineNanos,即 System.nanoTime() - START_TIME + delay */ static long deadlineNanos(long delay) { long deadlineNanos = nanoTime() + delay; // Guard against overflow return deadlineNanos < 0 ? Long.MAX_VALUE : deadlineNanos; }
- 获取当前剩余的时间
/** * 当前剩余的时间 * 即 使用截止时间deadlineNanos - 当前时间真实值 nanoTime() * @return */ public long delayNanos() { return deadlineToDelayNanos(deadlineNanos()); } /** * 截止时间转换成剩余时间 * @param deadlineNanos * @return */ static long deadlineToDelayNanos(long deadlineNanos) { // 最小都是 0 return deadlineNanos == 0L ? 0L : Math.max(0L, deadlineNanos - nanoTime()); }
- 获取指定时间戳后剩余的时间
/** * 获取到指定时间戳 currentTimeNanos 时,还剩余的时间纳秒值 * 即 截止时间deadlineNanos - ( currentTimeNanos - START_TIME) * @param currentTimeNanos * @return */ public long delayNanos(long currentTimeNanos) { // 最小都是 0 return deadlineNanos == 0L ? 0L : Math.max(0L, deadlineNanos() - (currentTimeNanos - START_TIME)); }
2.1.4.2 构造函数
- 创建延时任务
/** * 创建一个延时任务 */ ScheduledFutureTask(AbstractScheduledEventExecutor executor, Runnable runnable, long nanoTime) { super(executor, runnable); deadlineNanos = nanoTime; // 因为是延时任务,periodNanos 就是 0 periodNanos = 0; } /** * 创建一个延时任务 */ ScheduledFutureTask(AbstractScheduledEventExecutor executor, Callable<V> callable, long nanoTime) { super(executor, callable); deadlineNanos = nanoTime; // 因为是延时任务,periodNanos 就是 0 periodNanos = 0; }
- 创建周期性任务
/** * 创建一个周期性任务 */ ScheduledFutureTask(AbstractScheduledEventExecutor executor, Runnable runnable, long nanoTime, long period) { super(executor, runnable); deadlineNanos = nanoTime; periodNanos = validatePeriod(period); } /** * 创建一个周期性任务 */ ScheduledFutureTask(AbstractScheduledEventExecutor executor, Callable<V> callable, long nanoTime, long period) { super(executor, callable); deadlineNanos = nanoTime; periodNanos = validatePeriod(period); }
2.1.4.3 运行任务
// 任务运行
@Override
public void run() {
// 当前线程是否在执行器的线程
assert executor().inEventLoop();
try {
// delayNanos() > 0L 表示任务截止时间还没有到
if (delayNanos() > 0L) {
if (isCancelled()) {
// isCancelled() 为 true,任务已经被取消,那么就从列表中移除
scheduledExecutor().scheduledTaskQueue().removeTyped(this);
} else {
// 否则将任务重新放回队列
scheduledExecutor().scheduleFromEventLoop(this);
}
return;
}
//下面都是任务截止时间已经到了
if (periodNanos == 0) {
// periodNanos == 0 表示只是延时任务。
// 先将任务设置成不可取消
if (setUncancellableInternal()) {
// 执行任务
V result = runTask();
// 设置 PromiseTask 为成功,进行通知
setSuccessInternal(result);
}
} else {
// 检查任务是否被取消
if (!isCancelled()) {
// 执行任务
runTask();
// 如果执行器是否被终止
if (!executor().isShutdown()) {
if (periodNanos > 0) {
// periodNanos > 0 表示固定周期,那么下一次执行时间就是
// 本次截止时间deadlineNanos + 周期时间 periodNanos
// 但是这个值可能小于当前时间啊,只要任务执行时间比周期时间 periodNanos大,
// 那么这个值就小于当前时间。就代表会立即运行
deadlineNanos += periodNanos;
} else {
// periodNanos < 0 表示固定延时。
// 使用当前时间 nanoTime() 加上固定延时时间(- periodNanos)
deadlineNanos = nanoTime() - periodNanos;
}
if (!isCancelled()) {
scheduledExecutor().scheduledTaskQueue().add(this);
}
}
}
}
} catch (Throwable cause) {
setFailureInternal(cause);
}
}
2.1.5 DefaultPriorityQueue 类
/**
* 优先级队列使用元素的自然排序。
* 为了维护优先级队列中的索引,元素还需要是 PriorityQueueNode 类型。
*/
public final class DefaultPriorityQueue<T extends PriorityQueueNode> extends AbstractQueue<T>
implements PriorityQueue<T> {
private static final PriorityQueueNode[] EMPTY_ARRAY = new PriorityQueueNode[0];
// 使用比较器 Comparator 进行排序
private final Comparator<T> comparator;
// 使用数组存储堆
private T[] queue;
// 表示队列中节点的数量
private int size;
@SuppressWarnings("unchecked")
public DefaultPriorityQueue(Comparator<T> comparator, int initialSize) {
this.comparator = ObjectUtil.checkNotNull(comparator, "comparator");
queue = (T[]) (initialSize != 0 ? new PriorityQueueNode[initialSize] : EMPTY_ARRAY);
}
.......
}
使用数组存储堆的数据。
2.1.5.1 清除方法
@Override
public void clear() {
// 遍历队列中的元素节点
for (int i = 0; i < size; ++i) {
T node = queue[i];
if (node != null) {
// 设置节点的索引是 INDEX_NOT_IN_QUEUE,表示节点不在队列中了
node.priorityQueueIndex(this, INDEX_NOT_IN_QUEUE);
// 将引用设置为 null, 方便 gc
queue[i] = null;
}
}
size = 0;
}
@Override
public void clearIgnoringIndexes() {
// 只是将 size 变成 0
size = 0;
}
可以看出
clearIgnoringIndexes()
只是设置了size
的值,只有在确定节点不会被重新插入到这个或任何其他的PriorityQueue
中,并且这个PriorityQueue
队列本身将在调用之后被垃圾回收时,才应该使用这个方法。
2.1.5.2 包含节点
// 队列是否包含 o 对象
@Override
public boolean contains(Object o) {
// 不是 PriorityQueueNode 的实例,直接返回 false
if (!(o instanceof PriorityQueueNode)) {
return false;
}
PriorityQueueNode node = (PriorityQueueNode) o;
// 是 PriorityQueueNode 实例,就可以利用索引快速判断
return contains(node, node.priorityQueueIndex(this));
}
@Override
public boolean containsTyped(T node) {
return contains(node, node.priorityQueueIndex(this));
}
/**
* 判断节点是不是在队列中,
* 因为节点都是 PriorityQueueNode 类型,可以获取对应的索引 i,
* 这样就可以直接通过索引从数组queue中获取数据进行比较,提高效率。
*/
private boolean contains(PriorityQueueNode node, int i) {
return i >= 0 && i < size && node.equals(queue[i]);
}
利用
PriorityQueueNode
类型节点包含索引,可以快速判断,节点是不是包含在队列中。
2.1.5.3 bubbleDown
方法
/**
* 调用这个方法,表示当前 k 位置的节点值 node 可能比它的子节点的值大;
* 为了保持最小堆属性,因此向下遍历树,将节点值 node 放到合适的位置
*/
private void bubbleDown(int k, T node) {
// size 是树的最底一层, size >>> 1 就表示最底一层节点的父节点
final int half = size >>> 1;
// 通过循环,保证父节点的值不能大于子节点。
while (k < half) {
// 左子节点, 相当于 (k * 2) + 1
int iChild = (k << 1) + 1;
// 左节点存储的值 child
T child = queue[iChild];
// 右子节点
int rightChild = iChild + 1;
// 当右节点在队列中,且左节点大于右节点,右节点才是较小的子节点,那么进行交换
if (rightChild < size && comparator.compare(child, queue[rightChild]) > 0) {
child = queue[iChild = rightChild];
}
// 当 bubbleDown节点node 的值小于或者等于当前较小的子节点的值,
// 那么我们将通过在这里插入bubbleDown节点来保持最小堆属性。
// 直接 break 跳出循环
if (comparator.compare(node, child) <= 0) {
break;
}
// 将较小值的子节点移动到父节点
queue[k] = child;
// 通知这个子节点,索引位置改变了
child.priorityQueueIndex(this, k);
// 将较小值的子节点位置赋值给 k
// 即移动到树的下一层,寻找当前bubbleDown节点的位置
k = iChild;
}
//我们已经找到了节点的位置,并且仍然满足最小堆属性,因此将它放入队列中。
queue[k] = node;
node.priorityQueueIndex(this, k);
}
为了保持最小堆属性,向下遍历树,将节点值
node
放到合适的位置。因为当前k
位置的节点值node
可能比它的子节点的值大。
2.1.5.4 bubbleUp
方法
/**
* 调用这个方法,表示当前 k 位置的节点值 node 可能比它的父节点的值小;
* 为了保持最小堆属性,因此向上遍历树,将节点值 node 放到合适的位置
*/
private void bubbleUp(int k, T node) {
// 当k==0时,就到了堆二叉树的根节点了,跳出循环
while (k > 0) {
// 父节点位置坐标, 相当于(k - 1) / 2
int iParent = (k - 1) >>> 1;
// 获取父节点位置元素
T parent = queue[iParent];
// 如果 bubbleUp节点node 大于等于父节点,那么我们找到了一个插入点,并且仍然保持最小堆属性。
if (comparator.compare(node, parent) >= 0) {
break;
}
// 否则就将父节点元素存放到k位置
queue[k] = parent;
parent.priorityQueueIndex(this, k);
// 重新赋值k,寻找元素key应该插入到堆二叉树的那个节点
k = iParent;
}
//我们已经找到了节点的位置,并且仍然满足最小堆属性,因此将它放入队列中。
queue[k] = node;
node.priorityQueueIndex(this, k);
}
为了保持最小堆属性,向上遍历树,将节点值
node
放到合适的位置。因为当前k
位置的节点值node
可能比它的父节点的值小。
2.1.5.5 插入节点
@Override
public boolean offer(T e) {
if (e.priorityQueueIndex(this) != INDEX_NOT_IN_QUEUE) {
throw new IllegalArgumentException("e.priorityQueueIndex(): " + e.priorityQueueIndex(this) +
" (expected: " + INDEX_NOT_IN_QUEUE + ") + e: " + e);
}
// Check that the array capacity is enough to hold values by doubling capacity.
// 检查节点数量 size 是否已经超过数组queue容量。
// 如果是的话,就将数组queue容量扩大
if (size >= queue.length) {
// 使用允许初始容量为0的策略。
// 与JDK的优先级队列策略相同,“小”时加倍,“大”时增加50%。
queue = Arrays.copyOf(queue, queue.length + ((queue.length < 64) ?
(queue.length + 2) :
(queue.length >>> 1)));
}
// 计划在最后一个位置插入节点元素e,
// 然后向上遍历树,保持最小堆属性。
bubbleUp(size++, e);
return true;
}
2.1.5.6 得到堆顶元素
// 得到堆顶元素
@Override
public T poll() {
// 如果队列为空,返回 null
if (size == 0) {
return null;
}
// 记录树根节点
T result = queue[0];
// 设置树根节点的索引值
result.priorityQueueIndex(this, INDEX_NOT_IN_QUEUE);
// 得到堆最后一个节点, 并将队列节点数量 size 值减一
T last = queue[--size];
queue[size] = null;
if (size != 0) {
// 从树根向下遍历,保持最小堆属性。
bubbleDown(0, last);
}
return result;
}
2.1.5.7 查看堆顶节点
@Override
public T peek() {
return (size == 0) ? null : queue[0];
}
2.1.5.8 删除指定节点
public boolean remove(Object o) {
final T node;
try {
// 只有是 PriorityQueueNode 的实例,才有可能删除
node = (T) o;
} catch (ClassCastException e) {
return false;
}
return removeTyped(node);
}
public boolean removeTyped(T node) {
// 获取节点对应的索引,可以快速查找
int i = node.priorityQueueIndex(this);
// 队列是否包含这个节点node
if (!contains(node, i)) {
return false;
}
// 改变这个节点的索引
node.priorityQueueIndex(this, INDEX_NOT_IN_QUEUE);
if (--size == 0 || size == i) {
// 如果没有节点剩下,或者这是数组中的最后一个节点,就不涉及到树的改动了,直接删除并返回
queue[i] = null;
return true;
}
// 将最后一个节点值移动到要删除节点位置i
T moved = queue[i] = queue[size];
// 最后一个节点值设置为 null
queue[size] = null;
// 为了确保移动的节点仍然保留最小堆属性
if (comparator.compare(node, moved) < 0) {
// 删除节点值node 小于 最后一个节点值 moved,
// 这就说明 moved 放到 i 位置,肯定是大于 i 的父节点的值,
// 那么从 i 向上的树是满足最小堆属性的,
// 但是从 i 向下的树,就不一定了,
// 所以需要bubbleDown(i, moved) 方法,保持最小堆属性。
bubbleDown(i, moved);
} else {
// 删除节点值node 大于等于 最后一个节点值 moved,
// 这就说明 moved 放到 i 位置,肯定是大于 i 的左右子节点的值,
// 那么从 i 向下的树是满足最小堆属性的,
// 但是从 i 向上的树,就不一定了,
// 所以需要 bubbleUp(i, moved) 方法,保持最小堆属性。
bubbleUp(i, moved);
}
return true;
}
2.2 AbstractScheduledEventExecutor
重要方法
2.2.1 获取计划任务
-
获取正准备执行的计划任务
/** * 返回已经到了截止时间的计划任务,即准备执行的 Runnable * 如果没有,那么返回 null */ protected final Runnable pollScheduledTask() { return pollScheduledTask(nanoTime()); } /** * 返回准备用给定的 nanoTime 内要执行执行的计划任务 Runnable。 * 您应该使用 nanoTime() 来获取正确的 nanoTime 值。 */ protected final Runnable pollScheduledTask(long nanoTime) { assert inEventLoop(); // 获得最早需要执行的计划任务 ScheduledFutureTask<?> scheduledTask = peekScheduledTask(); // 最早需要执行的计划任务为 null, 或者计划任务的截止时间 deadlineNanos 大于 nanoTime 时间, // 那么就没有要执行的计划任务,返回 null if (scheduledTask == null || scheduledTask.deadlineNanos() - nanoTime > 0) { return null; } // 从队列中移除 scheduledTaskQueue.remove(); // 将这个计划任务的截止时间设置为 0 scheduledTask.setConsumed(); return scheduledTask; }
如果有这个计划任务,会将它从计划任务队列
scheduledTaskQueue
中删除。 -
查看计划任务队列头的计划任务
/** * 获取优先级队列的头节点,即最早需要执行的计划任务,当然也有可能是 null */ final ScheduledFutureTask<?> peekScheduledTask() { Queue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue; return scheduledTaskQueue != null ? scheduledTaskQueue.peek() : null; }
-
获取队列头计划任务剩余时间
/** * 返回下一个计划任务准备运行的剩余时间的纳秒值, * 如果计划任务队列为空,那么返回 -1 */ protected final long nextScheduledTaskNano() { // 得到下一个计划任务 ScheduledFutureTask<?> scheduledTask = peekScheduledTask(); // 返回计划任务准备运行的剩余时间 return scheduledTask != null ? scheduledTask.delayNanos() : -1; }
-
获取队列头计划任务截止时间
/** * 返回下一个计划任务准备运行的截止时间纳秒值 deadlineNanos, * (注 剩余时间就是截止时间 deadlineNanos 减去当前时间 nanoTime()) * 如果计划任务队列为空,那么返回 -1 */ protected final long nextScheduledTaskDeadlineNanos() { // 得到下一个计划任务 ScheduledFutureTask<?> scheduledTask = peekScheduledTask(); return scheduledTask != null ? scheduledTask.deadlineNanos() : -1; }
-
取消所有计划任务
/** * 取消所有计划任务。 * 只有当inEventLoop()为真时才必须调用此方法。 */ protected void cancelScheduledTasks() { assert inEventLoop(); PriorityQueue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue; // 没有计划任务,直接返回 if (isNullOrEmpty(scheduledTaskQueue)) { return; } final ScheduledFutureTask<?>[] scheduledTasks = scheduledTaskQueue.toArray(new ScheduledFutureTask<?>[0]); for (ScheduledFutureTask<?> task: scheduledTasks) { // 取消计划任务 task.cancelWithoutRemove(false); } scheduledTaskQueue.clearIgnoringIndexes(); }
2.2.2 发布计划任务
- 发布延时任务
// 发布延时任务 @Override public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) { ObjectUtil.checkNotNull(command, "command"); ObjectUtil.checkNotNull(unit, "unit"); // 延时时间不可能是负数 if (delay < 0) { delay = 0; } validateScheduled0(delay, unit); // 创建一个 ScheduledFutureTask 任务对象,通过 schedule 方法发布 return schedule(new ScheduledFutureTask<Void>( this, command, // 通过 deadlineNanos 方法,获取截止时间 deadlineNanos(unit.toNanos(delay)))); } // 发布延时任务 @Override public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) { ObjectUtil.checkNotNull(callable, "callable"); ObjectUtil.checkNotNull(unit, "unit"); if (delay < 0) { delay = 0; } validateScheduled0(delay, unit); // 创建一个 ScheduledFutureTask 任务对象,通过 schedule 方法发布 // 通过 deadlineNanos 方法,获取截止时间 return schedule(new ScheduledFutureTask<V>(this, callable, deadlineNanos(unit.toNanos(delay)))); }
- 发布固定周期的周期性计划任务
// 发布固定周期的周期性计划任务 @Override public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) { ObjectUtil.checkNotNull(command, "command"); ObjectUtil.checkNotNull(unit, "unit"); if (initialDelay < 0) { throw new IllegalArgumentException( String.format("initialDelay: %d (expected: >= 0)", initialDelay)); } if (period <= 0) { throw new IllegalArgumentException( String.format("period: %d (expected: > 0)", period)); } validateScheduled0(initialDelay, unit); validateScheduled0(period, unit); // 创建一个 ScheduledFutureTask 任务对象,通过 schedule 方法发布 // 通过 deadlineNanos 方法,获取截止时间 // 因为是 固定周期, 所以 period 是正数 unit.toNanos(period) return schedule(new ScheduledFutureTask<Void>( this, command, deadlineNanos(unit.toNanos(initialDelay)), unit.toNanos(period))); }
- 发布固定延时的周期性计划任务
// 发布固定延时的周期性计划任务 @Override public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) { ObjectUtil.checkNotNull(command, "command"); ObjectUtil.checkNotNull(unit, "unit"); if (initialDelay < 0) { throw new IllegalArgumentException( String.format("initialDelay: %d (expected: >= 0)", initialDelay)); } if (delay <= 0) { throw new IllegalArgumentException( String.format("delay: %d (expected: > 0)", delay)); } validateScheduled0(initialDelay, unit); validateScheduled0(delay, unit); // 创建一个 ScheduledFutureTask 任务对象,通过 schedule 方法发布 // 通过 deadlineNanos 方法,获取截止时间 // 因为是 固定周期, 所以 period 是负数 -unit.toNanos(period) return schedule(new ScheduledFutureTask<Void>( this, command, deadlineNanos(unit.toNanos(initialDelay)), -unit.toNanos(delay))); }
- schedule(final ScheduledFutureTask<V> task)
private <V> ScheduledFuture<V> schedule(final ScheduledFutureTask<V> task) { if (inEventLoop()) { // 如果当前线程就是执行器的线程,那么直接将计划任务 task 添加到计划任务队列中 scheduleFromEventLoop(task); } else { // 获取计划任务的截止时间 final long deadlineNanos = task.deadlineNanos(); if (beforeScheduledTaskSubmitted(deadlineNanos)) { // 如果任务未过期,则通过 execute 方法在运行时将自己添加到计划任务队列中 execute(task); } else { // 如果任务已经过期,通过 lazyExecute 方法将在运行时从计划任务队列中删除自己 lazyExecute(task); if (afterScheduledTaskSubmitted(deadlineNanos)) { execute(WAKEUP_TASK); } } } return task; }
2.2.3 添加计划任务
/**
* 将这个计划任务 task 添加到计划任务优先级队列中
* @param task
*/
final void scheduleFromEventLoop(final ScheduledFutureTask<?> task) {
// nextTaskId是一个long,所以它不可能溢出回 0
scheduledTaskQueue().add(task.setId(++nextTaskId));
}
2.2.4 删除计划任务
/**
* 从计划任务队列中删除 计划任务task
*/
final void removeScheduled(final ScheduledFutureTask<?> task) {
assert task.isCancelled();
if (inEventLoop()) {
// 将计划任务从计划任务队列中删除
scheduledTaskQueue().removeTyped(task);
} else {
// 任务将在运行时从计划任务队列中删除自己
lazyExecute(task);
}
}
三. 小结
下一章我们在讲解 EventExecutor
重要的实现基类 SingleThreadEventExecutor
。
网友评论