美文网首页金融基础技术与业务netty
Netty系列(三):说说NioEventLoop

Netty系列(三):说说NioEventLoop

作者: marsjhe | 来源:发表于2019-01-29 11:20 被阅读102次

前言

本来想先写下NioServerSocketChannel以及NioSocketChannel的注册流程的,但是最后发现始终离不开NioEventLoop这个类,所以在这之前必须得先讲解下NioEventLoop这个类到底是用来做啥的。其实在第一篇文章里面有提及到它的,但是没有详细的去讲解,接下来会对它分析一波。


设计模型

在进入正文之前,先简单的了解下NioEventLoop的工作模型(服务端):


假设一个NioEventLoopGroup(这里服务端会用两个Group)里面有4个NioEventLoop,那么netty中的实际工作模型就如上图所示,服务端会用默认的选择规则从Group1中选择出一个NioEventLoop注册ServerChannel,并绑定一个OP_ACCEPT用于监听客户端发起的连接请求,一旦有新的连接进来,服务端则会从Group2中按一定的规则选出一个NioEventLoop来注册SocketChannel,并绑定OP_READ兴趣事件,这里注意,一个NioEventLoop可以绑定多个SocketChannel。具体的注册流程我会在下一篇文章中写出来。

下面进入正题。


构造流程

NioEventLoop具体的构造流程大家可以去我的Netty系列(一):NioEventLoopGroup源码解析中去看一下,里面说的还算蛮详细的。下面是其调用的构造函数,咱们可以观察到其身上会绑定一个选择器Selector,供后期channel注册的时候使用的,这一块是JAVA NIO相关的知识点。


内部还维护着一个executor去开启执行线程的,以及taskQueue任务队列和一个tailTasks尾部队列(这个队列里面的任务是在每次执行taskQueue任务队列中的任务结束后都会去调用的,不多介绍)。上面介绍的三个四个内部结构Selector,executor,taskQueue,tailTasks会在后面多次提起。
下图是NioEventLoop的简单的层级结构(下图取之于Netty in Action):


NioEventLoop.execute

这里我们先看一下NioEventLoop的execute方法。实际上这个方法是在其父类SingleThreadEventExecutor中。这个方法的功能就是将任务丢到taskQueue中。

    public void execute(Runnable task) {
        if (task == null) {
            throw new NullPointerException("task");
        }

        boolean inEventLoop = inEventLoop();
        addTask(task);
        if (!inEventLoop) {
            // 开启工作线程,实际上也就是执行NioEventLoop中的run方法,下面会介绍
            startThread();
            if (isShutdown()) {
                boolean reject = false;
                try {
                    if (removeTask(task)) {
                        reject = true;
                    }
                } catch (UnsupportedOperationException e) {
                    // The task queue does not support removal so the best thing we can do is to just move on and
                    // hope we will be able to pick-up the task before its completely terminated.
                    // In worst case we will log on termination.
                }
                if (reject) {
                    reject();
                }
            }
        }

        if (!addTaskWakesUp && wakesUpForTask(task)) {
            wakeup(inEventLoop);
        }
    }

  1. 添加task到执行队列中,也就是咱们上文提起的taskQueue中。
  2. 判断这个NioEventLoop中的是否已经开启过线程。
  3. 若未开启,则必须先启动线程任务,也就是我们下文会介绍的run方法。
  4. 首次初始化会在taskQueue中丢一个空任务去唤醒线程。

NioEventLoop的工作模式实际上就是开启一个单线程跑一个死循环,然后一直轮询taskQueue队列是否有任务添加进来,然后就去处理任务,还有就是如果注册在selector上的channel有兴趣事件进来,也会去处理selectorKeys,这一块下面会做介绍。


NioEventLoop.run

