美文网首页netty
Netty 之 NioEventLoop 源码分析

Netty 之 NioEventLoop 源码分析

作者: jijs | 来源:发表于2019-02-28 08:10 被阅读100次

    每一个 NioEventLoop 开启一个线程,线程启动时会调用 NioEventLoop 的 run 方法,执行I/O任务和非I/O任务

    I/O任务

    I/O 任务就是处理 Nio 中 Selector 中注册的 4 种事件。

    SelectionKey.OP_READ
    SelectionKey.OP_WRITE
    SelectionKey.OP_CONNECT
    SelectionKey.OP_ACCEPT
    

    非IO任务

    • 系统 Task:通过调用 NioEventLoop 的 excute(Runnable task) 方法实现, Netty 有很多系统 Task,创建它们的主要原因:当 I/O 线程和用户线程同时操作网络资源时,为了防止并发操作导致的锁竞争,将用户线程操作封装成 Task 放入消息队列中,由 NioEventLoop 线程执行,由同一个线程执行,不需要考虑多线程并发问题。
    • 定时任务:通过调用 NioEventLoop 的 schedule(Runnable command,long delay,TimeUnit unit) 方法实现。

    NioEventLoop 源码分析

    public final class NioEventLoop extends SingleThreadEventLoop {
    
        private Selector selector;
        private Selector unwrappedSelector;
        private SelectedSelectionKeySet selectedKeys;
    
        private final SelectorProvider provider;
        ......
    

    从 NioEventLoop 类中可用看到内部使用了 java.nio.channels.Selector。 由 Selector 处理网络 I/O 读写操作操作。

    初始化 Selector

        NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
                     SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
            super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
            ......
            final SelectorTuple selectorTuple = openSelector();
            selector = selectorTuple.selector;
        }
    

    获取 Selector

        private SelectorTuple openSelector() {
            final Selector unwrappedSelector;
            try {
                //1、 创建 Selector
                unwrappedSelector = provider.openSelector();
            } catch (IOException e) {
                throw new ChannelException("failed to open a new selector", e);
            }
            // 2、判断是否开启优化开关,默认没有开启直接返回 Selector
            if (DISABLE_KEY_SET_OPTIMIZATION) {
                return new SelectorTuple(unwrappedSelector);
            }
    
            // 3、反射创建 SelectorImpl 对象
            Object maybeSelectorImplClass = AccessController.doPrivileged(new PrivilegedAction<Object>() {
                @Override
                public Object run() {
                    try {
                        return Class.forName( "sun.nio.ch.SelectorImpl", false, PlatformDependent.getSystemClassLoader());
                    } catch (Throwable cause) {
                        return cause;
                    }
                }
            });
            // 省略代码 ......
    
    
            final Class<?> selectorImplClass = (Class<?>) maybeSelectorImplClass;
            // 3、使用优化后的 SelectedSelectionKeySet 对象将 JDK 的 sun.nio.ch.SelectorImpl.selectedKeys 替换掉。
            final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();
    
            Object maybeException = AccessController.doPrivileged(new PrivilegedAction<Object>() {
                @Override
                public Object run() {
                    try {
                        Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
                        Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");
    
                        // 省略代码 ......
    
                        Throwable cause = ReflectionUtil.trySetAccessible(selectedKeysField, true);
                        if (cause != null) {
                            return cause;
                        }
                        cause = ReflectionUtil.trySetAccessible(publicSelectedKeysField, true);
                        if (cause != null) {
                            return cause;
                        }
    
                        selectedKeysField.set(unwrappedSelector, selectedKeySet);
                        publicSelectedKeysField.set(unwrappedSelector, selectedKeySet);
                        return null;
                    } catch (NoSuchFieldException e) {
                        return e;
                    } catch (IllegalAccessException e) {
                        return e;
                    }
                }
            });
    
            if (maybeException instanceof Exception) {
                selectedKeys = null;
                Exception e = (Exception) maybeException;
                logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, e);
                return new SelectorTuple(unwrappedSelector);
            }
            selectedKeys = selectedKeySet;
            logger.trace("instrumented a special java.util.Set into: {}", unwrappedSelector);
            return new SelectorTuple(unwrappedSelector,
                                     new SelectedSelectionKeySetSelector(unwrappedSelector, selectedKeySet));
        }
    

    1、通过 Nio 的 java.nio.channels.spi.SelectorProvider 创建 Selector。
    2、判断是否开启 Selector 的优化开关,默认是不开启,则直接返回已经创建的 Selector。
    3、如果开启优化则通过反射加载 sun.nio.ch.SelectorImpl 对象,并通过已经优化过的 SelectedSelectionKeySet 替换 sun.nio.ch.SelectorImpl 中的 selectedKeys 和 publicSelectedKeys 两个 HashSet 集合。

    NioEventLoop 启动运行

    当 NioEventLoop 初始化后,开始运行会调用 run() 方法。

    @Override
    protected void run() {
        for (;;) {
            try {
                try {
                    // 1、通过 hasTasks() 判断当前消息队列中是否还有未处理的消息
                    switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
                    case SelectStrategy.CONTINUE:
                        continue;
    
                    case SelectStrategy.BUSY_WAIT:
                        // fall-through to SELECT since the busy-wait is not supported with NIO
    
                    //hasTasks() 没有任务则执行 select() 处理网络IO
                    case SelectStrategy.SELECT:
                        select(wakenUp.getAndSet(false));
    
                        if (wakenUp.get()) {
                            selector.wakeup();
                        }
                        // fall through
                    default:
                    }
                } catch (IOException e) {
                    // If we receive an IOException here its because the Selector is messed up. Let's rebuild
                    // the selector and retry. https://github.com/netty/netty/issues/8566
                    rebuildSelector0();
                    handleLoopException(e);
                    continue;
                }
    
                cancelledKeys = 0;
                needsToSelectAgain = false;
                // 处理IO事件所需的时间和花费在处理 task 时间的比例,默认为 50%
                final int ioRatio = this.ioRatio;
                if (ioRatio == 100) {
                    try {
                        // 如果 IO 的比例是100,表示每次都处理完IO事件后,才执行所有的task
                        processSelectedKeys();
                    } finally {
                        // 执行 task 任务
                        runAllTasks();
                    }
                } else {
                    // 记录处理 IO 开始的执行时间
                    final long ioStartTime = System.nanoTime();
                    try {
                        processSelectedKeys();
                    } finally {
                        // 计算处理 IO 所花费的时间
                        final long ioTime = System.nanoTime() - ioStartTime;
                        // 执行 task 任务,判断执行 task 任务时间是否超过配置的比例,如果超过则停止执行 task 任务
                        runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                    }
                }
            } catch (Throwable t) {
                handleLoopException(t);
            }
            // Always handle shutdown even if the loop processing threw an exception.
            try {
                if (isShuttingDown()) {
                    closeAll();
                    if (confirmShutdown()) {
                        return;
                    }
                }
            } catch (Throwable t) {
                handleLoopException(t);
            }
        }
    }
    
    // io.netty.channel.DefaultSelectStrategy#calculateStrategy
    public int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) throws Exception {
        //如果 hasTask 没有任务则调用则返回  SelectStrategy.SELECT,否则调用 selectNow
        return hasTasks ? selectSupplier.get() : SelectStrategy.SELECT;
    }
    
    // io.netty.channel.nio.NioEventLoop#selectNowSupplier
    private final IntSupplier selectNowSupplier = new IntSupplier() {
        @Override
        public int get() throws Exception {
            // selectNow 是否非阻塞的,返回可操作的 Channel 的个数,如果没有返回 0 。
            return selectNow();
        }
    };
    

    1、调用selectStrategy.calculateStrategy 判断是否有 Task任务,如果没有则调用 SelectorImpl.selectNow() 方法,该方法是非阻塞的,判断是否有需要处理的 Channel。如果没有则返回 SelectStrategy.SELECT,然后执行 select(wakenUp.getAndSet(false)) 方法,阻塞等待可处理的 IO 就绪事件。

    2、如果有 Task 任务,则判断 ioRatio 的比率值,该值为 EventLoop 处理 IO 和 处理 Task 任务的时间的比率。默认比率为 50%。

    • 如果 ioRatio == 100,则说明优先处理所有的 IO 任务,处理完所有的IO事件后才会处理所有的 Task 任务。
    • 如果 ioRatio <> 100, 则优先处理所有的IO任务,处理完所有的IO事件后,才会处理所有的Task 任务,但处理所有的Task 任务的时候会判断执行 Task 任务的时间比率,如果超过配置的比率则中断处理 Task 队列中的任务。

    从中可以发现,什么情况下都会优先处理 IO任务,但处理非 IO 任务时,会判断非 IO 任务执行的时间不能超过 ioRatio 的阈值。

    NioEventLoop.Select()

    private void select(boolean oldWakenUp) throws IOException {
        Selector selector = this.selector;
        try {
            int selectCnt = 0;
            long currentTimeNanos = System.nanoTime();
            // 计算出 NioEventLoop 定时任务最近执行的时间(还有多少 ns 执行),单位 ns
            long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);
    
            for (;;) {
                // 为定时任务中的时间加上0.5毫秒,将时间换算成毫秒
                long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
                // 对定时任务的超时时间判断,如果到时间或超时,则需要立即执行 selector.selectNow()
                if (timeoutMillis <= 0) {
                    if (selectCnt == 0) {
                        selector.selectNow();
                        selectCnt = 1;
                    }
                    break;
                }
                // 轮询过程中发现有任务加入,中断本次轮询
                if (hasTasks() && wakenUp.compareAndSet(false, true)) {
                    selector.selectNow();
                    selectCnt = 1;
                    break;
                }
                // Nio 的 阻塞式 select 操作
                int selectedKeys = selector.select(timeoutMillis);
                // select 次数 ++ , 通过该次数可以判断是否出发了 JDK Nio中的 Selector 空轮循 bug
                selectCnt ++;
    
                 // 如果selectedKeys不为空、或者被用户唤醒、或者队列中有待处理任务、或者调度器中有任务,则break
                if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
                    break;
                }
                //如果线程被中断则重置selectedKeys,同时break出本次循环,所以不会陷入一个繁忙的循环。
                if (Thread.interrupted()) {
                    selectCnt = 1;
                    break;
                }
    
                long time = System.nanoTime();
                // 如果超时,把 selectCnt 置为 1,开始下一次的循环
                if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
                    // timeoutMillis elapsed without anything selected.
                    selectCnt = 1;
                }
                //  如果 selectCnt++ 超过 默认的 512 次,说明触发了 Nio Selector 的空轮训 bug,则需要重新创建一个新的 Selector,并把注册的 Channel 迁移到新的 Selector 上
                else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
                        selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
                    // 重新创建一个新的 Selector,并把注册的 Channel 迁移到新的 Selector 上
                    selector = selectRebuildSelector(selectCnt);
                    selectCnt = 1;
                    break;
                }
    
                currentTimeNanos = time;
            }
    
            if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) {
                if (logger.isDebugEnabled()) {
                    logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
                            selectCnt - 1, selector);
                }
            }
        } catch (CancelledKeyException e) {
            if (logger.isDebugEnabled()) {
                logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",
                        selector, e);
            }
        }
    }
    

    1、通过 delayNanos(currentTimeNanos) 计算出 定时任务队列中第一个任务的执行时间。
    2、判断是否到期,如果到期则执行 selector.selectNow(),退出循环
    3、如果定时任务未到执行时间,则通过 hasTasks() 判断是否有可执行的任务,如果有则中断本次循环。
    4、既没有到期的定时任务、也没有可执行的Task,则调用 selector.select(timeoutMillis) 方法阻塞,等待注册到 Selector 上感兴趣的事件。
    5、每次 select() 后都会 selectCnt++。通过该次数可以判断是否出发了 JDK Nio中的 Selector 空轮询 bug
    6、如果selectedKeys不为空、或者被用户唤醒、或者队列中有待处理任务、或者调度器中有任务,则break。
    7、通过 selectCnt 判断是否触发了 JDK Selector 的空轮询 bug,SELECTOR_AUTO_REBUILD_THRESHOLD 默认为 512, 可修改。
    8、通过 selectRebuildSelector() 方法解决 Selector 空轮询 bug。

    selectRebuildSelector() 解决空轮询bug

    private Selector selectRebuildSelector(int selectCnt) throws IOException {
        // 重新创建 Selector,并把原 Selector 上注册的 Channel 迁移到新的 Selector 上
        rebuildSelector();
        Selector selector = this.selector;
    
        selector.selectNow();
        return selector;
    }
    

    重新创建 Selector,并把原 Selector 上注册的 Channel 迁移到新的 Selector 上

    private void rebuildSelector0() {
        final Selector oldSelector = selector;
        final SelectorTuple newSelectorTuple;
        ......
        try {
            // 创建新的 Selector
            newSelectorTuple = openSelector();
        } catch (Exception e) {
            logger.warn("Failed to create a new Selector.", e);
            return;
        }
    
    
        int nChannels = 0;
        // 循环原 Selector 上注册的所有的 SelectionKey
        for (SelectionKey key: oldSelector.keys()) {
            Object a = key.attachment();
            try {
    
                int interestOps = key.interestOps();
                key.cancel();
                SelectionKey newKey = key.channel().register(newSelectorTuple.unwrappedSelector, interestOps, a);
                ......
                nChannels ++;
            } catch (Exception e) {
                ......
            }
        }
        // 将新的 Selector 替换 原 Selector
        selector = newSelectorTuple.selector;
        unwrappedSelector = newSelectorTuple.unwrappedSelector;
        ......
    }
    

    1、创建新的 Selector
    2、循环把原 Selector 上所有的 SelectorKey 注册到 新的 Selector 上
    3、将新的 Selector 替换掉原来的 Selector

    处理 IO 任务

    NioEventLoop 调用 processSelectedKeys 处理 IO 任务

    private void processSelectedKeys() {
        if (selectedKeys != null) {
            processSelectedKeysOptimized();
        } else {
            processSelectedKeysPlain(selector.selectedKeys());
        }
    }
    

    默认没有使用优化的 Set,所有调用 processSelectedKeysPlain() 方法进行处理 IO 任务

    private void processSelectedKeysPlain(Set<SelectionKey> selectedKeys) {
    
        if (selectedKeys.isEmpty()) {
            return;
        }
    
        Iterator<SelectionKey> i = selectedKeys.iterator();
        for (;;) {
            final SelectionKey k = i.next();
            final Object a = k.attachment();
            i.remove();
    
            if (a instanceof AbstractNioChannel) {
                processSelectedKey(k, (AbstractNioChannel) a);
            } else {
                @SuppressWarnings("unchecked")
                NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
                processSelectedKey(k, task);
            }
    
            if (!i.hasNext()) {
                break;
            }
    
            if (needsToSelectAgain) {
                selectAgain();
                selectedKeys = selector.selectedKeys();
    
                // Create the iterator again to avoid ConcurrentModificationException
                if (selectedKeys.isEmpty()) {
                    break;
                } else {
                    i = selectedKeys.iterator();
                }
            }
        }
    }
    

    循环处理每个 selectionKey,每个selectionKey的处理首先根据attachment的类型来进行分发处理发,这里我们只分析 attachment 为 AbstractNioChannel 的处理过程。

    private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
        final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
    
        // 省略代码 ......
    
        try {
            int readyOps = k.readyOps();
            // We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise
            // the NIO JDK channel implementation may throw a NotYetConnectedException.
            if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
                // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
                // See https://github.com/netty/netty/issues/924
                int ops = k.interestOps();
                ops &= ~SelectionKey.OP_CONNECT;
                k.interestOps(ops);
    
                unsafe.finishConnect();
            }
    
    
            if ((readyOps & SelectionKey.OP_WRITE) != 0) {
                // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
                ch.unsafe().forceFlush();
            }
    
            if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
                unsafe.read();
            }
        } catch (CancelledKeyException ignored) {
            unsafe.close(unsafe.voidPromise());
        }
    }
    

    1、首先获取 Channel 的 NioUnsafe,所有的读写等操作都在 Channel 的 unsafe 类中操作。
    2、获取 SelectionKey 就绪事件,如果是 OP_CONNECT,则说明已经连接成功,并把注册的 OP_CONNECT 事件取消。
    3、如果是 OP_WRITE 事件,说明可以继续向 Channel 中写入数据,当写完数据后用户自己吧 OP_WRITE 事件取消掉。
    4、如果是 OP_READ 或 OP_ACCEPT 事件,则调用 unsafe.read() 进行读取数据。unsafe.read() 中会调用到 ChannelPipeline 进行读取数据。

    private final class NioMessageUnsafe extends AbstractNioUnsafe {
    
            @Override
            public void read() {
                // 省略代码 ......
                // 获取 Channel 对应的 ChannelPipeline
                final ChannelPipeline pipeline = pipeline();
    
                boolean closed = false;
                Throwable exception = null;
                try {
                    // 省略代码 ......
                    int size = readBuf.size();
                    for (int i = 0; i < size; i ++) {
                        readPending = false;
                        // 委托给 pipeline 中的 Handler 进行读取数据
                        pipeline.fireChannelRead(readBuf.get(i));
                    }
    

    当 NioEventLoop 读取数据的时候会委托给 Channel 中的 unsafe 对象进行读取数据。
    Unsafe中真正读取数据是交由 ChannelPipeline 来处理。
    ChannelPipeline 中是注册的我们自定义的 Handler,然后由 ChannelPipeline中的 Handler 一个接一个的处理请求的数据。
    下一篇我们来分析 ChannelPipeline 原理。

    相关文章

      网友评论

        本文标题:Netty 之 NioEventLoop 源码分析

        本文链接:https://www.haomeiwen.com/subject/wipkeqtx.html