美文网首页Java
Netty源码_EventExecutor和EventLoop抽

Netty源码_EventExecutor和EventLoop抽

作者: wo883721 | 来源:发表于2021-10-25 19:38 被阅读0次

上一章我们详细介绍EventExecutorEventLoop接口原理,这一章我们来讲解它们的实现原理。

一. AbstractEventExecutor 类

/**
 * EventExecutor 实现的抽象基类。
 */
public abstract class AbstractEventExecutor extends AbstractExecutorService implements EventExecutor {
    .....
}

这个类是 EventExecutor 接口实现的抽象基类,它基础自 AbstractExecutorService 抽样类。

AbstractExecutorService 抽样类实现了 ExecutorService 接口除了 execute(Runnable) 方法外的所有方法,具体请看Executor与ExecutorService原理分析

/**
 * EventExecutor 实现的抽象基类。
 */
public abstract class AbstractEventExecutor extends AbstractExecutorService implements EventExecutor {
    private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractEventExecutor.class);

    // 调用 shutdownGracefully 方法默认安静期时间
    static final long DEFAULT_SHUTDOWN_QUIET_PERIOD = 2;
    // 调用 shutdownGracefully 方法默认 shutdown 超时时间
    static final long DEFAULT_SHUTDOWN_TIMEOUT = 15;

    // 父的 EventExecutorGroup,由构造方法中传递
    private final EventExecutorGroup parent;
    // 只包含自身的集合 selfCollection
    private final Collection<EventExecutor> selfCollection = Collections.<EventExecutor>singleton(this);

    protected AbstractEventExecutor() {
        this(null);
    }

    protected AbstractEventExecutor(EventExecutorGroup parent) {
        this.parent = parent;
    }

    @Override
    public EventExecutorGroup parent() {
        return parent;
    }

    /**
     * @return 返回值就是自身
     */
    @Override
    public EventExecutor next() {
        return this;
    }

    @Override
    public boolean inEventLoop() {
        return inEventLoop(Thread.currentThread());
    }

    /**
     *
     * @return 只返回包含自身的迭代器
     */
    @Override
    public Iterator<EventExecutor> iterator() {
        return selfCollection.iterator();
    }

    /**
     * 调用 shutdownGracefully(long, long, TimeUnit) 方法, 使用默认值
     */
    @Override
    public Future<?> shutdownGracefully() {
        return shutdownGracefully(DEFAULT_SHUTDOWN_QUIET_PERIOD, DEFAULT_SHUTDOWN_TIMEOUT, TimeUnit.SECONDS);
    }

    /**
     * 被 @deprecated,期望使用者调用 shutdownGracefully(long, long, TimeUnit) 或 shutdownGracefully()
     */
    @Override
    @Deprecated
    public abstract void shutdown();

    /**
     * 被 @deprecated,期望使用者调用 shutdownGracefully(long, long, TimeUnit) 或 shutdownGracefully()
     */
    @Override
    @Deprecated
    public List<Runnable> shutdownNow() {
        // 调用 shutdown 方法
        shutdown();
        // 返回空集合
        return Collections.emptyList();
    }

    /**
     * 直接返回新创建的 DefaultPromise 对象
     */
    @Override
    public <V> Promise<V> newPromise() {
        return new DefaultPromise<V>(this);
    }

    /**
     * 直接返回新创建的 DefaultProgressivePromise 对象
     */
    @Override
    public <V> ProgressivePromise<V> newProgressivePromise() {
        return new DefaultProgressivePromise<V>(this);
    }

    /**
     * 直接返回新创建的 SucceededFuture 对象
     */
    @Override
    public <V> Future<V> newSucceededFuture(V result) {
        return new SucceededFuture<V>(this, result);
    }

    /**
     * 直接返回新创建的 FailedFuture 对象
     */
    @Override
    public <V> Future<V> newFailedFuture(Throwable cause) {
        return new FailedFuture<V>(this, cause);
    }

    /**
     * @param task
     * @return 要将返回值变成 netty 的 Future
     */
    @Override
    public Future<?> submit(Runnable task) {
        return (Future<?>) super.submit(task);
    }

    /**
     * @return  要将返回值变成 netty 的 Future
     */
    @Override
    public <T> Future<T> submit(Runnable task, T result) {
        return (Future<T>) super.submit(task, result);
    }

    /**
     * @return  要将返回值变成 netty 的 Future
     */
    @Override
    public <T> Future<T> submit(Callable<T> task) {
        return (Future<T>) super.submit(task);
    }

    /**
     * 必须复写 newTaskFor 方法,改变 AbstractExecutorService 的实现;
     * 将返回值的 RunnableFuture 变成 netty 的 RunnableFuture 实现类 PromiseTask
     */
    @Override
    protected final <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
        return new PromiseTask<T>(this, runnable, value);
    }

    /**
     * 必须复写 newTaskFor 方法,改变 AbstractExecutorService 的实现;
     * 将返回值的 RunnableFuture 变成 netty 的 RunnableFuture 实现类 PromiseTask
     */
    @Override
    protected final <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
        return new PromiseTask<T>(this, callable);
    }

    /**
     * 当前 AbstractEventExecutor 不支持延时和周期性任务
     */
    @Override
    public ScheduledFuture<?> schedule(Runnable command, long delay,
                                       TimeUnit unit) {
        throw new UnsupportedOperationException();
    }

    /**
     * 当前 AbstractEventExecutor 不支持延时和周期性任务
     */
    @Override
    public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
        throw new UnsupportedOperationException();
    }

    /**
     * 当前 AbstractEventExecutor 不支持延时和周期性任务
     */
    @Override
    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
        throw new UnsupportedOperationException();
    }

    /**
     * 当前 AbstractEventExecutor 不支持延时和周期性任务
     */
    @Override
    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
        throw new UnsupportedOperationException();
    }

    /**
     * 尝试执行给定的Runnable并记录它是否抛出Throwable。
     */
    protected static void safeExecute(Runnable task) {
        try {
            task.run();
        } catch (Throwable t) {
            logger.warn("A task raised an exception. Task: {}", task, t);
        }
    }

    /**
     * 与execute(Runnable)类似,但不保证任务将运行,直到非惰性任务被执行或执行器关闭。
     * 默认实现只是委托执行(Runnable)
     */
    @UnstableApi
    public void lazyExecute(Runnable task) {
        execute(task);
    }

    /**
     * Runnable的标记接口,指示它应该排队等待执行,但不需要立即运行。
     */
    @UnstableApi
    public interface LazyRunnable extends Runnable { }
}