现在看看NioEventLoop中的run方法

    protected void run() {
        for (;;) {
            try {
                try {
                    // 按默认配置的话要么返回select.selectNow(),
                    //要么返回SelectStrategy.SELECT
                   switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
                    case SelectStrategy.CONTINUE:
                        continue;

                    case SelectStrategy.BUSY_WAIT:
                    case SelectStrategy.SELECT:
                        select(wakenUp.getAndSet(false));

                        if (wakenUp.get()) {
                            selector.wakeup();
                        }
                        // fall through
                    default:
                    }
                } catch (IOException e) {
                    rebuildSelector0();
                    handleLoopException(e);
                    continue;
                }

                cancelledKeys = 0;
                needsToSelectAgain = false;
                final int ioRatio = this.ioRatio;
                if (ioRatio == 100) {
                    try {
                        // IO操作,根据selectedKeys去处理
                        processSelectedKeys();
                    } finally {
                        // 保证执行完所有的任务
                        runAllTasks();
                    }
                } else {
                    final long ioStartTime = System.nanoTime();
                    try {
                        // IO操作,根据selectedKeys去处理
                        processSelectedKeys();
                    } finally {
                        // 按一定的比例去处理任务,有可能遗留一部分任务下次进行处理
                        final long ioTime = System.nanoTime() - ioStartTime;
                        runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                    }
                }
            } catch (Throwable t) {
                handleLoopException(t);
            }
            // Always handle shutdown even if the loop processing threw an exception.
            try {
                // 释放资源,将注册的channel全部关闭掉。
                if (isShuttingDown()) {
                    closeAll();
                    if (confirmShutdown()) {
                        return;
                    }
                }
            } catch (Throwable t) { 
                handleLoopException(t);
            }
        }
    }

  1. 这里有个默认的计算策略:
    return hasTasks ? selectSupplier.get() : SelectStrategy.SELECT

1.有任务便会直接返回select.selectNow(),则会直接去跑任务或者是去处理selectorKeys
2.若没任务,则会走select(wakenUp.getAndSet(false))方法,里面会有一个timeout超时处理,selector.select(timeoutMillis),超时后也会去跑任务或者是去处理selectorKeys

这一块具体细节也很多,只是说一下处理流程。

  1. 注意上面有个ioRatio == 100这个判断条件,如果为100,则会将任务全部处理完成;否则会与io操作按一定的比例去执行任务。

这里的IO操作就是processSelectedKeys()方法,代码虽然很长,但是干的活就是根据不同的兴趣事件干不同的活,里面有对OP_READ OP_ACCEPT OP_WRITE OP_CONNECT等等不同兴趣事件的不同处理方法,这一块应该是JAVA NIO里面的相关知识点。感兴趣的朋友可以debug针对某个触发事件研究一下。


runAllTasks

执行任务的代码如下(下面是runAllTasks的代码):

    protected boolean runAllTasks() {
        assert inEventLoop();
        boolean fetchedAll;
        boolean ranAtLeastOne = false;

        do {
            // 这里会从定时任务队列中将达到执行事件的task丢到taskQueue中去
            fetchedAll = fetchFromScheduledTaskQueue();
            // 执行taskQueue中所有的task
            if (runAllTasksFrom(taskQueue)) {
                ranAtLeastOne = true;
            }
        } while (!fetchedAll); // keep on processing until we fetched all scheduled tasks.

        if (ranAtLeastOne) {
            lastExecutionTime = ScheduledFutureTask.nanoTime();
        }
        // 这个是执行上面所说的tailTasks中的task
        afterRunningAllTasks();
        return ranAtLeastOne;
    }

这一块的逻辑是:
先执行fetchFromScheduledTaskQueue方法,将到期的定时任务丢到taskQueue队列中,这个fetchFromScheduledTaskQueue方法里面有个小细节,当taskQueue队列满了之后,它就会重新塞到scheduledTaskQueue队列中,然后再外圈循环,taskQueue队列消费完毕,则继续执行fetchFromScheduledTaskQueue方法,直到把所有到期的任务都丢到taskQueue队列中执行完毕为止。如下图所示:

netty_runTasks.png

这一部分到这里就结束了,下一篇会对NioServerSocketChannel的注册以及服务端创建NioSocketChannel进行分析。


End

相关文章

网友评论

    本文标题:Netty系列(三):说说NioEventLoop

    本文链接:https://www.haomeiwen.com/subject/dxbvjqtx.html