美文网首页Java World即时通讯Netty
netty源码分析之揭开reactor线程的面纱(一)

netty源码分析之揭开reactor线程的面纱(一)

作者: 简书闪电侠 | 来源:发表于2016-11-20 12:47 被阅读9004次

    netty最核心的就是reactor线程,对应项目中使用广泛的NioEventLoop,那么NioEventLoop里面到底在干些什么事?netty是如何保证事件循环的高效轮询和任务的及时执行?又是如何来优雅地fix掉jdk的nio bug?带着这些疑问,本篇文章将庖丁解牛,带你逐步了解netty reactor线程的真相[源码基于4.1.6.Final]

    reactor 线程的启动

    NioEventLoop的run方法是reactor线程的主体,在第一次添加任务的时候被启动

    NioEventLoop 父类 SingleThreadEventExecutor 的execute方法

    @Override
    public void execute(Runnable task) {
        ...
        boolean inEventLoop = inEventLoop();
        if (inEventLoop) {
            addTask(task);
        } else {
            startThread();
            addTask(task);
            ...
        }
        ...
    }
    

    外部线程在往任务队列里面添加任务的时候执行 startThread() ,netty会判断reactor线程有没有被启动,如果没有被启动,那就启动线程再往任务队列里面添加任务

    private void startThread() {
        if (STATE_UPDATER.get(this) == ST_NOT_STARTED) {
            if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
                doStartThread();
            }
        }
    }
    

    SingleThreadEventExecutor 在执行doStartThread的时候,会调用内部执行器executor的execute方法,将调用NioEventLoop的run方法的过程封装成一个runnable塞到一个线程中去执行

    private void doStartThread() {
        ...
        executor.execute(new Runnable() {
            @Override
            public void run() {
                thread = Thread.currentThread();
                ...
                    SingleThreadEventExecutor.this.run();
                ...
            }
        }
    }
    

    该线程就是executor创建,对应netty的reactor线程实体。executor 默认是ThreadPerTaskExecutor

    默认情况下,ThreadPerTaskExecutor 在每次执行execute 方法的时候都会通过DefaultThreadFactory创建一个FastThreadLocalThread线程,而这个线程就是netty中的reactor线程实体

    ThreadPerTaskExecutor

    public void execute(Runnable command) {
        threadFactory.newThread(command).start();
    }
    

    关于为啥是 ThreadPerTaskExecutorDefaultThreadFactory的组合来new一个FastThreadLocalThread,这里就不再详细描述,通过下面几段代码来简单说明

    标准的netty程序会调用到NioEventLoopGroup的父类MultithreadEventExecutorGroup的如下代码

    protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                            EventExecutorChooserFactory chooserFactory, Object... args) {
        if (executor == null) {
            executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
        }
    }
    

    然后通过newChild的方式传递给NioEventLoop

    @Override
    protected EventLoop newChild(Executor executor, Object... args) throws Exception {
        return new NioEventLoop(this, executor, (SelectorProvider) args[0],
            ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
    }
    

    关于reactor线程的创建和启动就先讲这么多,我们总结一下:netty的reactor线程在添加一个任务的时候被创建,该线程实体为 FastThreadLocalThread(这玩意以后会开篇文章重点讲讲),最后线程执行主体为NioEventLooprun方法。

    reactor 线程的执行

    那么下面我们就重点剖析一下 NioEventLoop 的run方法

    @Override
    protected void run() {
        for (;;) {
            try {
                switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
                    case SelectStrategy.CONTINUE:
                        continue;
                    case SelectStrategy.SELECT:
                        select(wakenUp.getAndSet(false));
                        if (wakenUp.get()) {
                            selector.wakeup();
                        }
                    default:
                        // fallthrough
                }
                processSelectedKeys();
                runAllTasks(...);
                }
            } catch (Throwable t) {
                handleLoopException(t);
            }
            ...
        }
    

    我们抽取出主干,reactor线程做的事情其实很简单,用下面一幅图就可以说明

    reactor action

    reactor线程大概做的事情分为对三个步骤不断循环

    1.首先轮询注册到reactor线程对用的selector上的所有的channel的IO事件

    select(wakenUp.getAndSet(false));
    if (wakenUp.get()) {
        selector.wakeup();
    }
    

    2.处理产生网络IO事件的channel

    processSelectedKeys();
    

    3.处理任务队列

    runAllTasks(...);
    

    下面对每个步骤详细说明

    select操作

    select(wakenUp.getAndSet(false));
    if (wakenUp.get()) {
          selector.wakeup();
    }
    

    wakenUp 表示是否应该唤醒正在阻塞的select操作,可以看到netty在进行一次新的loop之前,都会将wakeUp 被设置成false,标志新的一轮loop的开始,具体的select操作我们也拆分开来看

    1.定时任务截止事时间快到了,中断本次轮询

    int selectCnt = 0;
    long currentTimeNanos = System.nanoTime();
    long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);
    
    for (;;) {
        long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
        if (timeoutMillis <= 0) {
            if (selectCnt == 0) {
                selector.selectNow();
                selectCnt = 1;
            }
            break;
        }
        ....
    }
    

    我们可以看到,NioEventLoop中reactor线程的select操作也是一个for循环,在for循环第一步中,如果发现当前的定时任务队列中有任务的截止事件快到了(<=0.5ms),就跳出循环。此外,跳出之前如果发现目前为止还没有进行过select操作(if (selectCnt == 0)),那么就调用一次selectNow(),该方法会立即返回,不会阻塞

    这里说明一点,netty里面定时任务队列是按照延迟时间从小到大进行排序, delayNanos(currentTimeNanos)方法即取出第一个定时任务的延迟时间

    protected long delayNanos(long currentTimeNanos) {
        ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
        if (scheduledTask == null) {
            return SCHEDULE_PURGE_INTERVAL;
        }
        return scheduledTask.delayNanos(currentTimeNanos);
     }
    

    关于netty的任务队列(包括普通任务,定时任务,tail task)相关的细节后面会另起一片文章,这里不过多展开

    2.轮询过程中发现有任务加入,中断本次轮询

    for (;;) {
        // 1.定时任务截至事时间快到了,中断本次轮询
        ...
        // 2.轮询过程中发现有任务加入,中断本次轮询
        if (hasTasks() && wakenUp.compareAndSet(false, true)) {
            selector.selectNow();
            selectCnt = 1;
            break;
        }
        ....
    }
    

    netty为了保证任务队列能够及时执行,在进行阻塞select操作的时候会判断任务队列是否为空,如果不为空,就执行一次非阻塞select操作,跳出循环

    3.阻塞式select操作

    for (;;) {
        // 1.定时任务截至事时间快到了,中断本次轮询
        ...
        // 2.轮询过程中发现有任务加入,中断本次轮询
        ...
        // 3.阻塞式select操作
        int selectedKeys = selector.select(timeoutMillis);
        selectCnt ++;
        if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
            break;
        }
        ....
    }
    

    执行到这一步,说明netty任务队列里面队列为空,并且所有定时任务延迟时间还未到(大于0.5ms),于是,在这里进行一次阻塞select操作,截止到第一个定时任务的截止时间

    这里,我们可以问自己一个问题,如果第一个定时任务的延迟非常长,比如一个小时,那么有没有可能线程一直阻塞在select操作,当然有可能!But,只要在这段时间内,有新任务加入,该阻塞就会被释放

    外部线程调用execute方法添加任务

    @Override
    public void execute(Runnable task) { 
        ...
        wakeup(inEventLoop); // inEventLoop为false
        ...
    }
    

    调用wakeup方法唤醒selector阻塞

    protected void wakeup(boolean inEventLoop) {
        if (!inEventLoop && wakenUp.compareAndSet(false, true)) {
            selector.wakeup();
        }
    }
    

    可以看到,在外部线程添加任务的时候,会调用wakeup方法来唤醒 selector.select(timeoutMillis)

    阻塞select操作结束之后,netty又做了一系列的状态判断来决定是否中断本次轮询,中断本次轮询的条件有

    • 轮询到IO事件 (selectedKeys != 0
    • oldWakenUp 参数为true
    • 任务队列里面有任务(hasTasks
    • 第一个定时任务即将要被执行 (hasScheduledTasks()
    • 用户主动唤醒(wakenUp.get()

    4.解决jdk的nio bug

    关于该bug的描述见 http://bugs.java.com/bugdatabase/view_bug.do?bug_id=6595055)

    该bug会导致Selector一直空轮询,最终导致cpu 100%,nio server不可用,严格意义上来说,netty没有解决jdk的bug,而是通过一种方式来巧妙地避开了这个bug,具体做法如下

    long currentTimeNanos = System.nanoTime();
    for (;;) {
        // 1.定时任务截止事时间快到了,中断本次轮询
        ...
        // 2.轮询过程中发现有任务加入,中断本次轮询
        ...
        // 3.阻塞式select操作
        selector.select(timeoutMillis);
        // 4.解决jdk的nio bug
        long time = System.nanoTime();
        if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
            selectCnt = 1;
        } else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
                selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
    
            rebuildSelector();
            selector = this.selector;
            selector.selectNow();
            selectCnt = 1;
            break;
        }
        currentTimeNanos = time; 
        ...
     }
    

    netty 会在每次进行 selector.select(timeoutMillis) 之前记录一下开始时间currentTimeNanos,在select之后记录一下结束时间,判断select操作是否至少持续了timeoutMillis秒(这里将time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos改成time - currentTimeNanos >= TimeUnit.MILLISECONDS.toNanos(timeoutMillis)或许更好理解一些),
    如果持续的时间大于等于timeoutMillis,说明就是一次有效的轮询,重置selectCnt标志,否则,表明该阻塞方法并没有阻塞这么长时间,可能触发了jdk的空轮询bug,当空轮询的次数超过一个阀值的时候,默认是512,就开始重建selector

    空轮询阀值相关的设置代码如下

    int selectorAutoRebuildThreshold = SystemPropertyUtil.getInt("io.netty.selectorAutoRebuildThreshold", 512);
    if (selectorAutoRebuildThreshold < MIN_PREMATURE_SELECTOR_RETURNS) {
        selectorAutoRebuildThreshold = 0;
    }
    SELECTOR_AUTO_REBUILD_THRESHOLD = selectorAutoRebuildThreshold;
    

    下面我们简单描述一下netty 通过rebuildSelector来fix空轮询bug的过程,rebuildSelector的操作其实很简单:new一个新的selector,将之前注册到老的selector上的的channel重新转移到新的selector上。我们抽取完主要代码之后的骨架如下

    public void rebuildSelector() {
        final Selector oldSelector = selector;
        final Selector newSelector;
        newSelector = openSelector();
    
        int nChannels = 0;
         try {
            for (;;) {
                    for (SelectionKey key: oldSelector.keys()) {
                        Object a = key.attachment();
                         if (!key.isValid() || key.channel().keyFor(newSelector) != null) {
                             continue;
                         }
                         int interestOps = key.interestOps();
                         key.cancel();
                         SelectionKey newKey = key.channel().register(newSelector, interestOps, a);
                         if (a instanceof AbstractNioChannel) {
                             ((AbstractNioChannel) a).selectionKey = newKey;
                          }
                         nChannels ++;
                    }
                    break;
            }
        } catch (ConcurrentModificationException e) {
            // Probably due to concurrent modification of the key set.
            continue;
        }
        selector = newSelector;
        oldSelector.close();
    }
    

    首先,通过openSelector()方法创建一个新的selector,然后执行一个死循环,只要执行过程中出现过一次并发修改selectionKeys异常,就重新开始转移

    具体的转移步骤为

    1. 拿到有效的key
    2. 取消该key在旧的selector上的事件注册
    3. 将该key对应的channel注册到新的selector上
    4. 重新绑定channel和新的key的关系

    转移完成之后,就可以将原有的selector废弃,后面所有的轮询都是在新的selector进行

    最后,我们总结reactor线程select步骤做的事情:不断地轮询是否有IO事件发生,并且在轮询的过程中不断检查是否有定时任务和普通任务,保证了netty的任务队列中的任务得到有效执行,轮询过程顺带用一个计数器避开了了jdk空轮询的bug,过程清晰明了

    由于篇幅原因,下面两个过程将分别放到一篇文章中去讲述,尽请期待

    process selected keys

    未完待续

    run tasks

    未完待续

    最后,通过文章开头一副图,我们再次熟悉一下netty的reactor线程做的事儿


    reactor action
    1. 轮询IO事件
    2. 处理轮询到的事件
    3. 执行任务队列中的任务

    最后,仅在简书博客的最后,介绍一下今天新开的知识星球,希望能给你的源码之旅多一点陪伴和指导。

    1.每天会回答大家有关netty网络编程方面的问题,如果你有困惑,尽管提出来,每一条我都会回答
    2.不定期分享netty以及程序员效率相关的知识,确保每条知识对你都受用
    3.星球试运营不做推广,目前所有的用户都来自我的简书博客,志同道合的人在一起,擦出不一样的火花
    4.一个人的学习是寂寞的,低效的。不断的与交流,思考,并适当的接受指导和挑战,这样的学习才会有乐趣
    5.最后,透露一下,星球第一条知识可以给你免费的惊喜

    image.png

    相关文章

      网友评论

      • 快乐王子_ea4e:你好,再请教个问题,NioServerSocketChannel绑定的时候只能绑定是绑定到一个Boss NioEventLoop么,发现只是在ServerBootstrap bind时候,将一个NioServerSocketChannel绑定到一个Boss NioEventLoop,如果是这样,那Boss group指定多个NioEventLoop有什么意义呢
        快乐王子_ea4e:@简书闪电侠 OK
        简书闪电侠:@快乐王子_ea4e 其实最终只会创建一个线程,因为reactor线程的创建是懒创建模式,如果考虑优化到极致的话,那么指定参数为1即可
      • 蓝汝丶琪:在第一次执行任务才创建线程是不是懒加载模式?在Netty4.0版本的时候,都是在初始化的时候就创建线程了。
        简书闪电侠:@蓝汝丶琪 对的,是懒加载模式,直到连接数到达2*cpu,会一直创建线程
      • 28911c019847:也就是说react线程永远不需要阻塞,即使没有任务,它都是按照那三个步骤循环下去?
      • 快乐王子_ea4e:selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
        doRegister中的ops参数为什么是0
        简书闪电侠:@陌生爱_73d4 如果没有连接,select这一步每次都会阻塞一段时间,其他的两步没有阻塞
        28911c019847:也就是说react线程永远不需要阻塞,即使没有任务,它都是按照那三个步骤循环下去?
        简书闪电侠:这个时候ops为0表示不关心任何事件,等一切准备工作就绪之后,后面会通过其他的方式来注册ACCEPT或者READ事件
      • ExploringKing:请问本文的图是用什么软件画的? 效果很赞啊!
        ExploringKing:@the_flash 我也是用的这软件,基本只能画线条,原来还可以画这么好的图:+1:
        简书闪电侠:@哥大休一 mac上一款画图软件,叫ominigraff
      • 84b79e5bba2f:bossGroup = new NioEventLoopGroup(2) ,
        2个child , NioEventLoop 什么时候开始执行doStartThread() , debug时候2个child 好像只启动了一个 .
        简书闪电侠:@帝君凝目 一般一个端口一个线程,监听端口的负载一般很小,一个线程完全够。多个线程多个端口
        简书徐小耳:@简书闪电侠 那是不是可以理解就算我bossgroup 设置多个线程也没有意义 因为一个端口只能绑定一个 那么当某个端口的netty连接事件很多 一个线程也够吗? 一般netty服务端都启动几个线程绑定不同的端口
        简书闪电侠:1.主线程在执行bind操作的时候,会经历以下流程

        final ChannelFuture initAndRegister() {
        //... 选择一个NioEventLoop
        ChannelFuture regFuture = config().group().register(channel);
        //...
        }

        所以,执行一次bind操作,会选择一个NioEventLoop,目前,大多数操作系统是不支持多线程bind同一个端口,所以即使boos设置了两个NioEventLoop,也永远只会使用到一个

        2.register()之后,会路过下面一段代码


        AbstractChannel.java

        public final void register(EventLoop eventLoop, final ChannelPromise promise) {
        //...省略了非关键部分
        if (eventLoop.inEventLoop()) {
        register0(promise);
        } else {
        eventLoop.execute(new Runnable() {
        public void run() {
        register0(promise);
        }
        });
        }
        }

        由于当前是在主线程里面,所以eventLoop.inEventLoop()返回false,进入到else分支

        SingleThreadEventExecutor.java

        public void execute(Runnable task) {
        boolean inEventLoop = inEventLoop();
        if (inEventLoop) {
        addTask(task);
        } else {
        startThread();
        addTask(task);
        if (isShutdown() && removeTask(task)) {
        reject();
        }
        }
        }
        以上同样省去了非关键代码

        至于为啥会经过这些代码,可自行debug来看,具体方法可参考:http://www.jianshu.com/p/c5068caab217,这篇文章也是讲述 服务端启动过程

      本文标题:netty源码分析之揭开reactor线程的面纱(一)

      本文链接:https://www.haomeiwen.com/subject/ixzepttx.html