美文网首页
Netty线程源码分析(一)

Netty线程源码分析(一)

作者: 北京的小毛驴 | 来源:发表于2017-06-16 18:03 被阅读0次

    一、NioEventLoopGroup

    继承关系图1-1:


    image

    继承关系:


    11

    NioEventLoop初始化

    NioEventLoop(NioEventLoopGroup parent, ThreadFactory threadFactory, SelectorProvider selectorProvider) {
        super(parent, threadFactory, false);
        if (selectorProvider == null) {
            throw new NullPointerException("selectorProvider");
        }
        provider = selectorProvider;
        selector = openSelector();
    }
    

    1、调用父类方法构造一个taskQueue,它是一个LinkedBlockingQueue

    2、openSelector(): Netty是基于Nio实现的,所以也离不开selector。

    3、DISABLE_KEYSET_OPTIMIZATION: 判断是否需要对sun.nio.ch.SelectorImpl中的selectedKeys进行优化, 不做配置的话默认需要优化,通过反射将selectedKeySet与sun.nio.ch.SelectorImpl中的两个field绑定

    4、主要优化在哪: SelectorImpl原来的selectedKeys和publicSelectedKeys数据结构是HashSet,大家知道HashSet的数据结构是数组+链表,新的数据结构是由2个数组A、B组成,初始大小是1024,避免了HashSet扩容带来的性能问题。除了扩容外,遍历效率也是一个原因,对于需要遍历selectedKeys的全部元素, 数组效率无疑是最高的。

    private Selector openSelector() {
        final Selector selector;
        try {
            selector = provider.openSelector();
        } catch (IOException e) {
            throw new ChannelException("failed to open a new selector", e);
        }
    
        if (DISABLE_KEYSET_OPTIMIZATION) {
            return selector;
        }
    
        try {
            SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();
    
            Class<?> selectorImplClass =
                    Class.forName("sun.nio.ch.SelectorImpl", false, PlatformDependent.getSystemClassLoader());
    
            // Ensure the current selector implementation is what we can instrument.
            if (!selectorImplClass.isAssignableFrom(selector.getClass())) {
                return selector;
            }
    
            Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
            Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");
    
            selectedKeysField.setAccessible(true);
            publicSelectedKeysField.setAccessible(true);
    
            selectedKeysField.set(selector, selectedKeySet);
            publicSelectedKeysField.set(selector, selectedKeySet);
    
            selectedKeys = selectedKeySet;
            logger.trace("Instrumented an optimized java.util.Set into: {}", selector);
        } catch (Throwable t) {
            selectedKeys = null;
            logger.trace("Failed to instrument an optimized java.util.Set into: {}", selector, t);
        }
    
        return selector;
    }
    

    NioEventLoop的启动

    在上一遍讲过NioEventLoop中维护了一个线程,线程启动时会调用NioEventLoop的run方法,loop会不断循环一个过程:select -> processSelectedKeys(IO任务) -> runAllTasks(非IO任务)

    • I/O任务: 即selectionKey中ready的事件,如accept、connect、read、write等

    • 非IO任务: 添加到taskQueue中的任务,如bind、channelActive等

    @Override
    protected void run() {
        for (;;) {
            boolean oldWakenUp = wakenUp.getAndSet(false);
            try {
                // 判断是否有非IO任务,如果有立刻返回
                if (hasTasks()) {
                    selectNow();
                } else {
                    select(oldWakenUp);
    
                    if (wakenUp.get()) {
                        selector.wakeup();
                    }
                }
    
                cancelledKeys = 0;
                needsToSelectAgain = false;
                final int ioRatio = this.ioRatio;
                if (ioRatio == 100) {
                    // IO任务
                    processSelectedKeys();
                    // 非IO任务
                    runAllTasks();
                } else {
                    // 用以控制IO任务与非IO任务的运行时间比
                    final long ioStartTime = System.nanoTime();
                    // IO任务
                    processSelectedKeys();
    
                    final long ioTime = System.nanoTime() - ioStartTime;
                    // 非IO任务
                    runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                }
    
                if (isShuttingDown()) {
                    closeAll();
                    if (confirmShutdown()) {
                        break;
                    }
                }
            } catch (Throwable t) {
                // Prevent possible consecutive immediate failures that lead to
                // excessive CPU consumption.
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    // Ignore.
                }
            }
        }
    }
    

    1、wakenUp: 用来决定是否调用selector.wakeup(),只有当wakenUp未true时才会调用,目的是为了减少wake-up的负载,因为Selector.wakeup()是一个昂贵的操作。

    2、hasTask(): 判断是否有非IO任务,如果有的话,选择调用非阻塞的selectNow()让select立即返回, 否则以阻塞的方式调用select.timeoutMillis是阻塞时间

    3、ioRatio: 控制两种任务的执行时间,你可以通过它来限制非IO任务的执行时间, 默认值是50, 表示允许非IO任务获得和IO任务相同的执行时间,这个值根据自己的具体场景来设置.

    4、processSelectedKeys(): 处理IO事件

    5、runAllTasks(): 处理非IO任务

    6、isShuttingDown(): 检查state是否被标记为ST_SHUTTING_DOWN

    private void select(boolean oldWakenUp) throws IOException {
            Selector selector = this.selector;
            try {
                int selectCnt = 0;
                long currentTimeNanos = System.nanoTime();
                long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);
                for (;;) {
                    long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
                    if (timeoutMillis <= 0) {
                        if (selectCnt == 0) {
                            selector.selectNow();
                            selectCnt = 1;
                        }
                        break;
                    }
    
                    int selectedKeys = selector.select(timeoutMillis);
                    selectCnt ++;
    
                    if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
                        // - Selected something,
                        // - waken up by user, or
                        // - the task queue has a pending task.
                        // - a scheduled task is ready for processing
                        break;
                    }
                    if (Thread.interrupted()) {
                        // Thread was interrupted so reset selected keys and break so we not run into a busy loop.
                        // As this is most likely a bug in the handler of the user or it's client library we will
                        // also log it.
                        //
                        // See https://github.com/netty/netty/issues/2426
                        if (logger.isDebugEnabled()) {
                            logger.debug("Selector.select() returned prematurely because " +
                                    "Thread.currentThread().interrupt() was called. Use " +
                                    "NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.");
                        }
                        selectCnt = 1;
                        break;
                    }
    
                    long time = System.nanoTime();
                    if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
                        // timeoutMillis elapsed without anything selected.
                        selectCnt = 1;
                    } else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
                            selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
                        // The selector returned prematurely many times in a row.
                        // Rebuild the selector to work around the problem.
                        logger.warn(
                                "Selector.select() returned prematurely {} times in a row; rebuilding selector.",
                                selectCnt);
    
                        rebuildSelector();
                        selector = this.selector;
    
                        // Select again to populate selectedKeys.
                        selector.selectNow();
                        selectCnt = 1;
                        break;
                    }
    
                    currentTimeNanos = time;
                }
    
                if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) {
                    if (logger.isDebugEnabled()) {
                        logger.debug("Selector.select() returned prematurely {} times in a row.", selectCnt - 1);
                    }
                }
            } catch (CancelledKeyException e) {
                if (logger.isDebugEnabled()) {
                    logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector - JDK bug?", e);
                }
            }
        }
    
    protected long delayNanos(long currentTimeNanos) {
        ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
        if (scheduledTask == null) {
            return SCHEDULE_PURGE_INTERVAL;
        }
    
        return scheduledTask.delayNanos(currentTimeNanos);
    }
    
    public long delayNanos(long currentTimeNanos) {
        return Math.max(0, deadlineNanos() - (currentTimeNanos - START_TIME));
    }
    
    public long deadlineNanos() {
        return deadlineNanos;
    }
    
    

    1、delayNanos(currentTimeNanos): 在父类SingleThreadEventExecutor中有一个延迟执行任务的队列,delayNanos就是去这个延迟队列里看是否有非IO任务未执行

    • 如果没有则返回1秒钟。
    • 如果延迟队列里有任务并且最终的计算出来的时间(selectDeadLineNanos - currentTimeNanos)小于500000L纳秒,就调用selectNow()直接返回,反之执行阻塞的select

    2、select如果遇到以下几种情况会立即返回

    if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
                        // - Selected something,
                        // - waken up by user, or
                        // - the task queue has a pending task.
                        // - a scheduled task is ready for processing
                        break;
                    }
    
    1. Selected something 如果select到了就绪连接(selectedKeys > 0)
    2. waken up by user 被用户唤醒了
    3. the task queue has a pending task.任务队列来了一个新任务
    4. a scheduled task is ready for processing 延迟队列里面有个预约任务需要到期执行

    3、selectCnt: 记录select空转的次数(selectCnt),该方法解决了Nio中臭名昭著selector的select方法导致cpu100%的BUG,当空转的次数超过了512(定义一个阀值,这个阀值默认是512,可以在应用层通过设置系统属性io.netty.selectorAutoRebuildThreshold传入),Netty会重新构建新的Selector,将老Selector上注册的Channel转移到新建的Selector上,关闭老Selector,用新的Selector代替老Selector。详细看下面rebuildSelector()方法

    4、rebuildSelector(): 就是上面说过得。

    public void rebuildSelector() {
        if (!inEventLoop()) {
            execute(new Runnable() {
                @Override
                public void run() {
                    rebuildSelector();
                }
            });
            return;
        }
    
        final Selector oldSelector = selector;
        final Selector newSelector;
    
        if (oldSelector == null) {
            return;
        }
    
        try {
            newSelector = openSelector();
        } catch (Exception e) {
            logger.warn("Failed to create a new Selector.", e);
            return;
        }
    
        // Register all channels to the new Selector.
        int nChannels = 0;
        for (;;) {
            try {
                for (SelectionKey key: oldSelector.keys()) {
                    Object a = key.attachment();
                    try {
                        if (!key.isValid() || key.channel().keyFor(newSelector) != null) {
                            continue;
                        }
    
                        int interestOps = key.interestOps();
                        key.cancel();
                        SelectionKey newKey = key.channel().register(newSelector, interestOps, a);
                        if (a instanceof AbstractNioChannel) {
                            // Update SelectionKey
                            ((AbstractNioChannel) a).selectionKey = newKey;
                        }
                        nChannels ++;
                    } catch (Exception e) {
                        logger.warn("Failed to re-register a Channel to the new Selector.", e);
                        if (a instanceof AbstractNioChannel) {
                            AbstractNioChannel ch = (AbstractNioChannel) a;
                            ch.unsafe().close(ch.unsafe().voidPromise());
                        } else {
                            @SuppressWarnings("unchecked")
                            NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
                            invokeChannelUnregistered(task, key, e);
                        }
                    }
                }
            } catch (ConcurrentModificationException e) {
                // Probably due to concurrent modification of the key set.
                continue;
            }
    
            break;
        }
    
        selector = newSelector;
    
        try {
            // time to close the old selector as everything else is registered to the new one
            oldSelector.close();
        } catch (Throwable t) {
            if (logger.isWarnEnabled()) {
                logger.warn("Failed to close the old Selector.", t);
            }
        }
    
        logger.info("Migrated " + nChannels + " channel(s) to the new Selector.");
    }
    

    相关文章

      网友评论

          本文标题:Netty线程源码分析(一)

          本文链接:https://www.haomeiwen.com/subject/hjumqxtx.html