在这个抽象基类中:

  • 实现了一些基础方法,例如创建一些 FuturePromise 方法等。
  • 复写了 AbstractExecutorServicenewTaskFor 方法,返回 nettyRunnableFuture 实现类 PromiseTask
  • 不支持 schedule 系列方法。

二. AbstractScheduledEventExecutor 类

/**
 * 要支持调度的 EventExecutor 的抽象基类。
 */
public abstract class AbstractScheduledEventExecutor extends AbstractEventExecutor {
      ....
}

AbstractScheduledEventExecutor 继承自 AbstractEventExecutor 类,它来实现 EventExecutor 延迟或周期性任务的功能。

我们首先思考一个问题,如何实现延迟或周期性的任务呢?

如果你看过我的 ScheduledThreadPoolExecutor原理分析 这篇文章,你会发现要实现延迟或周期性的功能,其实也很简单,这里面最重要的就是优先级队列

优先级队列就是队列中的数据是按照优先级排列的,我们每次从优先级队列中获取数据,一定是优先级最低(或者最高的,这个取决于队列的排列顺序),这样我们就很轻松实现延迟或周期性任务的功能了。

  • 将延迟任务的延迟时间和周期性任务这次周期时间作为优先级,这样从优先级队列获取的任务,一定是队列中最快要执行的任务。
  • 对于周期性任务,一次执行完成之后,分为两种情况:
    • 固定延时,将周期时间作为延时时间,将这个任务再次加入到 优先级队列中。
    • 固定周期,比较周期时间减去任务执行时间的结果作为延时时间,将这个任务再次加入到 优先级队列中;如果结果小于 0,那说明不需要延时,直接执行任务。

我们先来讲解一下 AbstractScheduledEventExecutor 中的 优先级队列 PriorityQueue

2.1 优先级队列

jdkScheduledThreadPoolExecutor 实现中,使用 DelayedWorkQueue 作为优先级队列的实现。

但是因为ScheduledThreadPoolExecutor需要考虑并发问题,DelayedWorkQueue 是实现了 BlockingQueue 接口,里面就使用了并发锁 ReentrantLock
EventExecutor 管理的任务,一定只会在EventExecutor依赖的线程中执行,不存在并发冲突问题,所以它的优先级队列实现是不需要加锁的。

如何实现一个优先级队列,最简单高效的方法就是使用堆排序

2.1.1 堆排序

常用排序算法总结DelayedWorkQueue 这两篇文章中,都详细介绍了堆排序,这里简单地说一下。

2.1.1.1 堆的介绍

  • 堆是一个完全二叉树,即除了最后一层节点不是满的,其他层节点都是满的,即左右节点都有。
  • 堆左和右节点的值都比父节点值小(或者都比父节点值大,取决于从大到小排序,还是从小到大排序)。
  • 堆可以实现快速的插入和删除,效率都在(log N)左右,所以它可以实现优先级队列。

因为堆是一个完全二叉树,所以可以使用数组去存储,即

