美文网首页
Netty源码_NioEventLoop详解

Netty源码_NioEventLoop详解

作者: wo883721 | 来源:发表于2021-10-27 18:47 被阅读0次

    这一章我们将讲解 netty 中真正使用的事件轮询器 NioEventLoop,通过这一章,你将了解:

    • netty 是如何通过一个事件轮询器管理多个嵌套字 Socket 的通道 channel
    • 又是如何即处理通道的 channelIO 事件,以及添加事件轮询器上任务的。

    一. 选择器 Selector

    netty 的事件轮询器也是通过的 java nio 的选择器 Selector 来管理多个嵌套字 Socket 的通道 channel
    那么选择器 Selector 是如何与通道 channel,并管理它们的呢。

    1.1 注册选择器

    SelectableChannel 类用 register 方法将通道注册到选择器 Selector 中。

        /**
         *  将此通道注册到给定的选择器,并返回一个选择键SelectionKey。
         */
        public abstract SelectionKey register(Selector sel, int ops, Object att)
            throws ClosedChannelException;
    
    1. sel : 选择器,多个通道 channel 可以注册同一个选择器,因此一个选择器就可以管理多个通道了。
    2. ops: 该通道 channel 关注的 IO 事件类型,分为读事件OP_READ,写事件OP_WRITE,连接事件OP_CONNECT和 接受事件OP_ACCEPT。可以关注一个或者多个事件。
    3. att: 可以绑定在返回值 选择键 SelectionKey 上的值,可以从选择键中获取它。
    4. 返回的选择键 SelectionKey,它其实代表的是 IO 事件类型。

    如果当前通道已经注册到给定的选择器 sel 上了,调用这个方法,更改了这个通道关注的 IO 事件类型 ops,和绑定的对象 att
    AbstractSelectableChannel 实现中我们看到:

        public final SelectionKey register(Selector sel, int ops,
                                           Object att)
            throws ClosedChannelException
        {
            synchronized (regLock) {
                if (!isOpen())
                    throw new ClosedChannelException();
                if ((ops & ~validOps()) != 0)
                    throw new IllegalArgumentException();
                if (blocking)
                    throw new IllegalBlockingModeException();
                // 已经注册到给定的 选择器 sel
                SelectionKey k = findKey(sel);
                if (k != null) {
                    k.interestOps(ops);
                    k.attach(att);
                }
                if (k == null) {
                    // New registration
                    synchronized (keyLock) {
                        if (!isOpen())
                            throw new ClosedChannelException();
                        k = ((AbstractSelector)sel).register(this, ops, att);
                        addKey(k);
                    }
                }
                return k;
            }
        }
    

    1.2 选择键 SelectionKey

    public abstract class SelectionKey {
    
        /**
         * 构造该类的实例
         */
        protected SelectionKey() { }
    
    
        // -- Channel and selector operations --
    
        /**
         * 返回为其创建键的通道 Channel。
         * 即使在该选择键被取消之后,此方法仍将继续返回通道 Channel。
         *
         * @return  This key's channel
         */
        public abstract SelectableChannel channel();
    
        /**
         * 返回为其创建键的选择器 Selector。
         * 这个方法将继续返回选择器,即使在选择键被取消之后。
         *
         * @return  This key's selector
         */
        public abstract Selector selector();
    
        /**
         * 返回这个键是否有效。
         * 键在创建时是有效的,
         * 直到它被取消、它的通道关闭或它的选择器关闭后无效。
         */
        public abstract boolean isValid();
    
        /**
         * 请求取消该选择键的通道与其选择器的注册。
         * 方法返回后该选择键将是无效的,并将它添加到其选择器的 cancelled-key的集合。
         * 在下一次选择操作期间,该键将从所有选择器的键集中删除。
         * 如果此选择键已被取消,则调用此方法没有任何效果。一旦取消,选择键将永远无效。
         *
         * 这个方法可以在任何时候被调用。
         * 选择器的 cancelled-key 集合是进行过同步处理的,
         * 因此如果同时调用涉及相同选择器的取消或选择操作,可能会短暂阻塞。
         */
        public abstract void cancel();
    
    
        // -- Operation-set accessors --
    
        /**
         * 返回此选择键关注的 IO 事件类型集合。
         * 可以保证返回的集合只包含对这个键的通道有效的操作位。
         * 这个方法可以在任何时候被调用。
         * 它是否阻塞以及阻塞多长时间取决于具体实现。
         *
         * 如果这个键被取消,则抛出 CancelledKeyException 异常。
         */
        public abstract int interestOps();
    
        /**
         * 重新设置此选择键关注的 IO 事件类型集合 ops。
         * 这个方法可以在任何时候被调用。
         * 它是否阻塞以及阻塞多长时间取决于实现。
         *
         * 如果设置的 ops,不是键的通道所支持的操作,抛出 IllegalArgumentException 异常
         * 如果这个键被取消,则抛出 CancelledKeyException 异常。
         */
        public abstract SelectionKey interestOps(int ops);
    
        /**
         * 返回此键的就绪操作 ready-operation 集合。
         * 可以保证返回的集合只包含对这个键的通道有效的操作位。
         *
         * 如果这个键被取消,则抛出 CancelledKeyException 异常。
         */
        public abstract int readyOps();
    
    
        // -- Operation bits and bit-testing convenience methods --
    
        /**
         * 用于读操作的操作位。
         */
        public static final int OP_READ = 1 << 0;
    
        /**
         * 写操作的操作位。
         */
        public static final int OP_WRITE = 1 << 2;
    
        /**
         * socket-connect 操作的操作位。
         */
        public static final int OP_CONNECT = 1 << 3;
    
        /**
         * socket-accept 操作的操作位。
         */
        public static final int OP_ACCEPT = 1 << 4;
    
        /**
         * 测试此键的通道是否已准备好读取。
         */
        public final boolean isReadable() {
            return (readyOps() & OP_READ) != 0;
        }
    
        /**
         * 测试此键的通道是否已准备好写入。
         */
        public final boolean isWritable() {
            return (readyOps() & OP_WRITE) != 0;
        }
    
        /**
         * 测试此键的通道是否已完成或未能完成其套接字连接操作。
         */
        public final boolean isConnectable() {
            return (readyOps() & OP_CONNECT) != 0;
        }
    
        /**
         * 测试此键的通道是否已准备好接受新的套接字连接。
         */
        public final boolean isAcceptable() {
            return (readyOps() & OP_ACCEPT) != 0;
        }
    
    
        // -- Attachments --
    
        private volatile Object attachment = null;
    
        private static final AtomicReferenceFieldUpdater<SelectionKey,Object>
            attachmentUpdater = AtomicReferenceFieldUpdater.newUpdater(
                SelectionKey.class, Object.class, "attachment"
            );
    
        /**
         * 将给定对象附加到此键上。
         */
        public final Object attach(Object ob) {
            return attachmentUpdater.getAndSet(this, ob);
        }
    
        /**
         * 返回此键上当前的附加对象
         */
        public final Object attachment() {
            return attachment;
        }
    
    }
    

    选择键 SelectionKey 的功能并不复杂,主要是和通道channelIO 事件有关,分为四种:

    1. OP_READ = 1 << 0 读事件。
    2. OP_WRITE = 1 << 2 写事件。
    3. OP_CONNECT = 1 << 3 连接事件。
    4. OP_ACCEPT = 1 << 4 接收事件。

    1.3 选择器 Selector

    public abstract class Selector implements Closeable {
    
        /**
         * 初始化该类的新实例。
         */
        protected Selector() { }
    
        /**
         * 静态方法创建一个选择器 Selector
         */
        public static Selector open() throws IOException {
            return SelectorProvider.provider().openSelector();
        }
    
        /**
         * 返回这个选择器是否开启。
         */
        public abstract boolean isOpen();
    
        /**
         * 返回创建次选择器的 provider
         */
        public abstract SelectorProvider provider();
    
        /**
         * 返回这个选择器的选择键集合。
         *
         * 不能直接修改这个返回集合。只有当选择键被取消且其通道被注销后,这个选择键才会从集合中删除。
         * 任何修改这个集合的尝试都会导致抛出 UnsupportedOperationException 异常。
         *
         * 这个选择键集合不是线程安全的。
         */
        public abstract Set<SelectionKey> keys();
    
        /**
         * 返回选择器的选定键集合,即准备好IO 事件的选择键集合。
         *
         * 可以从选定键集合中删除选择键,但是不能添加键到选定键集合中,
         * 任何向选定键集合添加对象的尝试都会导致抛出 UnsupportedOperationException 异常。
         *
         * 这个选定键集合也不是线程安全的。
         */
        public abstract Set<SelectionKey> selectedKeys();
    
        /**
         * 立即返回当前选择器中已准备好 IO 事件通道channel 的数量。
         * 如果此时选择器中没有准备好IO 事件的通道 channel,
         * 那么该方法直接返回 0,不会阻塞当前线程。
         *
         * 当返回值大于 0 的时候,调用 selectedKeys() 方法,
         * 获取已准备好 IO 事件的选择键集合。
         */
        public abstract int selectNow() throws IOException;
    
        /**
         * 返回当前选择器中已准备好 IO 事件通道 channel 的数量。
         * 如果此时选择器中没有准备好IO 事件的通道channel,
         * 那么该方法将阻塞当前线程,直到
         * 1. 有准备好IO 事件的通道channel
         * 2. 使用 Selector.wakeup 唤醒
         * 3. 阻塞线程被中断
         * 4. 超时时间 timeout 到了
         *
         * 当返回值大于 0 的时候,调用 selectedKeys() 方法,
         * 获取已准备好 IO 事件的选择键集合。
         */
        public abstract int select(long timeout)
            throws IOException;
    
        /**
         * 返回当前选择器中已准备好 IO 事件通道 channel 的数量。
         * 如果此时选择器中没有准备好IO 事件的通道channel,
         * 那么该方法将阻塞当前线程,直到
         * 1. 有准备好IO 事件的通道channel
         * 2. 使用 Selector.wakeup 唤醒
         * 3. 阻塞线程被中断
         *
         * 当返回值大于 0 的时候,调用 selectedKeys() 方法,
         * 获取已准备好 IO 事件的选择键集合。
         */
        public abstract int select() throws IOException;
    
        /**
         * 唤醒被 select() 和 select(long timeout) 阻塞的等待线程
         */
        public abstract Selector wakeup();
    
        /**
         * 关闭当前选择器
         */
        public abstract void close() throws IOException;
    }
    

    选择器的主要功能就是获取已经准备好的通道 channel

    1.3.1 获取准备好的通道数量

    一共有三个方法 int selectNow(),int select(long timeout)int select()

    int selectNow() 是立即返回当前准备好的通道数量,如果没有,那就返回0,不阻塞当前线程。
    int select(long timeout)int select() 都会阻塞当前线程,直到有准备好的通道,或者阻塞线程被中断,或者其他线程调用选择器的 wakeup 方法唤醒。 int select(long timeout) 方法还多了一个超时返回。

    1.3.2 获取准备好的通道集合

    Set<SelectionKey> selectedKeys() 方法返回准备好的选择键集合,通过选择键就可以得到对应的通道 channel

    1.3.3 唤醒阻塞

    Selector wakeup() 可以唤醒被 select()select(long timeout) 阻塞的等待线程。

    二. NioEventLoop 中的选择器

    2.1 开启选择器

    NioEventLoop 的成员变量中有两个选择器实例 unwrappedSelectorselector

    那是因为 netty 可能会优化选择器的选择键 SelectionKey, 所以就有了两个选择器。

    1. unwrappedSelector 通过 provider.openSelector() 方法获取的原始选择器。
    2. selector:如果不优化选择键 SelectionKeyselector就是 unwrappedSelector对象;如果优化选择键,那么 selector 就是 new SelectedSelectionKeySetSelector(unwrappedSelector, selectedKeySet) 对象,包装了 unwrappedSelector 对象,并优化了选择键。

    这些都是在 openSelector() 方法中实现。

    2.2 将通道注册到选择器

    1. register(...) 方法
       /**
        * 向这个事件循环器的 Selector 注册一个任意的 SelectableChannel (不一定是由Netty创建的)。
        * 一旦注册了指定的 SelectableChannel,当 SelectableChannel 准备好时,该事件循环器将执行指定的任务。
        */
       public void register(final SelectableChannel ch, final int interestOps, final NioTask<?> task) {
           ObjectUtil.checkNotNull(ch, "ch");
           if (interestOps == 0) {
               throw new IllegalArgumentException("interestOps must be non-zero.");
           }
           if ((interestOps & ~ch.validOps()) != 0) {
               throw new IllegalArgumentException(
                       "invalid interestOps: " + interestOps + "(validOps: " + ch.validOps() + ')');
           }
           ObjectUtil.checkNotNull(task, "task");
      
           if (isShutdown()) {
               throw new IllegalStateException("event loop shut down");
           }
      
           if (inEventLoop()) {
               // 在事件轮询器线程中,直接调用 register0 方法注册
               register0(ch, interestOps, task);
           } else {
               try {
                   // 调用 submit  方法,确保在轮询器线程中注册
                   submit(new Runnable() {
                       @Override
                       public void run() {
                           register0(ch, interestOps, task);
                       }
                   }).sync();
               } catch (InterruptedException ignore) {
                   // 即使被中断了,我们也会调度它,所以只需将线程标记为中断。
                   Thread.currentThread().interrupt();
               }
           }
       }
      

      确保在轮询器线程中注册

    2. register0(...) 方法
       private void register0(SelectableChannel ch, int interestOps, NioTask<?> task) {
           try {
               // 将通道 ch 注册到选择器 unwrappedSelector 中
               ch.register(unwrappedSelector, interestOps, task);
           } catch (Exception e) {
               throw new EventLoopException("failed to register a channel", e);
           }
       }
      

      调用 SelectableChannelregister 方法,将通道注册到选择器unwrappedSelector 中去。

    2.3 获取准备好的通道数量

    1. 立即获取,不阻塞
       /**
        * 返回注册在该 Selector 上的已经准备好进行I/O操作的通道 channel 的数量。
        * 如果还没有准备好的通道 channel ,那么直接返回 0,不会阻塞当前线程。
        */
       int selectNow() throws IOException {
           return selector.selectNow();
       }
      
    2. 阻塞获取
       private int select(long deadlineNanos) throws IOException {
           // 如果截止时间 deadlineNanos 是NONE(无限大)
           // 那么就使用 selector.select() 方法,不设置超时,
           // 一直阻塞等待,直到有注册在该 selector 上通道 channel 已经准备好进行I/O操作,
           // 才停止阻塞,返回准备好I/O操作 channel 的数量。
           if (deadlineNanos == NONE) {
               return selector.select();
           }
           // 计算调用 selector.select(timeoutMillis) 的超时阻塞等待时间。
           // 如果截止时间在5微秒内,超时时间将为0
           long timeoutMillis = deadlineToDelayNanos(deadlineNanos + 995000L) / 1000000L;
           return timeoutMillis <= 0 ? selector.selectNow() : selector.select(timeoutMillis);
       }
      
    3. 再次获取
       /**
        * 重新获取 IO 事件,即再次调用 selector.selectNow(), 不阻塞线程
        */
       private void selectAgain() {
           needsToSelectAgain = false;
           try {
               selector.selectNow();
           } catch (Throwable t) {
               logger.warn("Failed to update SelectionKeys.", t);
           }
       }
      

    三. 事件轮询

    3.1 run 方法

    事件轮询器如何实现事件轮询的,就是主要看它的 run 方法实现:

        @Override
        protected void run() {
            int selectCnt = 0;
            // 必须使用死循环不断进行事件轮询,获取任务和通道的 IO 事件
            for (;;) {
                try {
                    int strategy;
                    try {
                        /**
                         * 返回处理策略,就分为两种:
                         * 有任务 hasTasks() == true,就不能等待IO事件了,先直接调用 selectNow() 方法,
                         * 获取当前准备好IO 的通道channel 的数量(0 表示一个都没有),处理 IO 事件 和任务。
                         *
                         * 没有任务 hasTasks() == false,返回 SelectStrategy.SELECT (是负数),
                         * 没有要及时处理的任务,先阻塞等待 IO 事件
                         */
                        strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
                        switch (strategy) {
                        case SelectStrategy.CONTINUE:
                            continue;
    
                        case SelectStrategy.BUSY_WAIT:
                            // fall-through to SELECT since the busy-wait is not supported with NIO
    
                        case SelectStrategy.SELECT:
                            // 返回下一个计划任务准备运行的截止时间纳秒值
                            long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
                            if (curDeadlineNanos == -1L) {
                                // 返回 -1,说明没有下一个计划任务,
                                // 将 curDeadlineNanos 设置为 NONE,
                                // 调用 selector.select 方法时,就没有超时,
                                // 要无限等待了,除非被唤醒或者有准备好的 IO 事件。
                                curDeadlineNanos = NONE;
                            }
                            // 设置 超时等待时间
                            nextWakeupNanos.set(curDeadlineNanos);
                            try {
                                if (!hasTasks()) {
                                    // 当前没有任务,那么就通过 selector 查看有没有 IO 事件
                                    // 并设置超时时间,超时时间到了那么就要执行计划任务了
                                    // 如果 curDeadlineNanos 是 NONE,就没有超时,无限等待。
                                    strategy = select(curDeadlineNanos);
                                }
                            } finally {
                                // 这个更新只是为了帮助阻止不必要的选择器唤醒,
                                // 所以使用lazySet是可以的(没有竞争条件)
                                nextWakeupNanos.lazySet(AWAKE);
                            }
                            // fall through
                        default:
                        }
                    } catch (IOException e) {
                        // 如果我们在这里接收到IOException,那是因为Selector搞错了。
                        // 让我们重新构建选择器并重试。
                        // https://github.com/netty/netty/issues/8566
                        rebuildSelector0();
                        selectCnt = 0;
                        handleLoopException(e);
                        continue;
                    }
                    /**
                     * 代码走到这里,
                     * 要么有 IO 事件,即 strategy >0
                     * 要么就是有任务要运行。
                     * 如果两个都不是,那么就有可能是 JDK 的 epoll 的空轮询 BUG
                     */
    
                    selectCnt++;
                    cancelledKeys = 0;
                    needsToSelectAgain = false;
                    final int ioRatio = this.ioRatio;
                    boolean ranTasks;
                    if (ioRatio == 100) {
                        // 如果 ioRatio
                        try {
                            if (strategy > 0) {
                                processSelectedKeys();
                            }
                        } finally {
                            // 确保运行了所有待执行任务,包括当前时间已经过期的计划任务
                            ranTasks = runAllTasks();
                        }
                    } else if (strategy > 0) {
                        // strategy > 0 说明有 IO 事件,
                        // 那么需要调用 processSelectedKeys() 方法,执行 IO 时间
                        final long ioStartTime = System.nanoTime();
                        try {
                            processSelectedKeys();
                        } finally {
                            // 计算 IO 操作花费的时间
                            final long ioTime = System.nanoTime() - ioStartTime;
                            // 按照比例计算可以运行任务的超时时间 ioTime * (100 - ioRatio) / ioRatio,
                            // 超时时间到了,即使还有任务没有运行,也直接返回了,等下一个周期在运行这些任务
                            ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                        }
                    } else {
                        // strategy == 0 说明没有 IO 事件,不用处理 IO 了
                        // 调用 runAllTasks(0) 方法,超时时间为0,这将运行最小数量的任务
                        ranTasks = runAllTasks(0);
                    }
    
                    if (ranTasks || strategy > 0) {
                        // 要么有任务运行,要么有 IO 事件处理
                        if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS && logger.isDebugEnabled()) {
                            logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
                                    selectCnt - 1, selector);
                        }
                        selectCnt = 0;
                    } else if (unexpectedSelectorWakeup(selectCnt)) {
                        // 即没有任务运行,也没有IO 事件处理,就有可能是 JDK 的 epoll 的空轮询 BUG
                        // 调用 unexpectedSelectorWakeup(selectCnt) 方法处理。
                        // 可能会重新建立 Select
    
                        selectCnt = 0;
                    }
                } catch (CancelledKeyException e) {
                    // Harmless exception - log anyway
                    if (logger.isDebugEnabled()) {
                        logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",
                                selector, e);
                    }
                } catch (Error e) {
                    throw e;
                } catch (Throwable t) {
                    handleLoopException(t);
                } finally {
                    // Always handle shutdown even if the loop processing threw an exception.
                    try {
                        if (isShuttingDown()) {
                            // 如果事件轮询器开始 shutdown,就要关闭 IO 资源
                            closeAll();
                            if (confirmShutdown()) {
                                return;
                            }
                        }
                    } catch (Error e) {
                        throw e;
                    } catch (Throwable t) {
                        handleLoopException(t);
                    }
                }
            }
        }
    

    我们知道事件轮询器要处理两种事件:通道的 IO 事件 和 任务(包括计划任务和待执行任务),那么就要合理分配时间:

    1. 选择策略 selectStrategy,根据有无待执行任务(hasTasks() ) 进行划分:

      • 有待执行任务,那么不用等待 IO 事件,先直接调用 selectNow() 方法,获取当前准备好IO事件的通道channel ,然后处理 IO 事件和待执行任务。
      • 没有待执行任务,那么就需要阻塞等待准备好IO事件的通道;先获取下一个计划任务的截止时间 curDeadlineNanos,调用 select(curDeadlineNanos) 方法。
    2. 开始处理 IO 事件和待执行任务

      这里面有一个非常重要的属性 ioRatio,它表示事件循环中,处理IO 事件所占的时间比例。这个值越大,处理非IO 事件(待执行任务)的时间就越少;但是如果值变成 100 ,就会禁用该特性,事件循环将不会尝试平衡IO 事件和非I/O事件的时间。

      • ioRatio == 100,如果有IO事件(strategy > 0),通过processSelectedKeys() 处理IO事件,最后调用 runAllTasks() 运行全部的待执行任务。
      • 然后判断IO事件(strategy > 0),通过processSelectedKeys() 处理IO事件,计算处理IO事件的花费的时间 ioTime,根据这个时间,计算出执行非IO事件(即待执行任务)最多花费的时间ioTime * (100 - ioRatio) / ioRatio,这样就可以控制运行任务的时间了。
      • 没有 IO 事件(strategy == 0),调用 runAllTasks(0) 方法,运行最小数量的任务。
      • 最后需要考虑,如果这次被唤醒,即没有任务运行,也没有IO事件处理,那么就有可能是 JDKepoll 的空轮询 BUG;需要重新注册选择器。

    3.2 处理 IO 事件

    1. processSelectedKeys
       /**
        * 处理 IO 事件
        */
       private void processSelectedKeys() {
           if (selectedKeys != null) {
               // 如果是优化的 Select, 调用 processSelectedKeysOptimized 方法
               processSelectedKeysOptimized();
           } else {
               // 如果没有优化,
               // 直接调用 selector.selectedKeys() 获取IO事件的channel
               processSelectedKeysPlain(selector.selectedKeys());
           }
       }
      
    2. processSelectedKeysOptimized
       private void processSelectedKeysOptimized() {
           for (int i = 0; i < selectedKeys.size; ++i) {
               final SelectionKey k = selectedKeys.keys[i];
               // 将数组中的条目空出来,以便在通道关闭后对其进行GC
               // See https://github.com/netty/netty/issues/2363
               selectedKeys.keys[i] = null;
      
               final Object a = k.attachment();
      
               if (a instanceof AbstractNioChannel) {
                   processSelectedKey(k, (AbstractNioChannel) a);
               } else {
                   @SuppressWarnings("unchecked")
                   NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
                   processSelectedKey(k, task);
               }
      
               if (needsToSelectAgain) {
                   // null out entries in the array to allow to have it GC'ed once the Channel close
                   // See https://github.com/netty/netty/issues/2363
                   selectedKeys.reset(i + 1);
      
                   selectAgain();
                   i = -1;
               }
           }
       }
      
    3. processSelectedKeysPlain
       private void processSelectedKeysPlain(Set<SelectionKey> selectedKeys) {
           // 检查集合是否为空,如果为空,则直接返回。
           // See https://github.com/netty/netty/issues/597
           if (selectedKeys.isEmpty()) {
               return;
           }
      
           // 得到 通道channel IO事件 SelectionKey 的迭代器
           Iterator<SelectionKey> i = selectedKeys.iterator();
           // 循环遍历, 这里使用 for 死循环,
           // 因为可能会再次调用 selector.selectNow() 获取IO 事件
           // 需要继续处理这些 IO 事件
           for (;;) {
               final SelectionKey k = i.next();
               final Object a = k.attachment();
               // 必须调用 remove() 方法,
               // 将这个 SelectionKey 从迭代器中移除
               i.remove();
      
               if (a instanceof AbstractNioChannel) {
                   // 如果 AbstractNioChannel 事件
                   processSelectedKey(k, (AbstractNioChannel) a);
               } else {
                   @SuppressWarnings("unchecked")
                   NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
                   // 如果 NioTask 事件
                   processSelectedKey(k, task);
               }
      
               // 没有 IO 事件了,跳出循环
               if (!i.hasNext()) {
                   break;
               }
      
               if (needsToSelectAgain) {
                   // 如果needsToSelectAgain == true, 需要重新获取IO事件,
                   selectAgain();
                   // 再次获取 IO 事件的 selectedKeys
                   selectedKeys = selector.selectedKeys();
      
                   if (selectedKeys.isEmpty()) {
                       // 没有新的 IO 事件,就直接返回。
                       break;
                   } else {
                       // 重新获取新的迭代器, 以避免ConcurrentModificationException
                       i = selectedKeys.iterator();
                   }
               }
           }
       }
      
    4. processSelectedKey
       /**
        * 处理通道 AbstractNioChannel 的IO事件
        */
       private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
           final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
           // 如果 SelectionKey 无效
           if (!k.isValid()) {
               final EventLoop eventLoop;
               try {
                   eventLoop = ch.eventLoop();
               } catch (Throwable ignored) {
                   // 如果通道实现因为没有事件循环器而抛出异常,则忽略此异常,
                   // 因为我们只是试图确定ch是否注册到该事件循环器,从而有权关闭ch。
                   return;
               }
               // 只有当ch仍然注册到这个EventLoop时才关闭ch。
               // ch可能已经从事件循环中注销,因此SelectionKey可以作为注销过程的一部分被取消,
               // 但通道仍然健康,不应该关闭。
               // See https://github.com/netty/netty/issues/5125
               if (eventLoop == this) {
                   // 关闭这个通道 channel
                   unsafe.close(unsafe.voidPromise());
               }
               return;
           }
      
           try {
               // 获取 IO 事件类型
               int readyOps = k.readyOps();
               // 首先判断是不是连接的IO事件 OP_CONNECT
               // 在尝试触发read(…)或write(…)之前,
               // 我们首先需要调用finishConnect(),
               // 否则NIO JDK通道实现可能抛出 NotYetConnectedException 异常。
               if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
                   // See https://github.com/netty/netty/issues/924
                   int ops = k.interestOps();
                   // 删除OP_CONNECT,否则Selector.select(..)将始终返回而不阻塞
                   ops &= ~SelectionKey.OP_CONNECT;
                   k.interestOps(ops);
      
                   unsafe.finishConnect();
               }
      
               // 首先处理写事件 OP_WRITE,因为我们可以写一些队列缓冲区,从而释放内存。
               if ((readyOps & SelectionKey.OP_WRITE) != 0) {
                   // 调用forceFlush,即使没有东西可写,它也会清除OP_WRITE
                   ch.unsafe().forceFlush();
               }
      
               // 最后处理读事件
               // 还要检查 readOps 是否为0,以解决可能导致旋转循环的JDK错误
               if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
                   unsafe.read();
               }
           } catch (CancelledKeyException ignored) {
               unsafe.close(unsafe.voidPromise());
           }
       }
      
       /**
        * 处理 NioTask 任务,
        */
       private static void processSelectedKey(SelectionKey k, NioTask<SelectableChannel> task) {
           int state = 0;
           try {
               task.channelReady(k.channel(), k);
               state = 1;
           } catch (Exception e) {
               k.cancel();
               invokeChannelUnregistered(task, k, e);
               state = 2;
           } finally {
               switch (state) {
               case 0:
                   k.cancel();
                   invokeChannelUnregistered(task, k, null);
                   break;
               case 1:
                   if (!k.isValid()) { // Cancelled by channelReady()
                       invokeChannelUnregistered(task, k, null);
                   }
                   break;
               default:
                    break;
               }
           }
       }
      

    四. 总结

    NioEventLoop 基本逻辑已经说清楚了,我们知道它是如何平衡处理 IO 事件和 待执行的任务的。

    相关文章

      网友评论

          本文标题:Netty源码_NioEventLoop详解

          本文链接:https://www.haomeiwen.com/subject/ylpualtx.html