美文网首页nettyNettynetty
自顶向下深入分析Netty(四)--EventLoop-2

自顶向下深入分析Netty(四)--EventLoop-2

作者: Hypercube | 来源:发表于2016-10-13 15:11 被阅读2530次

    4.4 线程

    4.4.1 AbstractExecutorService

    AbstractExecutorService是JDK并发包中的类,实现了ExecutorService中的submit()和invoke***()方法,关键实现是其中的newTaskFor()方法,使用FutureTask包装一个Ruannble对象和结果或者一个Callable对象。注意,这个方法是一个protected方法,子类中可以覆盖这个实现。

        protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
            return new FutureTask<T>(runnable, value);
        }
        
        protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
            return new FutureTask<T>(callable);
        }
    

    4.4.2 AbstractEventExecutor

    AbstractEventExecutor继承自AbstractExecutorService并实现了EventExecutor接口,该类中只实现了一些简单的方法:

        public EventExecutor next() {
            return this;
        }    
    
        public boolean inEventLoop() {
            return inEventLoop(Thread.currentThread());
        }
        
        public Future<?> shutdownGracefully() {
            return shutdownGracefully(2, 15, TimeUnit.SECONDS);
        }
    

    next()方法在线程池的讲解中已经接触过,功能是选择线程池中的一个线程,将AbstractEventExecutor看为只有一个线程的线程池,所以next()返回它本身。inEventLoop()和shutdownGracefully()方法都调用它的有参方法,我们将在其子类实现中详细介绍,这里我们先了解其功能即可。inEventLoop()的功能使判断当前线程是否是EventExecutor原生线程,shutdownGracefully()即优雅关闭。
    AbstractEventExecutor类中有四个创建异步结果的方法,实现类似如下:

        public <V> Promise<V> newPromise() {
            return new DefaultPromise<V>(this);
        }
    

    AbstractEventExecutor类覆盖了父类的newTaskFor()方法:

        @Override
        protected final <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
            return new PromiseTask<T>(this, callable);
        }
    

    使用Netty的PromiseTask代替JDK的FutureTask,其中的差别,我们在下一节讲述。此外,还用Netty的Future对象覆盖了subimt()方法的返回值(原本为JDK的Future).

    4.4.3 AbstractScheduledEventExecutor

    从名字可以看出,AbstractScheduledEventExecutor类是关于Schedule的实现。如果要调度一堆任务,那么首先要有存放任务的容器,Netty中使用队列:

        Queue<ScheduledFutureTask<?>> scheduledTaskQueue;
        
        Queue<ScheduledFutureTask<?>> scheduledTaskQueue() {
            if (scheduledTaskQueue == null) {
                scheduledTaskQueue = new PriorityQueue<ScheduledFutureTask<?>>();
            }
            return scheduledTaskQueue;
        }
    

    该调度任务队列是一个优先级队列,并使用了延迟加载。其核心的调度方法实现如下:

        <V> ScheduledFuture<V> schedule(final ScheduledFutureTask<V> task) {
            if (inEventLoop()) {
                scheduledTaskQueue().add(task); // 原生线程直接向任务队列添加
            } else {
                execute(new Runnable() {    // 其他线程则提交一个添加调度任务的任务
                    @Override
                    public void run() {
                        scheduledTaskQueue().add(task);
                    }
                });
            }
            return task;
        }
    

    可以看出实现很简单,就是向调度任务队列中添加一个任务,为了弄明白具体的调度过程,我们需要明白ScheduledFutureTask,下面我们将详细介绍。

    ScheduledFutureTask

    首先看其中的静态字段和静态方法:

        // 调度任务ID生成器
        private static final AtomicLong nextTaskId = new AtomicLong();
        // 调度相对时间起点
        private static final long START_TIME = System.nanoTime();
        
        // 获取相对的当前时间
        static long nanoTime() {
            return System.nanoTime() - START_TIME;
        }
    
        // 获取相对的截止时间
        static long deadlineNanos(long delay) {
            return nanoTime() + delay;
        }
    

    注意:Netty使用了相对时间调度,时间起点为ScheduledFutureTask类第一次被类加载器加载的时间。
    然后我们看其中的私有字段:

        // 调度任务ID
        private final long id = nextTaskId.getAndIncrement();
        // 调度任务截止时间即到了改时间点任务将被执行
        private long deadlineNanos;
        // 任务时间间隔
        private final long periodNanos;
    

    这里的periodNanos字段还兼有标记的功能,0--表示调度任务不重复,>0--表示按固定频率重复(at fixed rate),<0--表示按固定延迟重复(with fixed delay)。这不是一个好的设计,但也没有暴露给用户程序员,算一个折中处理。
    接着我们看关键的run()方法:

        @Override
        public void run() {
            assert executor().inEventLoop();
            try {
                if (periodNanos == 0) { // 普通不重复的调度任务直接执行
                    if (setUncancellableInternal()) {
                        V result = task.call();
                        setSuccessInternal(result);
                    }
                } else {
                    if (!isCancelled()) {   // 重复的任务可能被取消
                        task.call();
                        if (!executor().isShutdown()) { // 线程已经关闭则不再添加新任务
                            long p = periodNanos;
                            if (p > 0) {
                                deadlineNanos += p; // 按固定频率重复
                            } else {
                                deadlineNanos = nanoTime() - p; // 按固定延迟重复
                            }
                            if (!isCancelled()) {
                                Queue<ScheduledFutureTask<?>> scheduledTaskQueue =
                                        ((AbstractScheduledEventExecutor) executor()).scheduledTaskQueue;
                                assert scheduledTaskQueue != null;
                                scheduledTaskQueue.add(this); // 下一个最近的重复任务添加到任务队列
                            }
                        }
                    }
                }
            } catch (Throwable cause) {
                setFailureInternal(cause);
            }
        }
    

    代码中的注释很好的解释了一个调度任务的执行过程,可能你会对按固定延迟重复的任务有疑问,即:

        deadlineNanos = nanoTime() - p;
    

    其中nanoTime()指当前时间(注意是相对时间),由于p是负值-p等价于:当前时间+delay时间。由于ScheduledFutureTask是添加到PriorityQueue中的对象,我们再看看其中的compareTo()方法:

        @Override
        public int compareTo(Delayed o) {
            if (this == o) {
                return 0;
            }
            ScheduledFutureTask<?> that = (ScheduledFutureTask<?>) o;
            long d = deadlineNanos() - that.deadlineNanos();
            if (d < 0) {
                return -1;
            } else if (d > 0) {
                return 1;
            } else if (id < that.id) {
                return -1;
            } else if (id == that.id) {
                throw new Error();
            } else {
                return 1;
            }
        }
    

    从代码可以看出,优先级队列的出队顺序是:截止时间最近的先出队,如果截止时间相同则ID小的先出队。
    分析完ScheduledFutureTask类,我们接着分析AbstractScheduledEventExecutor类中剩下的方法,由于其中的方法实现简单明了,不再列出代码实现,只列出其方法签名:

        // 返回当前时间(相对时间)
        protected static long nanoTime() {
            return ScheduledFutureTask.nanoTime(); // 使用ScheduledFutureTask的相对时间
        }
        // 取得并移除截止时间大于nanoTime的下一个调度任务
        protected final Runnable pollScheduledTask(long nanoTime);
        // 取得距离下一个调度任务执行的间隔时间
        protected final long nextScheduledTaskNano();
        // 取得但并不移除下一个调度任务
        final ScheduledFutureTask<?> peekScheduledTask();
        // 是否有将要执行的调度任务
        protected final boolean hasScheduledTasks();
        // 删除一个调度任务
        final void removeScheduled(final ScheduledFutureTask<?> task);
    

    4.4.4 SingleThreadEventExecutor

    SingleThreadEventExecutor类从名字可以看出,它是一个单线程的Executor实现。在介绍之前,我们先看Netty定义的线程状态:

        private static final int ST_NOT_STARTED = 1;    // 没有启动
        private static final int ST_STARTED = 2;    // 启动
        private static final int ST_SHUTTING_DOWN = 3;  // 正在关闭
        private static final int ST_SHUTDOWN = 4;   // 关闭
        private static final int ST_TERMINATED = 5; // 终止
    

    需要注意的有两点:
    (1).本类的实现中线程采用延迟启动(lazy start),只有当提交第一个任务时线程才启动,从而节省资源。
    (2).当调用shutdownGracefully()时,线程状态改变为ST_SHUTTING_DOWN;调用shutdown()时,线程状态改变为ST_SHUTDOWN。
    明白了线程状态,我们首先看一下类中的字段:

        private final EventExecutorGroup parent;    // 该Executor所属的线程池
        private final Queue<Runnable> taskQueue;    // 任务队列
        private final Thread thread;    // 改Executor所属的线程
        private final ThreadProperties threadProperties;    // 线程属性值
        private final Semaphore threadLock = new Semaphore(0);  // 一个信号量,注意初始值为0
        private final Set<Runnable> shutdownHooks = new LinkedHashSet<~>(); // 线程关闭钩子任务
        private final boolean addTaskWakesUp;   // 添加任务时是否唤醒线程
        private final int maxPendingTasks;  // 任务队列大小即未执行的最大任务数
        private final RejectedExecutionHandler rejectedExecutionHandler;    // 队列满时的阻止器
    
        private long lastExecutionTime; // 上一次执行时间
    
        private volatile int state = ST_NOT_STARTED;    // 线程状态,注意该字段由STATE_UPDATER修改
    
        // 线程终止异步结果
        private final Promise<?> terminationFuture = new DefaultPromise<Void>(
                                                                GlobalEventExecutor.INSTANCE);
    

    关于SingleThreadEventExecutor的构造方法,我们摘选下面的关键代码:

        thread = threadFactory.newThread(() -> {
                    updateLastExecutionTime();
                    try {
                        SingleThreadEventExecutor.this.run();   // 这是一个模板方法
                    } catch (Throwable t) {
                        logger.warn("Unexpected exception from an event executor: ", t);
                    } finally {
                        // shutdown
                    }
        });
        taskQueue = newTaskQueue(); // 这里使用该方法是为了子类可以优化
    

    其中使用了模板方法run(),由子类负责实现。taskQueue也由一个方法实例,主要是给子类提供一个优化的机会,关于Netty的优化,我们以后将专门讲解,这里taskQueue的默认实现是LinkedBlockingQueue。
    下面我们分析一个关键方法runAllTasks(long timeoutNanos),其功能是用给定的timeoutNanos时间执行任务队列中的任务,代码如下:

        protected boolean runAllTasks(long timeoutNanos) {
            fetchFromScheduledTaskQueue();  // 将调度任务队列中到期的任务移到任务队列
            Runnable task = pollTask(); // 从任务队列头部取出一个任务
            if (task == null) {
                return false;
            }
    
            final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos; // 执行截止时间
            long runTasks = 0;
            long lastExecutionTime;
            for (;;) {
                try {
                    task.run();
                } catch (Throwable t) {
                    logger.warn("A task raised an exception.", t);
                }
                runTasks ++;
                // 每执行64个任务检查时候时间已到截止时间,0x3F = 64-1
                if ((runTasks & 0x3F) == 0) {
                    lastExecutionTime = ScheduledFutureTask.nanoTime();
                    if (lastExecutionTime >= deadline) {
                        break;
                    }
                }
    
                task = pollTask();
                if (task == null) { // 没有任务则退出
                    lastExecutionTime = ScheduledFutureTask.nanoTime();
                    break;
                }
            }
            // 更新上一次执行时间
            this.lastExecutionTime = lastExecutionTime;
            return true;
        }
    

    我们再看一下fetchFromScheduledTaskQueue()方法,它从调度任务队列取出所有到期的调度任务并加入到任务队列,除非任务队列满,代码如下:

        private boolean fetchFromScheduledTaskQueue() {
            // 等价于ScheduledFutureTask.nanoTime()
            long nanoTime = AbstractScheduledEventExecutor.nanoTime(); 
            Runnable scheduledTask  = pollScheduledTask(nanoTime);
            while (scheduledTask != null) {
                if (!taskQueue.offer(scheduledTask)) {
                    // 任务队列已满,则重新添加到调度任务队列
                    scheduledTaskQueue().add((ScheduledFutureTask<?>) scheduledTask);
                    return false;
                }
                scheduledTask  = pollScheduledTask(nanoTime);
            }
            return true;
        }
    

    runAllTasks()还有一个无参方法,其功能将所有到期的调度任务从调度任务队列移入任务队列,并执行任务队列中的所有任务(包括非调度任务),我们不再列出代码。
    SingleThreadEventExecutor类是一个通用框架,不仅可以执行异步任务,也能执行同步任务,下面我们分析其中用于执行同步任务的关键方法takeTask(),其功能是取出任务队列头部的任务,如果没有任务则会一直阻塞,代码如下:

        protected Runnable takeTask() {
            assert inEventLoop();
            if (!(taskQueue instanceof BlockingQueue)) {  // 任务队列必须是阻塞队列
                throw new UnsupportedOperationException();
            }
    
            BlockingQueue<Runnable> taskQueue = (BlockingQueue<Runnable>) this.taskQueue;
            for (;;) {
                // 取得调度任务队列的头部任务,注意peek并不移除
                ScheduledFutureTask<?> scheduledTask = peekScheduledTask(); 
                if (scheduledTask == null) { // 没有调度任务
                    Runnable task = null;
                    try {
                        task = taskQueue.take();  // 取得并移除任务队列的头部任务,没有则阻塞
                        if (task == WAKEUP_TASK) {
                            task = null;
                        }
                    } catch (InterruptedException e) {
                        // Ignore
                    }
                    return task;
                } else {
                    long delayNanos = scheduledTask.delayNanos(); // 调度任务的到期时间间隔
                    Runnable task = null;
                    if (delayNanos > 0) {
                        try {   // 调度任务未到期,则从任务队列取一个任务,可能为null
                            task = taskQueue.poll(delayNanos, TimeUnit.NANOSECONDS);
                        } catch (InterruptedException e) {
                            return null;
                        }
                    }
                    
                    // 注意这里执行有两种情况:1.任务队列中没有待执行任务,2.调度任务已到期
                    if (task == null) {
                        fetchFromScheduledTaskQueue();
                        task = taskQueue.poll();
                    }
    
                    if (task != null) {
                        return task;
                    }
                }
            }
        }
    

    特别关注一下15行代码,这里有一个WAKEUP_TASK,它是一个标记任务。使用这个标记任务是为了线程能正确退出,当线程需要关闭是,如果线程在take()方法上阻塞,就需要添加一个标记任务WAKEUP_TASK到任务队列,是线程从take()返回,从而正确关闭线程。

        protected void wakeup(boolean inEventLoop) {
            if (!inEventLoop || STATE_UPDATER.get(this) == ST_SHUTTING_DOWN) {
                // 非本类原生线程或者本类原生线程需要关闭时,添加一个标记任务使线程从take()返回。
                // offer失败表明任务队列已有任务,从而线程可以从take()返回故不处理
                taskQueue.offer(WAKEUP_TASK);
            }
        }
    

    本类覆盖了execute()方法,在这里实现了线程的延迟启动(lazy start),代码如下:

        public void execute(Runnable task) {
            boolean inEventLoop = inEventLoop();
            if (inEventLoop) {  // 原生线程直接添加
                addTask(task);  
            } else {    // 外部线程启动线程后添加
                startThread();
                addTask(task);
                if (isShutdown() && removeTask(task)) {
                    reject();   // 原生线程关闭时则阻止添加,抛出异常
                }
            }
            // 是否唤醒线程,addTaskWakesUp由构造方法配置,wakesUpForTask()可由子类覆盖,默认唤醒
            // 这里这个参数值addTaskWakesUp和其说明有出入,现在false反而唤醒?
            if (!addTaskWakesUp && wakesUpForTask(task)) {
                wakeup(inEventLoop);
            }
        }
    

    Netty线程关闭的代码较为繁琐,我们先不列出,以后专门使用一节讲述。此外,本类中其他需要说明的方法,我们列出方法签名和说明:

        // 取得并移除任务队列的头部任务,忽略WAKEUP_TASK标记任务
        protected Runnable pollTask();
        // 取得任务队列的头部任务
        protected Runnable peekTask();
        // 任务队列是否有任务即是否为空
        protected boolean hasTasks();
        // 挂起的任务数即任务队列大小
        public int pendingTasks();
        // 添加一个任务,线程关闭时抛出异常
        protected void addTask(Runnable task);
        final boolean offerTask(Runnable task);
        // 移除一个任务
        protected boolean removeTask(Runnable task);
        // 下一个调度任务到期的时间间隔
        protected long delayNanos(long currentTimeNanos);
        
        // 判断线程是否为该类的原生线程
        public boolean inEventLoop(Thread thread) {
            return thread == this.thread;
        }
    

    4.4.5 SingleThreadEventLoop

    SingleThreadEventLoop终于与Channel取得联系,其中最重要的便是register()方法,功能是将一个Channel对象注册到EventLoop上,其最终实现委托Channel对象的Unsafe对象完成,关于Unsafe我们将在下一章介绍。其代码实现如下:

        @Override
        public ChannelFuture register(Channel channel) {
            return register(channel, new DefaultChannelPromise(channel, this));
        }
    
        @Override
        public ChannelFuture register(final Channel channel, final ChannelPromise promise) {
            // 代码中省略了NullPointer检查
            channel.unsafe().register(this, promise);
            return promise;
        }
    

    该类还覆盖了父类的wakesUpForTask(Runnable task)方法,实现如下:

        @Override
        protected boolean wakesUpForTask(Runnable task) {
            return !(task instanceof NonWakeupRunnable);
        }
        
        // 标记接口,用于标记不唤醒原生线程的任务
        interface NonWakeupRunnable extends Runnable { }
    

    4.4.6 NioEventLoop

    前面铺垫了这么多,终于到了我们的目的地NioEventLoop。NioEventLoop的功能是对注册到其中的Channnel的就绪事件以及对用户提交的任务进行处理,回忆第一章关于Java NIO的讲解,NioEventLoop正是要完成第一章中所示的代码的工作。首先我们从其中的字段开始:

        Selector selector;  // NIO中的多路复用器Selector
        private SelectedSelectionKeySet selectedKeys;   // 就绪事件的键值对,优化时使用
        private final SelectorProvider provider;    // selector的工厂
        // 唤醒标记,由于select()方法会阻塞
        private final AtomicBoolean wakenUp = new AtomicBoolean(); 
        private final SelectStrategy selectStrategy; // 选择策略
        private volatile int ioRatio = 50;  // IO任务占总任务(IO+普通任务)比例
        private int cancelledKeys;  // 取消的键数目
        private boolean needsToSelectAgain; 
    

    在讲解方法前,我们再回顾一下NioEventLoop的继承体系:
    (1).JDK的AbstractExecutorService类定义了任务的提交和执行,留下了newTaskFor()方法用于子类定义执行的任务;
    (2).Netty的AbstractEventExecutor类覆盖了newTaskFor()方法,使用PromiseTask表示待执行的任务;
    (3).AbstractScheduledEventExecutor类将待执行的调度任务封装为ScheduledFutureTask提交给调度任务队列;
    (4).SingleThreadEventExecutor类实现了任务执行器即线程,其覆盖了execute()方法,当使用execute()执行一个任务时,实质是向任务队列提交一个任务;该类中还有一个重要的模板方法run(),在这个方法中执行任务队列中的任务(调度任务队列中的待执行任务移入普通任务队列),留给子类实现;
    (5).SingleThreadEventLoop类实现对Channel对象的注册。
    从NioEventLoop继承体系的分析可以看出,NioEventLoop要实现的最关键方法就是基类的模板方法run()。是不是已经迫不及待了?好,我们直奔代码:

        @Override
        protected void run() {
            for (;;) { 
                try {
                    // 调用select()查询是否有就绪的IO事件
                    switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
                        case SelectStrategy.CONTINUE:
                            continue;
                        case SelectStrategy.SELECT:
                            select(wakenUp.getAndSet(false));
                            if (wakenUp.get()) {
                                selector.wakeup();
                            }
                        default:
                            // fallthrough
                    }
    
                    cancelledKeys = 0;
                    needsToSelectAgain = false;
                    final int ioRatio = this.ioRatio;
                    if (ioRatio == 100) {
                        processSelectedKeys();  // 处理就绪的IO事件
                        runAllTasks();  // 执行完任务队列中的任务
                    } else {
                        final long ioStartTime = System.nanoTime();
                        processSelectedKeys();  // 处理就绪的IO事件
                        final long ioTime = System.nanoTime() - ioStartTime;
                        runAllTasks(ioTime * (100 - ioRatio) / ioRatio);    // 给定时间内执行任务
                    }
    
                    if (isShuttingDown()) { // 检测用户是否要终止线程
                        closeAll();
                        if (confirmShutdown()) {
                            break;
                        }
                    }
                } catch (Throwable t) {
                    logger.warn("Unexpected exception in the selector loop.", t);
                    try {
                        Thread.sleep(1000); // 防止连续异常过度消耗CPU
                    } catch (InterruptedException e) {
                        // Ignore.
                    }
                }
            }
        }
    

    从代码中可以看出NioEventLoop完成了三项任务:
    (1).轮训Channel选择就绪的IO事件。
    (2).处理就绪的IO事件。
    (3).处理任务队列中的普通任务(包含调度任务)。
    其中第(3)项,我们已经在SingleThreadEventExecutor类中分析过,不再赘述。我们看代码的6-16行即第(1)项,轮询Channel选择就绪的IO事件。这里使用接口SelectStrategy是用户可以选择具体的选择策略,我们主要看默认实现:

        @Override
        public int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) throws Exception {
            return hasTasks ? selectSupplier.get() : SelectStrategy.SELECT;
        }
        
        private final IntSupplier selectNowSupplier = () -> { return selectNow(); };
    

    故默认策略是:如果有普通任务待执行,使用selectNow();否则使用select(boolean oldWakenUp)。NIO的Selector有三个select()方法,它们的区别如下:

    select() 阻塞直到有一个感兴趣的IO事件就绪
    select(long timeout) 与select()类似,但阻塞的最长时间为给定的timeout
    selectNow() 不会阻塞,直接返回而不管是否有IO事件就绪

    此外,还有一个重要的wakeUp()方法,其功能是唤醒一个阻塞在select()上的线程,使其继续运行。如果先调用了wakeUp()方法,那么下一个select()操作也会立即返回。此外,wakeUp()是一个昂贵的方法,应尽量减少其调用次数。
    有了这些基础知识,我们看本类中与selec()操作有关的方法,首先看selecNow()方法:

        int selectNow() throws IOException {
            try {
                return selector.selectNow();
            } finally {
                if (wakenUp.get()) {    // wakenUp标记字段为真时,唤醒下一次select()操作
                    selector.wakeup();
                }
            }
        }
    

    实现也很简单,我们主要看select(boolean oldWakenUp)方法:

        private void select(boolean oldWakenUp) throws IOException {
            Selector selector = this.selector;
            try {
                int selectCnt = 0;
                long currentTimeNanos = System.nanoTime();
                // delayNanos返回的是最近的一个调度任务的到期时间,没有调度任务返回1秒
                // selectDeadLineNanos指可以进行select操作的截止时间点
                long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);
                for (;;) {
                    // 四舍五入将select操作时间换算为毫秒单位
                    long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
                    if (timeoutMillis <= 0) {   // 时间不足1ms,不再进行select操作
                        if (selectCnt == 0) {   // 如果一次select操作没有进行
                            selector.selectNow();   // selecNow()之后返回
                            selectCnt = 1;
                        }
                        break;
                    }
    
                    // 此时有任务进入队列且唤醒标志为假
                    if (hasTasks() && wakenUp.compareAndSet(false, true)) {
                        selector.selectNow(); // selectNow()返回,否则会耽误任务执行
                        selectCnt = 1;
                        break;
                    }
    
                    int selectedKeys = selector.select(timeoutMillis);  
                    selectCnt ++;
    
                    // 有就绪的IO事件,参数oldWakenUp为真,外部设置wakenUp为真
                    // 有待执行普通任务,有待执行调度任务
                    if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || 
                                                                            hasScheduledTasks()) {
                        break;
                    }
    
                    long time = System.nanoTime();
                    if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
                        selectCnt = 1;  // 截止时间已到(这里可直接break退出)
                    } else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
                            selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
                        rebuildSelector();  // 这里是对JDK BUG的处理
                        selector = this.selector;
                        selector.selectNow(); // 重建selector之后立即selectNow()
                        selectCnt = 1;
                        break;
                    }
                    currentTimeNanos = time;
                }
            } catch (CancelledKeyException e) {
            }
        }
    

    本来select操作的代码不会这么复杂,主要是由于JDK BUG导致select()方法并不阻塞而直接返回且返回值为0,从而出现空轮询使CPU完全耗尽。Netty解决的办法是:对select返回0的操作计数,如果次数大于阈值SELECTOR_AUTO_REBUILD_THRESHOLD就新建一个selector,将注册到老的selector上的channel重新注册到新的selector上。阈值SELECTOR_AUTO_REBUILD_THRESHOLD可由用户使用系统变量io.netty.selectorAutoRebuildThreshold配置,默认为512。这里注意for()循环中大量使用了break,含有break的部分才是关键操作,其他部分(其实就只有一处)是为了解决JDK BUG。
    为了完全理解这段代码,我们还将讲解一下wakeUp()方法,注意其中的21行和32行代码。回忆一下SingleThreadEventExecutor的execute()方法,其最后有一个wakeUp()方法,作用是添加一个任务后指示是否需要唤醒线程。在NioEventLoop中覆盖了它的实现:

        @Override
        protected void wakeup(boolean inEventLoop) {
            // 外部线程且唤醒标记为假时唤醒
            if (!inEventLoop && wakenUp.compareAndSet(false, true)) {
                selector.wakeup(); // 注意此时唤醒标记为真
            }
        }
        
        select(wakenUp.getAndSet(false)); // run方法调用时
    

    当run方法调用select()方法时,每次都将唤醒标记设置为假,这样线程将阻塞在selector.select(timeoutMillis)方法上。阻塞期间如果用户使用外部线程提交一个任务,会调用上述的wakeup()方法,由于wakenUp唤醒标记为假,selector.wakeup()方法调用,线程唤醒从下一个break跳出,从而执行提交任务。阻塞期间如果外部线程提交多个任务,使用wakenUp唤醒标记使selector.wakeup()操作只执行一次,因为它是一个昂贵的操作,从而提高性能。21行代码进入if执行的前提是有任务且wakenUp唤醒标记为假,如果唤醒标记为真是什么情况呢?那说明由外部线程调用了selector.wakeup()方法,此时下一个select()操作会直接返回,继而从下一个break返回,所以也不会影响已有任务的执行。在run()方法select之后的操作还有这样两行代码:

        if (wakenUp.get()) {
            selector.wakeup();
        }
    

    根据注释的解释是:在select(wakenUp.getAndSet(false))操作set(false)和selector.select(timeout)之间如果有外部线程将唤醒标记wakenUp设置为真且执行selector.wakeup()方法,则selector.select(timeout)的第一个操作立即返回,然后会阻塞在第二次循环的select.select(timeout)方法上,此时唤醒标记wakenUp为真从而阻止外部线程添加任务时唤醒线程,从而造成不必要的阻塞操作。(但是代码在select(timeout)之后的一行使用了hasTasks()判断,如果外部线程提交了任务也能跳出循环。所以这部分代码和注释是不是已失效?)
    分析完select操作之后,我们接着分析Netty对IO事件的处理方法processSelectedKeys():

        private void processSelectedKeys() {
            if (selectedKeys != null) {
                processSelectedKeysOptimized(selectedKeys.flip());  // 使用优化
            } else {
                processSelectedKeysPlain(selector.selectedKeys());  // 普通处理
            }
        }
    

    关于优化,我们将在专门的章节讲述,我们先看普通处理:

        private void processSelectedKeysPlain(Set<SelectionKey> selectedKeys) {
            if (selectedKeys.isEmpty()) {
                return; // 选择键的集合为空直接返回
            }
    
            Iterator<SelectionKey> i = selectedKeys.iterator();
            for (;;) {
                final SelectionKey k = i.next();
                final Object a = k.attachment();
                i.remove();
    
                if (a instanceof AbstractNioChannel) {  // IO事件由Netty框架处理
                    processSelectedKey(k, (AbstractNioChannel) a);  
                } else {    // IO事件由用户自定义任务处理
                    NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
                    processSelectedKey(k, task);
                }
    
                if (!i.hasNext()) {
                    break;
                }
    
                if (needsToSelectAgain) {
                    selectAgain();
                    selectedKeys = selector.selectedKeys();
                    if (selectedKeys.isEmpty()) {
                        break;
                    } else {
                        i = selectedKeys.iterator();
                    }
                }
            }
        }
    

    这一部分代码功能就是遍历选择键,其中对选择键的处理有两种方式:Netty框架处理和用户自定义处理。这两种处理方式由register()方式决定:

        // Netty框架处理
        public ChannelFuture register(final Channel channel, final ChannelPromise promise);
        // 用户自定义处理
        public void register(final SelectableChannel ch, final int interestOps, final NioTask<?> task);
    

    注意23-31行代码,什么时候需要再次执行select()操作呢?当取消的选择键达到一定数目时,这个数目在Netty中时CLEANUP_INTERVAL,值为256。也就是每取消256个选择键,Netty重新执行一个selectAgain()操作。这个操作实现使用selector.selectNow()并将needsToSelectAgain标记设置为假。cancle()代码如下:

        void cancel(SelectionKey key) {
            key.cancel();
            cancelledKeys ++;
            if (cancelledKeys >= CLEANUP_INTERVAL) {
                cancelledKeys = 0;
                needsToSelectAgain = true;
            }
        }
    

    接着分析最为关键的processSelectedKey()方法:

        private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
            final NioUnsafe unsafe = ch.unsafe();
            if (!k.isValid()) { // 选择键不再有效
                final EventLoop eventLoop;
                try {
                    eventLoop = ch.eventLoop(); 
                } catch (Throwable ignored) {
                    return;
                }
                // channel已不再该EventLoop,直接返回
                if (eventLoop != this || eventLoop == null) {
                    return; 
                }
                // channel还在EventLoop,关闭channel
                unsafe.close(unsafe.voidPromise());
                return;
            }
    
            try {
                int readyOps = k.readyOps();
                if ((readyOps & SelectionKey.OP_CONNECT) != 0) {    // 客户端连接事件
                    int ops = k.interestOps();
                    ops &= ~SelectionKey.OP_CONNECT;
                    k.interestOps(ops);     // 连接完成后客户端除了连接事件都感兴趣
                    unsafe.finishConnect();     // 完成连接
                }
                
                // readyOps == 0为对JDK Bug的处理, 防止死循环
                if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
                    unsafe.read();  // 读事件以及服务端的Accept事件都抽象为read()事件
                    if (!ch.isOpen()) {
                        return;
                    }
                }
                if ((readyOps & SelectionKey.OP_WRITE) != 0) {  // 写事件
                    ch.unsafe().forceFlush();
                }
            } catch (CancelledKeyException ignored) {
                unsafe.close(unsafe.voidPromise());
            }
        }
    

    可以看出对IO事件的具体处理,委托给NioUnsafe对象处理,由read()、forceFlush()、finishConnect()和close()方法处理具体的IO事件,具体的处理过程,我们将在分析NioUnsafe时讲解。
    目前为止,我们已经讲解完了NioEventLoop实现的最关键部分,当然还有一些细节我们需要完善:

        protected Queue<Runnable> newTaskQueue(int maxPendingTasks) {
            return PlatformDependent.newMpscQueue(maxPendingTasks);
        }
    

    NioEventLoop由于不使用takeTask()方法,所以使用一个MPSC队列代替基类的LinkedBlockingQueue作为新的任务队列,大大提高了性能。如果你对MPSC(多个生产者一个消费者)队列感兴趣,可自行查看相关资料。

        @Override
        public int pendingTasks() {
            if (inEventLoop()) {
                return super.pendingTasks();
            } else {
                return submit(pendingTasksCallable).syncUninterruptibly().getNow(); // 同步等待结果
            }
        }
        
        private final Callable<Integer> pendingTasksCallable = () -> {
                return NioEventLoop.super.pendingTasks();
        };
    

    这一部分代码是使用MPSC队列的副作用,由于MPSC只能由NioEventLoop原生线程访问,否则会发生一些意外情况,所以查询队列大小,也向任务队列提交一个任务同时同步等待结果。

        @Override
        protected Runnable pollTask() {
            Runnable task = super.pollTask();
            if (needsToSelectAgain) {
                selectAgain();
            }
            return task;
        }
    

    NioEventLoop覆盖了pollTask()的实现,在适当时机执行selector.selectNow()操作。(由于pollTask是在执行普通任务时调用,是否有必要?就算selectNow()有结果也不能处理)
    Netty作为一个优化狂魔,将优化做到了极致。回忆处理选择键的事件时,需要遍历其存储容器selectedKeySet,这是一个HashSet,迭代性能不高,那么优化。Netty使用新的SelectedSelectionKeySet代替JDK的HashSet,具体怎么实现的呢?在方法openSelector()中实现,代码不在列出,其思路是:使用反射替换这个容器。
    下面我们分析SelectedSelectionKeySet,首先看字段:

        private SelectionKey[] keysA;
        private int keysASize;
        private SelectionKey[] keysB;
        private int keysBSize;
        private boolean isA = true;     // 标记字段,控制使用具体的数组
    

    可以看出SelectedSelectionKeySet使用双数组实现,为什么要这样设计呢?
    (1).使用数组提高遍历效率。
    (2).遍历时使用一个数组,此时可向另一个数组添加就绪的选择键,防止ConcurrentModificationException异常发生。
    再看其中的add()方法:

        @Override
        public boolean add(SelectionKey o) {
            if (o == null) {
                return false;   // 不支持null元素
            }
    
            if (isA) {
                int size = keysASize;
                keysA[size ++] = o; // 就绪的选择键放在末尾
                keysASize = size;
                if (size == keysA.length) {
                    doubleCapacityA();  // 双倍扩充容量
                }
            } else {
                int size = keysBSize;
                keysB[size ++] = o;
                keysBSize = size;
                if (size == keysB.length) {
                    doubleCapacityB();
                }
            }
            return true;
        }
    

    从代码中可以看出,两个双数组可以视为无限容量且不支持null元素。由于双数组一个用于遍历,一个用于添加新元素,我们看关键的两个数组切换的方法,其实现也很简单,代码如下:

        SelectionKey[] flip() {
            if (isA) {
                isA = false;
                keysA[keysASize] = null;    // 最末尾元素显示置为null
                keysBSize = 0;  // B数组清空,用于添加元素
                return keysA;   // A数组返回,用于遍历
            } else {
                isA = true;
                keysB[keysBSize] = null;
                keysASize = 0;
                return keysB;
            }
        }
    

    分析完对SelectedKeySet的优化,我们看在NioEventLoop中的使用:

        // 返回用于遍历的数组
        processSelectedKeysOptimized(selectedKeys.flip());
        
        private void processSelectedKeysOptimized(SelectionKey[] selectedKeys) {
            for (int i = 0;; i ++) {
                final SelectionKey k = selectedKeys[i];
                if (k == null) {
                    break;  // 注意SelectedKeySet的实现置最末尾元素为null,故必能跳出
                }
                selectedKeys[i] = null; // 设置为null,帮助GC进行回收
    
                final Object a = k.attachment();
                if (a instanceof AbstractNioChannel) {
                    processSelectedKey(k, (AbstractNioChannel) a);  // Netty框架处理
                } else {
                    NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
                    processSelectedKey(k, task);    // 用户自定义处理
                }
    
                if (needsToSelectAgain) {   // 有必要重新选择
                    for (;;) {
                        i++;
                        if (selectedKeys[i] == null) {
                            break;
                        }
                        // 将上一次遍历集合中未处理元素置null,帮助GC回收,防止泄露
                        selectedKeys[i] = null; 
                    }
    
                    selectAgain();  // 未处理元素也将添加到数组中
                    selectedKeys = this.selectedKeys.flip();    // 取出遍历数组
                    i = -1; // 遍历数组索引设置为-1是因为之后将执行i++从而还是从0开始遍历
                }
            }
        }
    

    到了这里,我们已经分析完大部分NioEventLoop的工作原理和实现,但Netty的实现远不止这些,比如全局任务执行器GlobalEventExecutor,默认执行器DefaultEventExecutor,以及其他的ThreadPerChannelEventLoop,LocalEventLoop等等,由于我们很懒,所以不再讲述。我们休整一会,然后前往下一个目的地:Netty的优雅退出机制。

    相关文章

      网友评论

      • LiLeiHanMeiMei:我不明白的是为什么select完成之后 还要调用selector.wakeup方法 都已经从select返回了, 这样的wakeup意义何在? 谢谢博主。
      • NighterHunter:你好,publickeys应该是注册的键集合,publicselectedkeys应该是就绪的键集合,现在都优化用selectkeys替换了,而且处理的时候,把剩余的键显示置为null,那这是不是丢失注册的键了?
        NighterHunter:还是这两个集合都是就绪键集合?
      • 周艺伟:博主现在搞清楚oldWakenUp和wakenUp这两个变量的用意了吗,我这边也是觉得hasTasks()方法就已经可以满足了
        周艺伟:@Hypercube oldWakenUp不是表示上一轮是被外部线程唤醒的吗,那为啥本轮还可能因为这个变量为true而跳出循环呢
        周艺伟:@Hypercube 那hasTasks()方法不是已经可以满足了吗
        Hypercube:wakenUp使用原子变量是因为selector.wakeup()性能开销比较大,所以应该减少调用,如果多个外部线程同时调用这个操作和只有一个操作其实是等价的,所以使用wakenUp标记保证只会等价调用一次。oldWakenUp是记录当eventLoop线程处理任务时外部线程发出的wakeUp请求,有这种请求是select立即返回,以便执行eventLoop中的任务。

      本文标题:自顶向下深入分析Netty(四)--EventLoop-2

      本文链接:https://www.haomeiwen.com/subject/iubwyttx.html