// 对于n位置的节点来说:
int left =  2 * n + 1; // 左子节点,一般使用位运算 (n << 1) + 1
int right = 2 * n + 2; // 右子节点  一般使用位运算 (n << 1) + 2

// 父节点,当然n要大于0,根节点是没有父节点的。
//  一般使用位运算 (n - 1) >>> 1
int parent = (n - 1) / 2; 

2.1.1.2 堆的插入

必须保证插入后的堆还是完全二叉树;父节点的值不能小于子节点的值(或者不能大于子节点的值)。

  public void insert(int value) {
         // 第一步将插入的值,直接放在最后一个位置。并将长度加一
         store[size++] = value;
         // 得到新插入值所在位置。
         int index = size - 1;
         while(index > 0) {
             // 它的父节点位置坐标
             int parentIndex = (index - 1) / 2;
             // 如果父节点的值小于子节点的值,你不满足堆的条件,那么就交换值
             if (store[index] > store[parentIndex]) {
                 swap(store, index, parentIndex);
                 index = parentIndex;
             } else {
                 // 否则表示这条路径上的值已经满足降序,跳出循环
                 break;
             }
         }
     }

2.1.1.3 堆的删除

必须保证删除后的堆还是完全二叉树;父节点的值不能小于子节点的值(或者不能大于子节点的值)。

   public int remove() {
          // 将根的值记录,最后返回
          int result = store[0];
          // 将最后位置的值放到根节点位置
          store[0] = store[--size];
          int index = 0;
          // 通过循环,保证父节点的值不能小于子节点。
          while(true) {
              int leftIndex = 2 * index + 1; // 左子节点
              int rightIndex = 2 * index + 2; // 右子节点
              // leftIndex >= size 表示这个子节点还没有值。
              if (leftIndex >= size) break;
              int maxIndex = leftIndex;
              if (store[leftIndex] < store[rightIndex]) maxIndex = rightIndex;
              if (store[index] < store[maxIndex]) {
                  swap(store, index, maxIndex);
                  index = maxIndex;
              } else {
                  break;
              }
          }
          return result;
      }

2.1.2 PriorityQueue 接口

public interface PriorityQueue<T> extends Queue<T> {
    /**
     * 与remove(Object)方法相同,但使用泛型进行类型化。
     */
    boolean removeTyped(T node);

    /**
     * 与contains(Object)方法相同,但使用泛型进行类型化。
     */
    boolean containsTyped(T node);

    /**
     * 通知队列节点的优先级已更改。
     * 队列将进行调整,以确保优先队列属性得到维护。
     * @param node 队列中的对象,该对象的优先级可能发生了变化。
     */
    void priorityChanged(T node);

    /**
     * 与 clear() 方法相比,本方法只是将size 重新设置成 0;
     * clear() 方法还会将原先队列中所有节点的优先级改变,以及清空队列数据。
     * 具体可以看 DefaultPriorityQueue 的实现
     *
     * 只有当确定节点不会被重新插入到这个或任何其他的 PriorityQueue 中,
     * 并且知道 PriorityQueue 本身将在调用之后被垃圾回收时,才应该使用这个方法。
     */
    void clearIgnoringIndexes();
}

基本上都是队列 Queue 接口的方法,提供了两个泛型的方法,以及节点优先级变化的通知方法。

2.1.3 PriorityQueueNode 接口

/**
 * 为 DefaultPriorityQueue 提供方法以维护内部状态。
 * 这些方法通常不应在 DefaultPriorityQueue 范围之外使用。
 */
public interface PriorityQueueNode {
    /**
     * 表示节点不在队列中
     */
    int INDEX_NOT_IN_QUEUE = -1;

    /**
     * 获取由 priorityQueueIndex(DefaultPriorityQueue, int) 为queue对应的值设置的最后一个值。
     */
    int priorityQueueIndex(DefaultPriorityQueue<?> queue);

    /**
     * 由 DefaultPriorityQueue 用于维护队列中元素的状态。
     */
    void priorityQueueIndex(DefaultPriorityQueue<?> queue, int i);
}

表示优先级队列中节点的接口类,主要提供两个设置节点索引的方法。

2.1.4 ScheduledFutureTask 类

/**
 * 它是一个优先级PriorityQueueNode 节点,也代表一个延时或者周期性任务 PromiseTask
 */
@SuppressWarnings("ComparableImplementedButEqualsNotOverridden")
final class ScheduledFutureTask<V> extends PromiseTask<V> implements ScheduledFuture<V>, PriorityQueueNode {
     .......
}

它即使优先级队列中的一个节点 PriorityQueueNode,也代表一个延时或者周期性任务 PromiseTask

2.1.4.1 基准时间 START_TIME

我们不管是延时任务还是周期性任务都有任务执行的截止时间 deadlineNanos,截止时间到了就会执行任务。
而一般这个时间值会很大,通过 System.nanoTime() 得到。
所以 netty 设置了一个基准时间 START_TIME

    /**
     * 程序开始时间的纳秒值,因此所有的延时时间都是在这个 START_TIME 基础上定义的。
     */
    private static final long START_TIME = System.nanoTime();

