美文网首页金融基础技术与业务netty
Netty系列(三):说说NioEventLoop

Netty系列(三):说说NioEventLoop

作者: marsjhe | 来源:发表于2019-01-29 11:20 被阅读102次

    前言

    本来想先写下NioServerSocketChannel以及NioSocketChannel的注册流程的,但是最后发现始终离不开NioEventLoop这个类,所以在这之前必须得先讲解下NioEventLoop这个类到底是用来做啥的。其实在第一篇文章里面有提及到它的,但是没有详细的去讲解,接下来会对它分析一波。


    设计模型

    在进入正文之前,先简单的了解下NioEventLoop的工作模型(服务端):


    假设一个NioEventLoopGroup(这里服务端会用两个Group)里面有4个NioEventLoop,那么netty中的实际工作模型就如上图所示,服务端会用默认的选择规则从Group1中选择出一个NioEventLoop注册ServerChannel,并绑定一个OP_ACCEPT用于监听客户端发起的连接请求,一旦有新的连接进来,服务端则会从Group2中按一定的规则选出一个NioEventLoop来注册SocketChannel,并绑定OP_READ兴趣事件,这里注意,一个NioEventLoop可以绑定多个SocketChannel。具体的注册流程我会在下一篇文章中写出来。

    下面进入正题。


    构造流程

    NioEventLoop具体的构造流程大家可以去我的Netty系列(一):NioEventLoopGroup源码解析中去看一下,里面说的还算蛮详细的。下面是其调用的构造函数,咱们可以观察到其身上会绑定一个选择器Selector,供后期channel注册的时候使用的,这一块是JAVA NIO相关的知识点。


    内部还维护着一个executor去开启执行线程的,以及taskQueue任务队列和一个tailTasks尾部队列(这个队列里面的任务是在每次执行taskQueue任务队列中的任务结束后都会去调用的,不多介绍)。上面介绍的三个四个内部结构Selector,executor,taskQueue,tailTasks会在后面多次提起。
    下图是NioEventLoop的简单的层级结构(下图取之于Netty in Action):


    NioEventLoop.execute

    这里我们先看一下NioEventLoop的execute方法。实际上这个方法是在其父类SingleThreadEventExecutor中。这个方法的功能就是将任务丢到taskQueue中。

        public void execute(Runnable task) {
            if (task == null) {
                throw new NullPointerException("task");
            }
    
            boolean inEventLoop = inEventLoop();
            addTask(task);
            if (!inEventLoop) {
                // 开启工作线程,实际上也就是执行NioEventLoop中的run方法,下面会介绍
                startThread();
                if (isShutdown()) {
                    boolean reject = false;
                    try {
                        if (removeTask(task)) {
                            reject = true;
                        }
                    } catch (UnsupportedOperationException e) {
                        // The task queue does not support removal so the best thing we can do is to just move on and
                        // hope we will be able to pick-up the task before its completely terminated.
                        // In worst case we will log on termination.
                    }
                    if (reject) {
                        reject();
                    }
                }
            }
    
            if (!addTaskWakesUp && wakesUpForTask(task)) {
                wakeup(inEventLoop);
            }
        }
    
    
    1. 添加task到执行队列中,也就是咱们上文提起的taskQueue中。
    2. 判断这个NioEventLoop中的是否已经开启过线程。
    3. 若未开启,则必须先启动线程任务,也就是我们下文会介绍的run方法。
    4. 首次初始化会在taskQueue中丢一个空任务去唤醒线程。

    NioEventLoop的工作模式实际上就是开启一个单线程跑一个死循环,然后一直轮询taskQueue队列是否有任务添加进来,然后就去处理任务,还有就是如果注册在selector上的channel有兴趣事件进来,也会去处理selectorKeys,这一块下面会做介绍。


    NioEventLoop.run

    现在看看NioEventLoop中的run方法

        protected void run() {
            for (;;) {
                try {
                    try {
                        // 按默认配置的话要么返回select.selectNow(),
                        //要么返回SelectStrategy.SELECT
                       switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
                        case SelectStrategy.CONTINUE:
                            continue;
    
                        case SelectStrategy.BUSY_WAIT:
                        case SelectStrategy.SELECT:
                            select(wakenUp.getAndSet(false));
    
                            if (wakenUp.get()) {
                                selector.wakeup();
                            }
                            // fall through
                        default:
                        }
                    } catch (IOException e) {
                        rebuildSelector0();
                        handleLoopException(e);
                        continue;
                    }
    
                    cancelledKeys = 0;
                    needsToSelectAgain = false;
                    final int ioRatio = this.ioRatio;
                    if (ioRatio == 100) {
                        try {
                            // IO操作,根据selectedKeys去处理
                            processSelectedKeys();
                        } finally {
                            // 保证执行完所有的任务
                            runAllTasks();
                        }
                    } else {
                        final long ioStartTime = System.nanoTime();
                        try {
                            // IO操作,根据selectedKeys去处理
                            processSelectedKeys();
                        } finally {
                            // 按一定的比例去处理任务,有可能遗留一部分任务下次进行处理
                            final long ioTime = System.nanoTime() - ioStartTime;
                            runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                        }
                    }
                } catch (Throwable t) {
                    handleLoopException(t);
                }
                // Always handle shutdown even if the loop processing threw an exception.
                try {
                    // 释放资源,将注册的channel全部关闭掉。
                    if (isShuttingDown()) {
                        closeAll();
                        if (confirmShutdown()) {
                            return;
                        }
                    }
                } catch (Throwable t) { 
                    handleLoopException(t);
                }
            }
        }
    
    
    1. 这里有个默认的计算策略:
      return hasTasks ? selectSupplier.get() : SelectStrategy.SELECT

    1.有任务便会直接返回select.selectNow(),则会直接去跑任务或者是去处理selectorKeys
    2.若没任务,则会走select(wakenUp.getAndSet(false))方法,里面会有一个timeout超时处理,selector.select(timeoutMillis),超时后也会去跑任务或者是去处理selectorKeys

    这一块具体细节也很多,只是说一下处理流程。

    1. 注意上面有个ioRatio == 100这个判断条件,如果为100,则会将任务全部处理完成;否则会与io操作按一定的比例去执行任务。

    这里的IO操作就是processSelectedKeys()方法,代码虽然很长,但是干的活就是根据不同的兴趣事件干不同的活,里面有对OP_READ OP_ACCEPT OP_WRITE OP_CONNECT等等不同兴趣事件的不同处理方法,这一块应该是JAVA NIO里面的相关知识点。感兴趣的朋友可以debug针对某个触发事件研究一下。


    runAllTasks

    执行任务的代码如下(下面是runAllTasks的代码):

        protected boolean runAllTasks() {
            assert inEventLoop();
            boolean fetchedAll;
            boolean ranAtLeastOne = false;
    
            do {
                // 这里会从定时任务队列中将达到执行事件的task丢到taskQueue中去
                fetchedAll = fetchFromScheduledTaskQueue();
                // 执行taskQueue中所有的task
                if (runAllTasksFrom(taskQueue)) {
                    ranAtLeastOne = true;
                }
            } while (!fetchedAll); // keep on processing until we fetched all scheduled tasks.
    
            if (ranAtLeastOne) {
                lastExecutionTime = ScheduledFutureTask.nanoTime();
            }
            // 这个是执行上面所说的tailTasks中的task
            afterRunningAllTasks();
            return ranAtLeastOne;
        }
    

    这一块的逻辑是:
    先执行fetchFromScheduledTaskQueue方法,将到期的定时任务丢到taskQueue队列中,这个fetchFromScheduledTaskQueue方法里面有个小细节,当taskQueue队列满了之后,它就会重新塞到scheduledTaskQueue队列中,然后再外圈循环,taskQueue队列消费完毕,则继续执行fetchFromScheduledTaskQueue方法,直到把所有到期的任务都丢到taskQueue队列中执行完毕为止。如下图所示:

    netty_runTasks.png

    这一部分到这里就结束了,下一篇会对NioServerSocketChannel的注册以及服务端创建NioSocketChannel进行分析。


    End

    相关文章

      网友评论

        本文标题:Netty系列(三):说说NioEventLoop

        本文链接:https://www.haomeiwen.com/subject/dxbvjqtx.html