美文网首页
netty源码分析(三) - NioEventLoop - 2执

netty源码分析(三) - NioEventLoop - 2执

作者: 进击的蚂蚁zzzliu | 来源:发表于2020-10-09 17:30 被阅读0次

    概述

    执行过程主要有以下4步:

    1. 执行入口逻辑
    2. 轮询IO事件
    3. 处理IO事件
    4. 处理tasks
      下面对这四个步骤进行详细解析

    1. 执行入口逻辑

    NioEventLoop入口有两个:
    A: 服务端启动绑定端口时(netty源码分析(二) - 服务端启动 - 2中有详细分析)
    B: 新连接接入通过chooser绑定一个NioEventLoop时(下一章节详细分析)
    本节先分析A

    private static void doBind0(final ChannelFuture regFuture, final Channel channel, final SocketAddress localAddress, final ChannelPromise promise) {
        //channel.eventLoop() -> NioEventLoop extend SingleThreadEventExecutor
        channel.eventLoop().execute(new Runnable() {
            //从execute里面代码可以看出,该runnable被当作task被add到taskQueue中; 在NioEventLoop run方法ranTasks = runAllTasks(0);时执行
            @Override
            public void run() {
                if (regFuture.isSuccess()) {
                    //执行绑定端口逻辑,上一章节已经分析过
                    channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
                } else {
                    promise.setFailure(regFuture.cause());
                }
            }
        });
    }
    
    • channel.eventLoop():实际是当前NioServerSocketChannel的NioEventLoop,execute在父类SingleThreadEventExecutor中
    • execute(new Runnable():从下面execute代码分析可以看出,该runnable被当作task添加到taskQueue中; 在NioEventLoop run方法ranTasks = runAllTasks(0);时执行
    • channel.bind:绑定端口逻辑,上一章节已经分析过

    private void execute(Runnable task, boolean immediate) {
        //当前时main线程,executor thread = null
        boolean inEventLoop = inEventLoop();
        //提交过来的task并没有执行,只是放在了taskQueue中
        addTask(task);
        if (!inEventLoop) {
            startThread();
        }
    }
    
    • inEventLoop:判断NioEventLoop的thread成员变量(父类SingleThreadEventExecutor中)当前线程是否等于当前线程,此处thread为null返回false
    • addTask:传过来的runnable并没有立即执行,只是放在了taskQueue中
    • startThread:启动线程

    private void doStartThread() {
        //executor: ThreadPerTaskExecutor  上一节分析创建流程时的创建一个fastThreadLocalThread并启动
        executor.execute(new Runnable() {
            @Override
            public void run() {
                thread = Thread.currentThread();
                boolean success = false;
                //更新线程最后执行时间
                updateLastExecutionTime();
                try {
                    //真正开始执行NioEventLoop中run方法
                    SingleThreadEventExecutor.this.run();
                    success = true;
                } catch (Throwable t) {
                    logger.warn("Unexpected exception from an event executor: ", t);
                } finally {
                    //忽略非重点逻辑
                }
            }
        });
    }
    
    • executor:创建NioEventLoop是创建的ThreadPerTaskExecutor,该executor.execute直接创建一个线程直接start;(细节参考上一篇创建流程)
    • SingleThreadEventExecutor.this.run():开始真正执行NioEventLoop中run方法
    • finally:finally中主要是监控executor中线程状态,里面涉及FastThreadLocal/CountDownLatch等知识点可以自己分析下

    run方法整体代码如下,下面对其中select / processSelectedKeys / runAllTasks进行详细分析

    protected void run() {
        //为了解决jdk空轮询bug设置的轮询次数
        int selectCnt = 0;
        for (;;) {
            try {
                int strategy;
                try {
                    //selectStrategy: 上一节创建流程中创建的calculateStrategy,服务端启动执行时task中有任务(前面1. 执行入口逻辑中add进去的)
                    //此时此处为0,不执行select操作;执行完下面的ranTasks = runAllTasks(0);,第二次循环会返回-1,执行select
                    strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
                    switch (strategy) {
                    case SelectStrategy.CONTINUE:
                        continue;
                    case SelectStrategy.BUSY_WAIT:
                    case SelectStrategy.SELECT:
                        //下一次定时任务触发截至时间,默认不是定时任务,返回 -1L
                        long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
                        if (curDeadlineNanos == -1L) {
                            curDeadlineNanos = NONE; // nothing on the calendar
                        }
                        nextWakeupNanos.set(curDeadlineNanos);
                        try {
                            if (!hasTasks()) {
                                //2. taskQueue中任务执行完,开始执行select进行阻塞
                                strategy = select(curDeadlineNanos);
                            }
                        } finally {
                            nextWakeupNanos.lazySet(AWAKE);
                        }
                    default:
                    }
                } catch (IOException e) {
                    //IOException时,重构selector(重写打开一个selector,把原selector上的channel重新注册,然后关闭原selector)
                    rebuildSelector0();
                    selectCnt = 0;
                    //防止连续报错导致cpu过载每次IOException后sleep 1s再继续轮询
                    handleLoopException(e);
                    continue;
                }
                selectCnt++;
                cancelledKeys = 0;
                needsToSelectAgain = false;
                final int ioRatio = this.ioRatio;
                boolean ranTasks;
                //IO事件执行时间占比,默认50
                if (ioRatio == 100) {
                    try {
                        if (strategy > 0) {
                            processSelectedKeys();
                        }
                    } finally {
                        ranTasks = runAllTasks();
                    }
                } else if (strategy > 0) {
                    final long ioStartTime = System.nanoTime();
                    try {
                        //3. 轮询到IO事件,进行处理
                        processSelectedKeys();
                    } finally {
                        final long ioTime = System.nanoTime() - ioStartTime;
                        //4. 执行外部任务(非IO任务)
                        ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                    }
                } else {
                    //strategy <= 0,只执行外部任务,不处理IO事件
                    ranTasks = runAllTasks(0); 
                }
                
                //执行到了task(ranTasks = true)或 轮询到的IO事件的个数strategy > 0,正常情况肯定会走下面if的逻辑
                //每次把selectCnt置0;当出现jkd空轮询bug(即strategy=0)会走unexpectedSelectorWakeup解决该bug
                if (ranTasks || strategy > 0) {
                    selectCnt = 0;
                } else if (unexpectedSelectorWakeup(selectCnt)) {
                    selectCnt = 0;
                }
            } catch (CancelledKeyException e) {
            } catch (Throwable t) {
                handleLoopException(t);
            }
        }
    }
    
    • selectCnt:是为了解决jdk空轮询bug设置的轮询次数
    • selectStrategy: 上一节创建流程中创建的calculateStrategy,服务端启动执行时task中有任务(前面1. 执行入口逻辑中add进去的)此时此处为0,不执行select操作;执行完下面的ranTasks = runAllTasks(0);,第二次循环会返回-1,执行select
    • curDeadlineNanos:下一次定时任务触发截至时间,默认不是定时任务,返回 -1L
    • ioRatio:执行IO事件时间占比,默认为50

    2. 轮询IO事件

    private int select(long deadlineNanos) throws IOException {
          if (deadlineNanos == NONE) {
              return selector.select();
          }
          long timeoutMillis = deadlineToDelayNanos(deadlineNanos + 995000L) / 1000000L;
          return timeoutMillis <= 0 ? selector.selectNow() : selector.select(timeoutMillis);
    }
    
    • deadlineNanos:默认为NONE(Long类型最大值)
    • selector.select():阻塞,直到有IO事件到来
    • selector.selectNow():立即返回,没有IO事件返回0
    • selector.select(timeoutMillis):最长阻塞timeoutMillis时间后返回

    再来分析下netty是如何解决jdk空轮询bug的

    //执行到了task(ranTasks = true)或 轮询到的IO事件的个数strategy > 0,正常情况肯定会走下面if的逻辑
    //每次把selectCnt置0;当出现jkd空轮询bug(即strategy=0)会走unexpectedSelectorWakeup解决该bug
    if (ranTasks || strategy > 0) {
        selectCnt = 0;
    } else if (unexpectedSelectorWakeup(selectCnt)) {
        selectCnt = 0;
    }
    
    private boolean unexpectedSelectorWakeup(int selectCnt) {
        //空轮询次数达到阈值默认512次时,重构selector
        if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 && selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
            rebuildSelector();
            return true;
        }
        return false;
    }
    
    • ranTasks || strategy > 0:执行到了task(ranTasks = true)或 轮询到的IO事件的个数strategy > 0,正常情况肯定会返回true;当连续多次轮询到事件都为0(即strategy=0)说明可能触发了jdk空轮询bug
    • unexpectedSelectorWakeup:当空轮询次数大于512次时会重构当前selector(重写打开一个selector,把原selector上的channel重新注册,然后关闭原selector)
      可以看出netty其实并没有解决这个bug,只是用了一种讨巧的方式规避了这个bug。

    3. 处理IO事件

    private void processSelectedKeys() {
        if (selectedKeys != null) {
            //经过优化的方式
            processSelectedKeysOptimized();
        } else {
            //平庸的方式
            processSelectedKeysPlain(selector.selectedKeys());
        }
    }
    
    • selectedKeys:上一节创建NioEventLoop,执行构造方法时创建(openSelector();),此处不为null

    先来分析下selectedKeys创建过程

    private SelectorTuple openSelector() {
        final Selector unwrappedSelector;
        try {
            //jdk原生selector,内部selectedKeys是一个set集合
            unwrappedSelector = provider.openSelector();
        } catch (IOException e) {
            throw new ChannelException("failed to open a new selector", e);
        }
        //是否取消优化,默认false
        if (DISABLE_KEY_SET_OPTIMIZATION) {
            return new SelectorTuple(unwrappedSelector);
        }
    Object maybeSelectorImplClass = AccessController.doPrivileged(new PrivilegedAction<Object>() {
                @Override
                public Object run() {
                    try {
                        return Class.forName(
                                "sun.nio.ch.SelectorImpl",
                                false,
                                PlatformDependent.getSystemClassLoader());
                    } catch (Throwable cause) {
                        return cause;
                    }
                }
            });
        //原生SelectorImpl类
        final Class<?> selectorImplClass = (Class<?>) maybeSelectorImplClass;
        //netty提供的set,实际上是数组实现,add时间复杂度O(1),效率更高
        final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();
        Object maybeException = AccessController.doPrivileged(new PrivilegedAction<Object>() {
            @Override
            public Object run() {
                try {
                    Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
                    Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");
                    //jdk9以上的版本直接操作对象内存偏移量设置selectedKeySet
                    if (PlatformDependent.javaVersion() >= 9 && PlatformDependent.hasUnsafe()) {
                        long selectedKeysFieldOffset = PlatformDependent.objectFieldOffset(selectedKeysField);
                        long publicSelectedKeysFieldOffset = PlatformDependent.objectFieldOffset(publicSelectedKeysField);
                        if (selectedKeysFieldOffset != -1 && publicSelectedKeysFieldOffset != -1) {
                            PlatformDependent.putObject(unwrappedSelector, selectedKeysFieldOffset, selectedKeySet);
                            PlatformDependent.putObject(unwrappedSelector, publicSelectedKeysFieldOffset, selectedKeySet);
                            return null;
                        }
                    }
                    //jdk9以下版本通过反射设置selectedKeySet
                    selectedKeysField.set(unwrappedSelector, selectedKeySet);
                    publicSelectedKeysField.set(unwrappedSelector, selectedKeySet);
                    return null;
                } 
            }
        });
        //保存到成员变量selectedKeys
        selectedKeys = selectedKeySet;
        logger.trace("instrumented a special java.util.Set into: {}", unwrappedSelector);
        return new SelectorTuple(unwrappedSelector, new SelectedSelectionKeySetSelector(unwrappedSelector, selectedKeySet));
    }
    
    • unwrappedSelector:jdk原生selector,内部selectedKeys是一个HashSet集合
    • selectedKeySet:netty提供的set,实际上是数组实现,操作简洁,效率更高
    • SelectorImpl:通过反射或对象内存地址偏移量操作把SelectorImpl的selectedKeys / publicSelectedKeys替换成selectedKeySet

    下面接着分析processSelectedKeysOptimized

    private void processSelectedKeysOptimized() {
        for (int i = 0; i < selectedKeys.size; ++i) {
            //1. 取出IO事件以及对应的channel
            final SelectionKey k = selectedKeys.keys[i];
            //k的引用置null,便于gc回收
            selectedKeys.keys[i] = null;
            //attachment是《netty源码分析(二) - 服务端启动 - 2》中注册selector时放进去的NioServerSocketChannel
            final Object a = k.attachment();
            //2. 处理该channel
            if (a instanceof AbstractNioChannel) {
                //对于boss NioEventLoop,轮询到的基本是连接事件,后续的事情就是通过他的pipeline将连接扔给一个worker NioEventLoop处理
                //对于worker NioEventLoop来说,轮循道的基本商是IO读写事件,后续的事情就是通过他的pipeline将读取到的字节流传递给每个channelHandler来处理
                processSelectedKey(k, (AbstractNioChannel) a);
            } else {
                @SuppressWarnings("unchecked")
                NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
                processSelectedKey(k, task);
            }
            //3. 注销时需要再次轮询
            if (needsToSelectAgain) {
                //清空数组条目(数组元素都置成null)便于gc回收
                selectedKeys.reset(i + 1);
                selectAgain();
                i = -1;
            }
        }
    }
    
    • selectedKeys:遍历selectedKeys内数组处理轮询到的事件
    • k.attachment():attachment是《netty源码分析(二) - 服务端启动 - 2》中注册selector时放进去的NioServerSocketChannel
    • processSelectedKey:具体处理事件(后续新连接建立流程再具体分析)

    4. 处理tasks

    protected boolean runAllTasks(long timeoutNanos) {
        //把定时任务队列(截至时间次序的优先队列PriorityQueue)中可以执行的任务放到taskQueue中
        fetchFromScheduledTaskQueue();
        //取出第一个task
        Runnable task = pollTask();
        if (task == null) {
            //任务队列为空,返回false
            afterRunningAllTasks();
            return false;
        }
    
        final long deadline = timeoutNanos > 0 ? ScheduledFutureTask.nanoTime() + timeoutNanos : 0;
        long runTasks = 0;
        long lastExecutionTime;
        //循环执行taskQueue中所有任务
        for (;;) {
            //task.run()执行任务
            safeExecute(task);
            runTasks ++;
            // 每执行64个任务计算以下超时时间nanoTime()
            if ((runTasks & 0x3F) == 0) {
                lastExecutionTime = ScheduledFutureTask.nanoTime();
                if (lastExecutionTime >= deadline) {
                    break;
                }
            }
            task = pollTask();
            if (task == null) {
                lastExecutionTime = ScheduledFutureTask.nanoTime();
                break;
            }
        }
        afterRunningAllTasks();
        //更新最后执行时间
        this.lastExecutionTime = lastExecutionTime;
        return true;
    }
    
    • runAllTasks逻辑比较简单,就是执行taskQueue中所有task以及定时任务队列中到时的task

    至此,NioEventLoop主要流程分析完毕

    相关文章

      网友评论

          本文标题:netty源码分析(三) - NioEventLoop - 2执

          本文链接:https://www.haomeiwen.com/subject/paokpktx.html