netty 中涉及到的延时时间都是在 START_TIME 基础上计算得到的:

  1. 当前程序的执行时间纳秒值
     /**
      * 程序执行了多少纳秒值。
      * 因为是当前纳秒值和程序开始时间纳秒值之差。
      */
     static long nanoTime() {
         return System.nanoTime() - START_TIME;
     }
    
  2. 用户定义延时时间对应的截止时间
     /**
      * 将用户自定义的延时时间 转换成 截止时间 deadlineNanos,即 System.nanoTime() - START_TIME + delay
      */
     static long deadlineNanos(long delay) {
         long deadlineNanos = nanoTime() + delay;
         // Guard against overflow
         return deadlineNanos < 0 ? Long.MAX_VALUE : deadlineNanos;
     }
    
  3. 获取当前剩余的时间
     /**
      * 当前剩余的时间
      * 即 使用截止时间deadlineNanos - 当前时间真实值 nanoTime()
      * @return
      */
     public long delayNanos() {
         return deadlineToDelayNanos(deadlineNanos());
     }
    
     /**
      * 截止时间转换成剩余时间
      * @param deadlineNanos
      * @return
      */
     static long deadlineToDelayNanos(long deadlineNanos) {
         // 最小都是 0
         return deadlineNanos == 0L ? 0L : Math.max(0L, deadlineNanos - nanoTime());
     }
    
  4. 获取指定时间戳后剩余的时间
     /**
      * 获取到指定时间戳 currentTimeNanos 时,还剩余的时间纳秒值
      * 即 截止时间deadlineNanos - ( currentTimeNanos - START_TIME)
      * @param currentTimeNanos
      * @return
      */
     public long delayNanos(long currentTimeNanos) {
         // 最小都是 0
         return deadlineNanos == 0L ? 0L
                 : Math.max(0L, deadlineNanos() - (currentTimeNanos - START_TIME));
     }
    

2.1.4.2 构造函数

  1. 创建延时任务
     /**
      * 创建一个延时任务
      */
     ScheduledFutureTask(AbstractScheduledEventExecutor executor,
             Runnable runnable, long nanoTime) {
    
         super(executor, runnable);
         deadlineNanos = nanoTime;
         // 因为是延时任务,periodNanos 就是 0
         periodNanos = 0;
     }
    
     /**
      * 创建一个延时任务
      */
     ScheduledFutureTask(AbstractScheduledEventExecutor executor,
             Callable<V> callable, long nanoTime) {
    
         super(executor, callable);
         deadlineNanos = nanoTime;
         // 因为是延时任务,periodNanos 就是 0
         periodNanos = 0;
     }
    
  2. 创建周期性任务
     /**
      * 创建一个周期性任务
      */
     ScheduledFutureTask(AbstractScheduledEventExecutor executor,
             Runnable runnable, long nanoTime, long period) {
    
         super(executor, runnable);
         deadlineNanos = nanoTime;
         periodNanos = validatePeriod(period);
     }
    
     /**
      * 创建一个周期性任务
      */
     ScheduledFutureTask(AbstractScheduledEventExecutor executor,
             Callable<V> callable, long nanoTime, long period) {
    
         super(executor, callable);
         deadlineNanos = nanoTime;
         periodNanos = validatePeriod(period);
     }
    

2.1.4.3 运行任务

    // 任务运行
    @Override
    public void run() {
        // 当前线程是否在执行器的线程
        assert executor().inEventLoop();
        try {
            // delayNanos() > 0L 表示任务截止时间还没有到
            if (delayNanos() > 0L) {
                if (isCancelled()) {
                    // isCancelled() 为 true,任务已经被取消,那么就从列表中移除
                    scheduledExecutor().scheduledTaskQueue().removeTyped(this);
                } else {
                    // 否则将任务重新放回队列
                    scheduledExecutor().scheduleFromEventLoop(this);
                }
                return;
            }
            //下面都是任务截止时间已经到了
            if (periodNanos == 0) {
                // periodNanos == 0 表示只是延时任务。
                // 先将任务设置成不可取消
                if (setUncancellableInternal()) {
                    // 执行任务
                    V result = runTask();
                    // 设置 PromiseTask 为成功,进行通知
                    setSuccessInternal(result);
                }
            } else {
                // 检查任务是否被取消
                if (!isCancelled()) {
                    // 执行任务
                    runTask();
                    // 如果执行器是否被终止
                    if (!executor().isShutdown()) {
                        if (periodNanos > 0) {
                            // periodNanos > 0 表示固定周期,那么下一次执行时间就是
                            // 本次截止时间deadlineNanos + 周期时间 periodNanos
                            // 但是这个值可能小于当前时间啊,只要任务执行时间比周期时间 periodNanos大,
                            // 那么这个值就小于当前时间。就代表会立即运行
                            deadlineNanos += periodNanos;
                        } else {
                            // periodNanos < 0 表示固定延时。
                            // 使用当前时间 nanoTime() 加上固定延时时间(- periodNanos)
                            deadlineNanos = nanoTime() - periodNanos;
                        }
                        if (!isCancelled()) {
                            scheduledExecutor().scheduledTaskQueue().add(this);
                        }
                    }
                }
            }
        } catch (Throwable cause) {
            setFailureInternal(cause);
        }
    }

