NETTY Engine -- NioEventLoop

Author: 小犇手K线研究员 | Published 2017-03-02 13:16

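    Calling NioEventLoop and NioEventLoopGroup Netty's "engine" is not official terminology, just my own way of thinking about them: every Netty event and I/O operation is executed by a NioEventLoop. At a lower level, the Java NIO Selector is opened, polled, closed and managed by NioEventLoop. The selector is the source of events, and NioEventLoop is what drives the chain of responsibility (the ChannelPipeline). This post focuses on how NioEventLoop works with the selector.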

    UML

    [Figure: UML class diagram of the NioEventLoop class hierarchy]

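    As the figure above shows, NioEventLoop sits deep in a class hierarchy. Its superclasses provide two important capabilities: task execution (the executor) and the event loop itself. Neither involves I/O, so they are not covered here. Interested readers can explore them with this question in mind: "Why did Netty implement its own threading framework instead of using Java's Executors framework?"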

    Opening the selector

    NioEventLoop opens the selector when the object is created, by calling openSelector() from its constructor:

        NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
                     SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
            super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
            if (selectorProvider == null) {
                throw new NullPointerException("selectorProvider");
            }
            if (strategy == null) {
                throw new NullPointerException("selectStrategy");
            }
            provider = selectorProvider;
            selector = openSelector();
            selectStrategy = strategy;
        }
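    The same construct-then-open pattern can be sketched with the JDK alone. This is a minimal sketch, not Netty's code: it assumes only java.nio, rejects a null SelectorProvider, and opens the Selector immediately, wrapping the checked IOException in an unchecked exception much as Netty wraps it in ChannelException. SelectorHolder and SelectorOpenException are hypothetical names used only for illustration.

```java
import java.io.IOException;
import java.nio.channels.Selector;
import java.nio.channels.spi.SelectorProvider;

// Hypothetical unchecked exception standing in for Netty's ChannelException.
final class SelectorOpenException extends RuntimeException {
    SelectorOpenException(String message, Throwable cause) {
        super(message, cause);
    }
}

// Hypothetical owner that, like NioEventLoop, opens its Selector as soon as it is constructed.
final class SelectorHolder {
    final SelectorProvider provider;
    final Selector selector;

    SelectorHolder(SelectorProvider provider) {
        if (provider == null) {
            throw new NullPointerException("selectorProvider");
        }
        this.provider = provider;
        this.selector = openSelector();
    }

    private Selector openSelector() {
        try {
            return provider.openSelector();
        } catch (IOException e) {
            // Callers of the constructor should not have to handle a checked exception.
            throw new SelectorOpenException("failed to open a new selector", e);
        }
    }
}
```

    Passing SelectorProvider.provider() gives the platform default provider, which is also what Selector.open() uses internally.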
    

    openSelector() is shown below:

        private Selector openSelector() {
            final Selector selector;
            try {
                selector = provider.openSelector();
            } catch (IOException e) {
                throw new ChannelException("failed to open a new selector", e);
            }
    
            if (DISABLE_KEYSET_OPTIMIZATION) {
                return selector;
            }
    
            try {
                SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();
    
                Class<?> selectorImplClass =
                        Class.forName("sun.nio.ch.SelectorImpl", false, PlatformDependent.getSystemClassLoader());
    
                // Ensure the current selector implementation is what we can instrument.
                if (!selectorImplClass.isAssignableFrom(selector.getClass())) {
                    return selector;
                }
    
                Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
                Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");
    
                selectedKeysField.setAccessible(true);
                publicSelectedKeysField.setAccessible(true);
    
                selectedKeysField.set(selector, selectedKeySet);
                publicSelectedKeysField.set(selector, selectedKeySet);
    
                selectedKeys = selectedKeySet;
                logger.trace("Instrumented an optimized java.util.Set into: {}", selector);
            } catch (Throwable t) {
                selectedKeys = null;
                logger.trace("Failed to instrument an optimized java.util.Set into: {}", selector, t);
            }
    
            return selector;
        }
    

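    Note that in the code above, if the selector is an instance of sun.nio.ch.SelectorImpl, Netty uses reflection to replace its selectedKeys and publicSelectedKeys fields with its own customized set, SelectedSelectionKeySet.
    Why? I am not sure either.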
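    One plausible reason: the JDK's SelectorImpl keeps selected keys in a HashSet, so each ready key costs a hash-and-insert during select(), and the event loop has to create an Iterator to walk the set. Netty's SelectedSelectionKeySet stores the keys in a plain array instead, so adding a key is just an append and the keys can be walked by index, as processSelectedKeysOptimized() does. The sketch below is a simplified, generic version of that idea, not Netty's class; ArrayKeySet is a hypothetical name, and it assumes the same element is never added twice before reset(), because add() does not check for duplicates.

```java
import java.util.AbstractSet;
import java.util.Arrays;
import java.util.Iterator;
import java.util.NoSuchElementException;

// Hypothetical array-backed set in the spirit of Netty's SelectedSelectionKeySet.
// Assumption: callers never add the same element twice between resets, so add() can append blindly.
final class ArrayKeySet<E> extends AbstractSet<E> {
    private Object[] keys = new Object[16];
    private int size;

    @Override
    public boolean add(E e) {
        if (e == null) {
            return false;
        }
        if (size == keys.length) {
            keys = Arrays.copyOf(keys, size << 1); // grow by doubling
        }
        keys[size++] = e; // O(1) append, no hashing
        return true;
    }

    @Override
    public int size() {
        return size;
    }

    // Index access lets the event loop walk the keys without allocating an Iterator.
    @SuppressWarnings("unchecked")
    E get(int i) {
        if (i < 0 || i >= size) {
            throw new IndexOutOfBoundsException(String.valueOf(i));
        }
        return (E) keys[i];
    }

    // Clears the used slots so processed keys (and their channels) can be garbage-collected.
    void reset() {
        Arrays.fill(keys, 0, size, null);
        size = 0;
    }

    @Override
    public Iterator<E> iterator() {
        return new Iterator<E>() {
            private int idx;

            @Override
            public boolean hasNext() {
                return idx < size;
            }

            @SuppressWarnings("unchecked")
            @Override
            public E next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                return (E) keys[idx++];
            }
        };
    }
}
```

    contains() falls back to AbstractSet's linear scan, which is acceptable here because the hot path only appends and walks.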
    NioEventLoop behaves like a "thread": it does not extend Thread directly but holds a thread internally. Its core loop still lives in run(), shown below:

        protected void run() {
            for (;;) {
                try {
                    switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
                        case SelectStrategy.CONTINUE:
                            continue;
                        case SelectStrategy.SELECT:
                            select(wakenUp.getAndSet(false));
    
                            // 'wakenUp.compareAndSet(false, true)' is always evaluated
                            // before calling 'selector.wakeup()' to reduce the wake-up
                            // overhead. (Selector.wakeup() is an expensive operation.)
                            //
                            // However, there is a race condition in this approach.
                            // The race condition is triggered when 'wakenUp' is set to
                            // true too early.
                            //
                            // 'wakenUp' is set to true too early if:
                            // 1) Selector is waken up between 'wakenUp.set(false)' and
                            //    'selector.select(...)'. (BAD)
                            // 2) Selector is waken up between 'selector.select(...)' and
                            //    'if (wakenUp.get()) { ... }'. (OK)
                            //
                            // In the first case, 'wakenUp' is set to true and the
                            // following 'selector.select(...)' will wake up immediately.
                            // Until 'wakenUp' is set to false again in the next round,
                            // 'wakenUp.compareAndSet(false, true)' will fail, and therefore
                            // any attempt to wake up the Selector will fail, too, causing
                            // the following 'selector.select(...)' call to block
                            // unnecessarily.
                            //
                            // To fix this problem, we wake up the selector again if wakenUp
                            // is true immediately after selector.select(...).
                            // It is inefficient in that it wakes up the selector for both
                            // the first case (BAD - wake-up required) and the second case
                            // (OK - no wake-up required).
    
                            if (wakenUp.get()) {
                                selector.wakeup();
                            }
                        default:
                            // fallthrough
                    }
    
                    cancelledKeys = 0;
                    needsToSelectAgain = false;
                    final int ioRatio = this.ioRatio;
                    if (ioRatio == 100) {
                        processSelectedKeys();
                        runAllTasks();
                    } else {
                        final long ioStartTime = System.nanoTime();
    
                        processSelectedKeys();
    
                        final long ioTime = System.nanoTime() - ioStartTime;
                        runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                    }
    
                    if (isShuttingDown()) {
                        closeAll();
                        if (confirmShutdown()) {
                            break;
                        }
                    }
                } catch (Throwable t) {
                    logger.warn("Unexpected exception in the selector loop.", t);
    
                    // Prevent possible consecutive immediate failures that lead to
                    // excessive CPU consumption.
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        // Ignore.
                    }
                }
            }
        }
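    The ioRatio branch near the end of the loop body is plain arithmetic: after processSelectedKeys() has taken ioTime nanoseconds, tasks get ioTime * (100 - ioRatio) / ioRatio nanoseconds. A small sketch of that calculation follows; IoRatioBudget and taskBudgetNanos are hypothetical names, and returning Long.MAX_VALUE for ioRatio == 100 is only this sketch's way of saying "no limit" (run() calls runAllTasks() without a timeout in that case).

```java
// Hypothetical helper reproducing the arithmetic in run():
//   runAllTasks(ioTime * (100 - ioRatio) / ioRatio)
final class IoRatioBudget {
    private IoRatioBudget() {
    }

    /** Nanoseconds allowed for non-I/O tasks after ioTimeNanos of I/O work. */
    static long taskBudgetNanos(long ioTimeNanos, int ioRatio) {
        if (ioRatio <= 0 || ioRatio > 100) {
            throw new IllegalArgumentException("ioRatio: " + ioRatio + " (expected: 0 < ioRatio <= 100)");
        }
        if (ioRatio == 100) {
            return Long.MAX_VALUE; // sketch's convention for "run all tasks, no time limit"
        }
        return ioTimeNanos * (100 - ioRatio) / ioRatio;
    }
}
```

    With Netty's default ioRatio of 50, tasks get as much time as the I/O took; at 80 they get a quarter of it, and at 20 four times as much.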
    

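    run() first asks selectStrategy what to do: CONTINUE skips this round, while SELECT resets the wakenUp flag and blocks in select(). It then handles the ready I/O events in processSelectedKeys() and runs queued tasks with runAllTasks(); ioRatio, the percentage of loop time meant for I/O, limits how long those tasks may run. processSelectedKeys() looks like this: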

        private void processSelectedKeys() {
            if (selectedKeys != null) {
                processSelectedKeysOptimized(selectedKeys.flip());
            } else {
                processSelectedKeysPlain(selector.selectedKeys());
            }
        }
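    When selectedKeys is null (the optimization was disabled, the selector is not a SelectorImpl, or the reflection failed), processSelectedKeysPlain() works on the JDK's own set. The standard JDK idiom for that is sketched below, assuming nothing from Netty: walk selector.selectedKeys() with an Iterator and remove each key, because the selector never removes keys from its selected-key set by itself. PlainKeyLoop is a hypothetical name, and the Consumer stands in for Netty's per-key dispatch.

```java
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.Iterator;
import java.util.function.Consumer;

// Hypothetical sketch of the plain (non-optimized) way to consume selected keys.
final class PlainKeyLoop {
    private PlainKeyLoop() {
    }

    static int processSelectedKeysPlain(Selector selector, Consumer<SelectionKey> handler) {
        int processed = 0;
        Iterator<SelectionKey> it = selector.selectedKeys().iterator();
        while (it.hasNext()) {
            SelectionKey k = it.next();
            // Remove first: keys stay in the selected-key set until explicitly removed.
            it.remove();
            handler.accept(k);
            processed++;
        }
        return processed;
    }
}
```

    Each pass allocates an Iterator and goes through the HashSet, which is the overhead the array-based set avoids.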
    

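    If selectedKeys is not null (that is, openSelector() successfully installed Netty's optimized set), processSelectedKeysOptimized() is called; otherwise processSelectedKeysPlain() iterates the JDK's own selectedKeys() set. processSelectedKeysOptimized() is shown below: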

        private void processSelectedKeysOptimized(SelectionKey[] selectedKeys) {
            for (int i = 0;; i ++) {
                final SelectionKey k = selectedKeys[i];
                if (k == null) {
                    break;
                }
                // null out entry in the array to allow to have it GC'ed once the Channel close
                // See https://github.com/netty/netty/issues/2363
                selectedKeys[i] = null;
    
                final Object a = k.attachment();
    
                if (a instanceof AbstractNioChannel) {
                    processSelectedKey(k, (AbstractNioChannel) a);
                } else {
                    @SuppressWarnings("unchecked")
                    NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
                    processSelectedKey(k, task);
                }
    
                if (needsToSelectAgain) {
                    // null out entries in the array to allow to have it GC'ed once the Channel close
                    // See https://github.com/netty/netty/issues/2363
                    for (;;) {
                        i++;
                        if (selectedKeys[i] == null) {
                            break;
                        }
                        selectedKeys[i] = null;
                    }
    
                    selectAgain();
                    // Need to flip the optimized selectedKeys to get the right reference to the array
                    // and reset the index to -1 which will then set to 0 on the for loop
                    // to start over again.
                    //
                    // See https://github.com/netty/netty/issues/1523
                    selectedKeys = this.selectedKeys.flip();
                    i = -1;
                }
            }
        }
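    The loop above treats the array as null-terminated: the first null marks the end of this round's keys, and each slot is cleared as it is read so that a closed Channel does not stay reachable (netty issue 2363). A generic sketch of that scan follows; NullTerminatedScan and drain are hypothetical names. Netty's loop has no bounds check, while this version also stops at the end of the array.

```java
import java.util.function.Consumer;

// Hypothetical sketch of the null-terminated scan in processSelectedKeysOptimized().
final class NullTerminatedScan {
    private NullTerminatedScan() {
    }

    static <T> int drain(T[] slots, Consumer<? super T> handler) {
        int processed = 0;
        for (int i = 0; i < slots.length; i++) {
            T item = slots[i];
            if (item == null) {
                break; // first null marks the end of this round's entries
            }
            slots[i] = null; // drop the reference so the object can be GC'ed
            handler.accept(item);
            processed++;
        }
        return processed;
    }
}
```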
    

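    For each key, processSelectedKeysOptimized() then calls processSelectedKey(k, (AbstractNioChannel) a), or processSelectedKey(k, task) when the attachment is a NioTask. This is where the ready operations of the SelectionKey are actually checked: OP_READ, OP_WRITE, OP_ACCEPT and OP_CONNECT. The matching method of the channel's unsafe is then invoked, which passes the event on to the Channel, its ChannelPipeline and each handler. If the key is no longer valid, the channel is closed instead.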

        private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
            final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
            if (!k.isValid()) {
                final EventLoop eventLoop;
                try {
                    eventLoop = ch.eventLoop();
                } catch (Throwable ignored) {
                    // If the channel implementation throws an exception because there is no event loop, we ignore this
                    // because we are only trying to determine if ch is registered to this event loop and thus has authority
                    // to close ch.
                    return;
                }
                // Only close ch if ch is still registered to this EventLoop. ch could have deregistered from the event loop
                // and thus the SelectionKey could be cancelled as part of the deregistration process, but the channel is
                // still healthy and should not be closed.
                // See https://github.com/netty/netty/issues/5125
                if (eventLoop != this || eventLoop == null) {
                    return;
                }
                // close the channel if the key is not valid anymore
                unsafe.close(unsafe.voidPromise());
                return;
            }
    
            try {
                int readyOps = k.readyOps();
                // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
                // to a spin loop
                if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
                    unsafe.read();
                    if (!ch.isOpen()) {
                        // Connection already closed - no need to handle write.
                        return;
                    }
                }
                if ((readyOps & SelectionKey.OP_WRITE) != 0) {
                    // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
                    ch.unsafe().forceFlush();
                }
                if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
                    // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
                    // See https://github.com/netty/netty/issues/924
                    int ops = k.interestOps();
                    ops &= ~SelectionKey.OP_CONNECT;
                    k.interestOps(ops);
    
                    unsafe.finishConnect();
                }
            } catch (CancelledKeyException ignored) {
                unsafe.close(unsafe.voidPromise());
            }
        }
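    The bit tests in processSelectedKey() can be exercised without any sockets, because the SelectionKey.OP_* values are plain int flags. The hypothetical sketch below returns, in the same order, which action the method would take for a given readyOps value. The string labels stand for unsafe.read(), forceFlush() and finishConnect(), and the early return when the channel closes after a read is left out.

```java
import java.nio.channels.SelectionKey;
import java.util.ArrayList;
import java.util.List;

// Hypothetical decoder mirroring the readyOps checks in processSelectedKey().
final class ReadyOpsDecoder {
    private ReadyOpsDecoder() {
    }

    static List<String> actions(int readyOps) {
        List<String> out = new ArrayList<>();
        // readyOps == 0 is also treated as a read, the JDK-bug workaround from the comment above.
        if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
            out.add("read");
        }
        if ((readyOps & SelectionKey.OP_WRITE) != 0) {
            out.add("flush");
        }
        if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
            out.add("finishConnect");
        }
        return out;
    }

    // Mirrors `ops &= ~SelectionKey.OP_CONNECT`: stop selecting for connect once it has fired.
    static int clearConnectInterest(int interestOps) {
        return interestOps & ~SelectionKey.OP_CONNECT;
    }
}
```

    OP_ACCEPT goes through the same read() path as OP_READ; for a server channel, Netty's unsafe "reads" by accepting new connections.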
    

    Note the following lines in run():

    switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
        case SelectStrategy.CONTINUE:
            continue;
        case SelectStrategy.SELECT:
            select(wakenUp.getAndSet(false));
            if (wakenUp.get()) {
                selector.wakeup();
            }
        default:
            // fallthrough
    }
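    The long comment in run() explains why wakenUp.compareAndSet(false, true) guards selector.wakeup(): the call is expensive, so only the first waker in each round should pay for it, and the loop wakes the selector again if the flag was set too early. A JDK-only sketch of that gate follows. WakeupGate, wakeupIfNeeded and selectOnce are hypothetical names, the wakeup counter exists only to make the saving observable, and the oldWakenUp value Netty passes into select() is dropped for brevity.

```java
import java.io.IOException;
import java.nio.channels.Selector;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of the wakenUp pattern around Selector.wakeup().
final class WakeupGate implements AutoCloseable {
    private final Selector selector;
    private final AtomicBoolean wakenUp = new AtomicBoolean();
    // Counts real Selector.wakeup() calls; illustration only.
    private final AtomicInteger wakeupCalls = new AtomicInteger();

    WakeupGate() throws IOException {
        this.selector = Selector.open();
    }

    // Any thread may call this, e.g. after submitting a task. Only the first caller
    // since the flag was last reset pays for the expensive Selector.wakeup().
    void wakeupIfNeeded() {
        if (wakenUp.compareAndSet(false, true)) {
            selector.wakeup();
            wakeupCalls.incrementAndGet();
        }
    }

    // Event-loop side: reset the flag, block, then wake again if the flag is set,
    // covering the "set too early" race described in run()'s comment.
    int selectOnce(long timeoutMillis) throws IOException {
        wakenUp.set(false);
        int selected = selector.select(timeoutMillis);
        if (wakenUp.get()) {
            selector.wakeup();
            wakeupCalls.incrementAndGet();
        }
        return selected;
    }

    int wakeupCalls() {
        return wakeupCalls.get();
    }

    @Override
    public void close() throws IOException {
        selector.close();
    }
}
```

    A wakeup issued while no select is in progress is remembered, so the next select returns immediately; that is why a second wakeup in the same round is wasted work.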
    

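    The core of this switch is the call to select(), which blocks on the selector until keys are ready and selectedKeys is filled. It also returns early on a wakeup, when tasks are pending, or when the next scheduled task is due: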

        private void select(boolean oldWakenUp) throws IOException {
            Selector selector = this.selector;
            try {
                int selectCnt = 0;
                long currentTimeNanos = System.nanoTime();
                long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);
                for (;;) {
                    long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
                    if (timeoutMillis <= 0) {
                        if (selectCnt == 0) {
                            selector.selectNow();
                            selectCnt = 1;
                        }
                        break;
                    }
    
                    // If a task was submitted when wakenUp value was true, the task didn't get a chance to call
                    // Selector#wakeup. So we need to check task queue again before executing select operation.
                    // If we don't, the task might be pended until select operation was timed out.
                    // It might be pended until idle timeout if IdleStateHandler existed in pipeline.
                    if (hasTasks() && wakenUp.compareAndSet(false, true)) {
                        selector.selectNow();
                        selectCnt = 1;
                        break;
                    }
    
                    int selectedKeys = selector.select(timeoutMillis);
                    selectCnt ++;
    
                    if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
                        // - Selected something,
                        // - waken up by user, or
                        // - the task queue has a pending task.
                        // - a scheduled task is ready for processing
                        break;
                    }
                    if (Thread.interrupted()) {
                        // Thread was interrupted so reset selected keys and break so we not run into a busy loop.
                        // As this is most likely a bug in the handler of the user or it's client library we will
                        // also log it.
                        //
                        // See https://github.com/netty/netty/issues/2426
                        if (logger.isDebugEnabled()) {
                            logger.debug("Selector.select() returned prematurely because " +
                                    "Thread.currentThread().interrupt() was called. Use " +
                                    "NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.");
                        }
                        selectCnt = 1;
                        break;
                    }
    
                    long time = System.nanoTime();
                    if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
                        // timeoutMillis elapsed without anything selected.
                        selectCnt = 1;
                    } else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
                            selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
                        // The selector returned prematurely many times in a row.
                        // Rebuild the selector to work around the problem.
                        logger.warn(
                                "Selector.select() returned prematurely {} times in a row; rebuilding Selector {}.",
                                selectCnt, selector);
    
                        rebuildSelector();
                        selector = this.selector;
    
                        // Select again to populate selectedKeys.
                        selector.selectNow();
                        selectCnt = 1;
                        break;
                    }
    
                    currentTimeNanos = time;
                }
    
                if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) {
                    if (logger.isDebugEnabled()) {
                        logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
                                selectCnt - 1, selector);
                    }
                }
            } catch (CancelledKeyException e) {
                if (logger.isDebugEnabled()) {
                    logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",
                            selector, e);
                }
                // Harmless exception - log anyway
            }
        }
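    Stripped of logging and task checks, the premature-return bookkeeping in select() amounts to a counter: reset it when select() returns because a key was selected or the full timeout elapsed, increment it otherwise, and rebuild the selector once it reaches SELECTOR_AUTO_REBUILD_THRESHOLD. A simplified sketch follows. SpinDetector is a hypothetical name, the caller is assumed to measure how long each select() call actually blocked, and wakeups and pending tasks, which also end Netty's loop, are ignored.

```java
// Hypothetical, simplified version of the premature-return counting in NioEventLoop.select().
final class SpinDetector {
    private final int threshold; // <= 0 disables rebuilding, like SELECTOR_AUTO_REBUILD_THRESHOLD
    private int prematureReturns;

    SpinDetector(int threshold) {
        this.threshold = threshold;
    }

    /**
     * @param selectedKeys value returned by Selector.select(timeout)
     * @param elapsedNanos how long that select call actually blocked
     * @param timeoutNanos the timeout that was requested
     * @return true if the caller should rebuild its Selector
     */
    boolean onSelectReturned(int selectedKeys, long elapsedNanos, long timeoutNanos) {
        if (selectedKeys != 0 || elapsedNanos >= timeoutNanos) {
            // Normal return: something was ready, or select() really waited the full timeout.
            prematureReturns = 0;
            return false;
        }
        // Returned early with nothing selected: a possible sign of the epoll spin bug.
        prematureReturns++;
        if (threshold > 0 && prematureReturns >= threshold) {
            prematureReturns = 0; // start counting afresh for the rebuilt selector
            return true;
        }
        return false;
    }
}
```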
    

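    In select(), when Selector.select() returns before its timeout with no keys selected (a "premature" return) SELECTOR_AUTO_REBUILD_THRESHOLD times in a row, Netty assumes it has hit the JDK epoll bug, in which select() spins without blocking and drives the CPU to 100%, and calls rebuildSelector() to replace the selector: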

        /**
         * Replaces the current {@link Selector} of this event loop with newly created {@link Selector}s to work
         * around the infamous epoll 100% CPU bug.
         */
        public void rebuildSelector() {
            if (!inEventLoop()) {
                execute(new Runnable() {
                    @Override
                    public void run() {
                        rebuildSelector();
                    }
                });
                return;
            }
    
            final Selector oldSelector = selector;
            final Selector newSelector;
    
            if (oldSelector == null) {
                return;
            }
    
            try {
                newSelector = openSelector();
            } catch (Exception e) {
                logger.warn("Failed to create a new Selector.", e);
                return;
            }
    
            // Register all channels to the new Selector.
            int nChannels = 0;
            for (;;) {
                try {
                    for (SelectionKey key: oldSelector.keys()) {
                        Object a = key.attachment();
                        try {
                            if (!key.isValid() || key.channel().keyFor(newSelector) != null) {
                                continue;
                            }
    
                            int interestOps = key.interestOps();
                            key.cancel();
                            SelectionKey newKey = key.channel().register(newSelector, interestOps, a);
                            if (a instanceof AbstractNioChannel) {
                                // Update SelectionKey
                                ((AbstractNioChannel) a).selectionKey = newKey;
                            }
                            nChannels ++;
                        } catch (Exception e) {
                            logger.warn("Failed to re-register a Channel to the new Selector.", e);
                            if (a instanceof AbstractNioChannel) {
                                AbstractNioChannel ch = (AbstractNioChannel) a;
                                ch.unsafe().close(ch.unsafe().voidPromise());
                            } else {
                                @SuppressWarnings("unchecked")
                                NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
                                invokeChannelUnregistered(task, key, e);
                            }
                        }
                    }
                } catch (ConcurrentModificationException e) {
                    // Probably due to concurrent modification of the key set.
                    continue;
                }
    
                break;
            }
    
            selector = newSelector;
    
            try {
                // time to close the old selector as everything else is registered to the new one
                oldSelector.close();
            } catch (Throwable t) {
                if (logger.isWarnEnabled()) {
                    logger.warn("Failed to close the old Selector.", t);
                }
            }
    
            logger.info("Migrated " + nChannels + " channel(s) to the new Selector.");
        }
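    The heart of rebuildSelector() is the re-registration loop: cancel each valid key on the old selector and register its channel with the new one, keeping interestOps and the attachment. A JDK-only sketch of that loop follows. SelectorMigration and migrate are hypothetical names, and the sketch leaves out what Netty adds around the loop: hopping onto the event-loop thread, retrying on ConcurrentModificationException, updating AbstractNioChannel.selectionKey, and closing channels that fail to re-register. It also assumes it runs on the only thread using oldSelector, since a selector's key set is not safe for concurrent use.

```java
import java.io.IOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;

// Hypothetical sketch of the channel migration performed by rebuildSelector().
final class SelectorMigration {
    private SelectorMigration() {
    }

    static Selector migrate(Selector oldSelector) throws IOException {
        Selector newSelector = Selector.open();
        for (SelectionKey key : oldSelector.keys()) {
            // Skip dead keys and channels that are already on the new selector.
            if (!key.isValid() || key.channel().keyFor(newSelector) != null) {
                continue;
            }
            int interestOps = key.interestOps();
            Object attachment = key.attachment();
            key.cancel(); // detach from the old selector
            key.channel().register(newSelector, interestOps, attachment);
        }
        // Everything now lives on the new selector, so the old one can go.
        oldSelector.close();
        return newSelector;
    }
}
```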
    

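    In summary, NioEventLoop opens its selector when it is instantiated, keeps polling for ready selectedKeys in run(), and invokes the channel's handling logic for each ready event. To work around Java bugs and improve I/O performance, Netty adds optimizations such as the customized selected-key set and rebuildSelector(). This shows that writing a solid I/O program directly on Java NIO means running into many pitfalls.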