2.1.5 DefaultPriorityQueue 类

/**
 * 优先级队列使用元素的自然排序。
 * 为了维护优先级队列中的索引,元素还需要是 PriorityQueueNode 类型。
 */
public final class DefaultPriorityQueue<T extends PriorityQueueNode> extends AbstractQueue<T>
                                                                     implements PriorityQueue<T> {
    private static final PriorityQueueNode[] EMPTY_ARRAY = new PriorityQueueNode[0];
    // 使用比较器 Comparator 进行排序
    private final Comparator<T> comparator;
    // 使用数组存储堆
    private T[] queue;
    // 表示队列中节点的数量
    private int size;

    @SuppressWarnings("unchecked")
    public DefaultPriorityQueue(Comparator<T> comparator, int initialSize) {
        this.comparator = ObjectUtil.checkNotNull(comparator, "comparator");
        queue = (T[]) (initialSize != 0 ? new PriorityQueueNode[initialSize] : EMPTY_ARRAY);
    }
    .......
}

使用数组存储堆的数据。

2.1.5.1 清除方法

    @Override
    public void clear() {
        // 遍历队列中的元素节点
        for (int i = 0; i < size; ++i) {
            T node = queue[i];
            if (node != null) {
                // 设置节点的索引是 INDEX_NOT_IN_QUEUE,表示节点不在队列中了
                node.priorityQueueIndex(this, INDEX_NOT_IN_QUEUE);
                // 将引用设置为 null, 方便 gc
                queue[i] = null;
            }
        }
        size = 0;
    }

    @Override
    public void clearIgnoringIndexes() {
        // 只是将 size 变成 0
        size = 0;
    }

可以看出 clearIgnoringIndexes() 只是设置了 size的值,只有在确定节点不会被重新插入到这个或任何其他的PriorityQueue 中,并且这个PriorityQueue 队列本身将在调用之后被垃圾回收时,才应该使用这个方法。

2.1.5.2 包含节点

    // 队列是否包含 o 对象
    @Override
    public boolean contains(Object o) {
        // 不是 PriorityQueueNode 的实例,直接返回 false
        if (!(o instanceof PriorityQueueNode)) {
            return false;
        }
        PriorityQueueNode node = (PriorityQueueNode) o;
        // 是 PriorityQueueNode 实例,就可以利用索引快速判断
        return contains(node, node.priorityQueueIndex(this));
    }

    @Override
    public boolean containsTyped(T node) {
        return contains(node, node.priorityQueueIndex(this));
    }

    /**
     * 判断节点是不是在队列中,
     * 因为节点都是 PriorityQueueNode 类型,可以获取对应的索引 i,
     * 这样就可以直接通过索引从数组queue中获取数据进行比较,提高效率。
     */
    private boolean contains(PriorityQueueNode node, int i) {
        return i >= 0 && i < size && node.equals(queue[i]);
    }

利用 PriorityQueueNode 类型节点包含索引,可以快速判断,节点是不是包含在队列中。

2.1.5.3 bubbleDown 方法

    /**
     * 调用这个方法,表示当前 k 位置的节点值 node 可能比它的子节点的值大;
     * 为了保持最小堆属性,因此向下遍历树,将节点值 node 放到合适的位置
     */
    private void bubbleDown(int k, T node) {
        // size 是树的最底一层, size >>> 1 就表示最底一层节点的父节点
        final int half = size >>> 1;
        // 通过循环,保证父节点的值不能大于子节点。
        while (k < half) {
            // 左子节点, 相当于 (k * 2) + 1
            int iChild = (k << 1) + 1;
            // 左节点存储的值 child
            T child = queue[iChild];

            // 右子节点
            int rightChild = iChild + 1;
            // 当右节点在队列中,且左节点大于右节点,右节点才是较小的子节点,那么进行交换
            if (rightChild < size && comparator.compare(child, queue[rightChild]) > 0) {
                child = queue[iChild = rightChild];
            }
            // 当 bubbleDown节点node 的值小于或者等于当前较小的子节点的值,
            // 那么我们将通过在这里插入bubbleDown节点来保持最小堆属性。
            // 直接 break 跳出循环
            if (comparator.compare(node, child) <= 0) {
                break;
            }

            // 将较小值的子节点移动到父节点
            queue[k] = child;
            // 通知这个子节点,索引位置改变了
            child.priorityQueueIndex(this, k);

            // 将较小值的子节点位置赋值给 k
            // 即移动到树的下一层,寻找当前bubbleDown节点的位置
            k = iChild;
        }

        //我们已经找到了节点的位置,并且仍然满足最小堆属性,因此将它放入队列中。
        queue[k] = node;
        node.priorityQueueIndex(this, k);
    }

为了保持最小堆属性,向下遍历树,将节点值 node 放到合适的位置。因为当前 k 位置的节点值 node 可能比它的子节点的值大。

2.1.5.4 bubbleUp 方法

    /**
     * 调用这个方法,表示当前 k 位置的节点值 node 可能比它的父节点的值小;
     * 为了保持最小堆属性,因此向上遍历树,将节点值 node 放到合适的位置
     */
    private void bubbleUp(int k, T node) {
        // 当k==0时,就到了堆二叉树的根节点了,跳出循环
        while (k > 0) {
            // 父节点位置坐标, 相当于(k - 1) / 2
            int iParent = (k - 1) >>> 1;
            // 获取父节点位置元素
            T parent = queue[iParent];

            // 如果 bubbleUp节点node 大于等于父节点,那么我们找到了一个插入点,并且仍然保持最小堆属性。
            if (comparator.compare(node, parent) >= 0) {
                break;
            }

            // 否则就将父节点元素存放到k位置
            queue[k] = parent;
            parent.priorityQueueIndex(this, k);

            // 重新赋值k,寻找元素key应该插入到堆二叉树的那个节点
            k = iParent;
        }

        //我们已经找到了节点的位置,并且仍然满足最小堆属性,因此将它放入队列中。
        queue[k] = node;
        node.priorityQueueIndex(this, k);
    }

为了保持最小堆属性,向上遍历树,将节点值 node 放到合适的位置。因为当前 k 位置的节点值 node 可能比它的父节点的值小。

2.1.5.5 插入节点

    @Override
    public boolean offer(T e) {
        if (e.priorityQueueIndex(this) != INDEX_NOT_IN_QUEUE) {
            throw new IllegalArgumentException("e.priorityQueueIndex(): " + e.priorityQueueIndex(this) +
                    " (expected: " + INDEX_NOT_IN_QUEUE + ") + e: " + e);
        }

        // Check that the array capacity is enough to hold values by doubling capacity.
        // 检查节点数量 size 是否已经超过数组queue容量。
        // 如果是的话,就将数组queue容量扩大
        if (size >= queue.length) {
            // 使用允许初始容量为0的策略。
            // 与JDK的优先级队列策略相同,“小”时加倍,“大”时增加50%。
            queue = Arrays.copyOf(queue, queue.length + ((queue.length < 64) ?
                                                         (queue.length + 2) :
                                                         (queue.length >>> 1)));
        }

        // 计划在最后一个位置插入节点元素e,
        // 然后向上遍历树,保持最小堆属性。
        bubbleUp(size++, e);
        return true;
    }

2.1.5.6 得到堆顶元素

    // 得到堆顶元素
    @Override
    public T poll() {
        // 如果队列为空,返回 null
        if (size == 0) {
            return null;
        }
        // 记录树根节点
        T result = queue[0];
        // 设置树根节点的索引值
        result.priorityQueueIndex(this, INDEX_NOT_IN_QUEUE);

        // 得到堆最后一个节点, 并将队列节点数量 size 值减一
        T last = queue[--size];
        queue[size] = null;
        if (size != 0) { 
            // 从树根向下遍历,保持最小堆属性。
            bubbleDown(0, last);
        }

        return result;
    }

2.1.5.7 查看堆顶节点

    @Override
    public T peek() {
        return (size == 0) ? null : queue[0];
    }

2.1.5.8 删除指定节点

    public boolean remove(Object o) {
        final T node;
        try {
            // 只有是 PriorityQueueNode 的实例,才有可能删除
            node = (T) o;
        } catch (ClassCastException e) {
            return false;
        }
        return removeTyped(node);
    }

    public boolean removeTyped(T node) {
        // 获取节点对应的索引,可以快速查找
        int i = node.priorityQueueIndex(this);
        // 队列是否包含这个节点node
        if (!contains(node, i)) {
            return false;
        }

        // 改变这个节点的索引
        node.priorityQueueIndex(this, INDEX_NOT_IN_QUEUE);
        if (--size == 0 || size == i) {
            // 如果没有节点剩下,或者这是数组中的最后一个节点,就不涉及到树的改动了,直接删除并返回
            queue[i] = null;
            return true;
        }

        // 将最后一个节点值移动到要删除节点位置i
        T moved = queue[i] = queue[size];
        // 最后一个节点值设置为 null
        queue[size] = null;

        // 为了确保移动的节点仍然保留最小堆属性
        if (comparator.compare(node, moved) < 0) {
            // 删除节点值node 小于 最后一个节点值 moved,
            // 这就说明 moved 放到 i 位置,肯定是大于 i 的父节点的值,
            // 那么从 i 向上的树是满足最小堆属性的,
            // 但是从 i 向下的树,就不一定了,
            // 所以需要bubbleDown(i, moved) 方法,保持最小堆属性。
            bubbleDown(i, moved);
        } else {
            // 删除节点值node 大于等于 最后一个节点值 moved,
            // 这就说明 moved 放到 i 位置,肯定是大于 i 的左右子节点的值,
            // 那么从 i 向下的树是满足最小堆属性的,
            // 但是从 i 向上的树,就不一定了,
            // 所以需要 bubbleUp(i, moved) 方法,保持最小堆属性。
            bubbleUp(i, moved);
        }
        return true;
    }

2.2 AbstractScheduledEventExecutor 重要方法

2.2.1 获取计划任务

  1. 获取正准备执行的计划任务

     /**
      * 返回已经到了截止时间的计划任务,即准备执行的 Runnable
      * 如果没有,那么返回 null
      */
     protected final Runnable pollScheduledTask() {
         return pollScheduledTask(nanoTime());
     }
    
     /**
      * 返回准备用给定的 nanoTime 内要执行执行的计划任务 Runnable。
      * 您应该使用 nanoTime() 来获取正确的 nanoTime 值。
      */
     protected final Runnable pollScheduledTask(long nanoTime) {
         assert inEventLoop();
    
         // 获得最早需要执行的计划任务
         ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
         // 最早需要执行的计划任务为 null, 或者计划任务的截止时间 deadlineNanos 大于 nanoTime 时间,
         // 那么就没有要执行的计划任务,返回 null
         if (scheduledTask == null || scheduledTask.deadlineNanos() - nanoTime > 0) {
             return null;
         }
         // 从队列中移除
         scheduledTaskQueue.remove();
         // 将这个计划任务的截止时间设置为 0
         scheduledTask.setConsumed();
         return scheduledTask;
     }
    

    如果有这个计划任务,会将它从计划任务队列 scheduledTaskQueue 中删除。

  2. 查看计划任务队列头的计划任务

     /**
      * 获取优先级队列的头节点,即最早需要执行的计划任务,当然也有可能是 null
      */
     final ScheduledFutureTask<?> peekScheduledTask() {
         Queue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue;
         return scheduledTaskQueue != null ? scheduledTaskQueue.peek() : null;
     }
    
  3. 获取队列头计划任务剩余时间

     /**
      * 返回下一个计划任务准备运行的剩余时间的纳秒值,
      * 如果计划任务队列为空,那么返回 -1
      */
     protected final long nextScheduledTaskNano() {
         // 得到下一个计划任务
         ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
         // 返回计划任务准备运行的剩余时间
         return scheduledTask != null ? scheduledTask.delayNanos() : -1;
     }
    
  4. 获取队列头计划任务截止时间

     /**
      * 返回下一个计划任务准备运行的截止时间纳秒值 deadlineNanos,
      * (注 剩余时间就是截止时间 deadlineNanos 减去当前时间 nanoTime())
      * 如果计划任务队列为空,那么返回 -1
      */
     protected final long nextScheduledTaskDeadlineNanos() {
         // 得到下一个计划任务
         ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
         return scheduledTask != null ? scheduledTask.deadlineNanos() : -1;
     }
    
  5. 取消所有计划任务

     /**
      * 取消所有计划任务。
      * 只有当inEventLoop()为真时才必须调用此方法。
      */
     protected void cancelScheduledTasks() {
         assert inEventLoop();
         PriorityQueue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue;
         // 没有计划任务,直接返回
         if (isNullOrEmpty(scheduledTaskQueue)) {
             return;
         }
    
         final ScheduledFutureTask<?>[] scheduledTasks =
                 scheduledTaskQueue.toArray(new ScheduledFutureTask<?>[0]);
    
         for (ScheduledFutureTask<?> task: scheduledTasks) {
             // 取消计划任务
             task.cancelWithoutRemove(false);
         }
    
         scheduledTaskQueue.clearIgnoringIndexes();
     }
    

2.2.2 发布计划任务

  1. 发布延时任务
     // 发布延时任务
     @Override
     public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
         ObjectUtil.checkNotNull(command, "command");
         ObjectUtil.checkNotNull(unit, "unit");
         // 延时时间不可能是负数
         if (delay < 0) {
             delay = 0;
         }
         validateScheduled0(delay, unit);
    
         // 创建一个 ScheduledFutureTask 任务对象,通过 schedule 方法发布
         return schedule(new ScheduledFutureTask<Void>(
                 this,
                 command,
                 // 通过 deadlineNanos 方法,获取截止时间
                 deadlineNanos(unit.toNanos(delay))));
     }
    
     // 发布延时任务
     @Override
     public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
         ObjectUtil.checkNotNull(callable, "callable");
         ObjectUtil.checkNotNull(unit, "unit");
         if (delay < 0) {
             delay = 0;
         }
         validateScheduled0(delay, unit);
    
         // 创建一个 ScheduledFutureTask 任务对象,通过 schedule 方法发布
         // 通过 deadlineNanos 方法,获取截止时间
         return schedule(new ScheduledFutureTask<V>(this, callable, deadlineNanos(unit.toNanos(delay))));
     }
    
  2. 发布固定周期的周期性计划任务
     // 发布固定周期的周期性计划任务
     @Override
     public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
         ObjectUtil.checkNotNull(command, "command");
         ObjectUtil.checkNotNull(unit, "unit");
         if (initialDelay < 0) {
             throw new IllegalArgumentException(
                     String.format("initialDelay: %d (expected: >= 0)", initialDelay));
         }
         if (period <= 0) {
             throw new IllegalArgumentException(
                     String.format("period: %d (expected: > 0)", period));
         }
         validateScheduled0(initialDelay, unit);
         validateScheduled0(period, unit);
         // 创建一个 ScheduledFutureTask 任务对象,通过 schedule 方法发布
         // 通过 deadlineNanos 方法,获取截止时间
         // 因为是 固定周期, 所以 period 是正数 unit.toNanos(period)
         return schedule(new ScheduledFutureTask<Void>(
                 this, command, deadlineNanos(unit.toNanos(initialDelay)), unit.toNanos(period)));
     }
    
  3. 发布固定延时的周期性计划任务
      // 发布固定延时的周期性计划任务
     @Override
     public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
         ObjectUtil.checkNotNull(command, "command");
         ObjectUtil.checkNotNull(unit, "unit");
         if (initialDelay < 0) {
             throw new IllegalArgumentException(
                     String.format("initialDelay: %d (expected: >= 0)", initialDelay));
         }
         if (delay <= 0) {
             throw new IllegalArgumentException(
                     String.format("delay: %d (expected: > 0)", delay));
         }
    
         validateScheduled0(initialDelay, unit);
         validateScheduled0(delay, unit);
         // 创建一个 ScheduledFutureTask 任务对象,通过 schedule 方法发布
         // 通过 deadlineNanos 方法,获取截止时间
         // 因为是 固定周期, 所以 period 是负数 -unit.toNanos(period)
         return schedule(new ScheduledFutureTask<Void>(
                 this, command, deadlineNanos(unit.toNanos(initialDelay)), -unit.toNanos(delay)));
     }
    
  4. schedule(final ScheduledFutureTask<V> task)
     private <V> ScheduledFuture<V> schedule(final ScheduledFutureTask<V> task) {
         if (inEventLoop()) {
             // 如果当前线程就是执行器的线程,那么直接将计划任务 task 添加到计划任务队列中
             scheduleFromEventLoop(task);
         } else {
             // 获取计划任务的截止时间
             final long deadlineNanos = task.deadlineNanos();
             if (beforeScheduledTaskSubmitted(deadlineNanos)) {
                 // 如果任务未过期,则通过 execute 方法在运行时将自己添加到计划任务队列中
                 execute(task);
             } else {
                 // 如果任务已经过期,通过 lazyExecute 方法将在运行时从计划任务队列中删除自己
                 lazyExecute(task);
                 if (afterScheduledTaskSubmitted(deadlineNanos)) {
                     execute(WAKEUP_TASK);
                 }
             }
         }
         return task;
     }
    

2.2.3 添加计划任务

    /**
     * 将这个计划任务 task 添加到计划任务优先级队列中
     * @param task
     */
    final void scheduleFromEventLoop(final ScheduledFutureTask<?> task) {
        // nextTaskId是一个long,所以它不可能溢出回 0
        scheduledTaskQueue().add(task.setId(++nextTaskId));
    }

2.2.4 删除计划任务

    /**
     * 从计划任务队列中删除 计划任务task
     */
    final void removeScheduled(final ScheduledFutureTask<?> task) {
        assert task.isCancelled();
        if (inEventLoop()) {
            // 将计划任务从计划任务队列中删除
            scheduledTaskQueue().removeTyped(task);
        } else {
            // 任务将在运行时从计划任务队列中删除自己
            lazyExecute(task);
        }
    }

三. 小结

下一章我们在讲解 EventExecutor 重要的实现基类 SingleThreadEventExecutor

相关文章

网友评论

    本文标题:Netty源码_EventExecutor和EventLoop抽

    本文链接:https://www.haomeiwen.com/subject/dtujaltx.html