美文网首页源码阅读
Netty源码学习(3)--Reactor模式

Netty源码学习(3)--Reactor模式

作者: 未名枯草 | 来源:发表于2018-01-10 14:28 被阅读487次

    何为Reactor线程模型?

    Reactor模式是事件驱动的,有一个或多个并发输入源,有一个Service Handler,有多个Request Handlers;这个Service Handler会同步的将输入的请求(Event)多路复用的分发给相应的Request Handler。

    Reactor单线程模型就是指所有的IO操作都在同一个NIO线程上面完成的,也就是IO处理线程是单线程的。NIO线程的职责是: 

    (1)作为NIO服务端,接收客户端的TCP连接;

    (2)作为NIO客户端,向服务端发起TCP连接;

    (3)读取通信对端的请求或则应答消息;

    (4)向通信对端发送消息请求或则应答消息。

    从结构上,这有点类似生产者消费者模式,即有一个或多个生产者将事件放入一个Queue中,而一个或多个消费者主动的从这个Queue中Poll事件来处理;而Reactor模式则并没有Queue来做缓冲,每当一个Event输入到Service Handler之后,该Service Handler会立刻的根据不同的Event类型将其分发给对应的Request Handler来处理

    这个做的好处有很多,首先我们可以将处理event的Request handler实现一个单独的线程,即

    这样Service Handler 和request Handler实现了异步,加快了service Handler处理event的速度,那么每一个request同样也可以以多线程的形式来处理自己的event,即Thread1 扩展成Thread pool 1,

    Netty的Reactor线程模型

    1 Reactor单线程模型 

    Reactor机制中保证每次读写能非阻塞读写:Acceptor类接收客户端的TCP请求消息,当链路建立成功之后,通过Dispatch将对应的ByteBuffer转发到指定的handler上,进行消息的处理。

    一个线程(单线程)来处理CONNECT事件(Acceptor),一个线程池(多线程)来处理read,一个线程池(多线程)来处理write,那么从Reactor Thread到handler都是异步的,从而IO操作也多线程化。由于Reactor Thread依然为单线程,从性能上考虑依然有所限制。

    对于一些小容量的应用场景下,可以使用单线程模型,但是对于高负载、大并发的应用场景却不适合,主要原因如下: 

    (1)一个NIO线程处理成千上万的链路,性能无法支撑,即使CPU的负荷达到100%;

    (2)当NIO线程负载过重,处理性能就会变慢,导致大量客户端连接超时然后重发请求,导致更多堆积未处理的请求,成为性能瓶颈。

    (3)可靠性低,只有一个NIO线程,万一线程假死或则进入死循环,就完全不可用了,这是不能接受的。

    2 Reactor多线程模型

    Reactor多线程模型与单线程模型最大的区别在于,IO处理线程不再是一个线程,而是一组NIO处理线程。原理如下图所:

    Reactor多线程模型的特点如下: 

    (1)有一个专门的NIO线程—-Acceptor线程用于监听服务端,接收客户端的TCP连接请求。

    (2)网络IO操作—-读写等操作由一个专门的线程池负责,线程池可以使用JDK标准的线程池实现,包含一个任务队列和N个可用的线程,这些NIO线程就负责读取、解码、编码、发送。

    (3)一个NIO线程可以同时处理N个链路,但是一个链路只对应一个NIO线程。

    通过Reactor Thread Pool来提高event的分发能力。

    Reactor多线程模型可以满足绝大多数的场景,除了一些个别的特殊场景:比如一个NIO线程负责处理客户所有的连接请求,但是如果连接请求中包含认证的需求(安全认证),在百万级别的场景下,就存在性能问题了,因为认证本身就要消耗CPU,为了解决这种情景下的性能问题,产生了第三种线程模型:Reactor主从线程模型。

    3 Reactor主从模型

    主从Reactor线程模型的特点是:服务端用于接收客户端连接的不再是一个单独的NIO线程,而是一个独立的NIO的线程池。Acceptor接收到客户端TCP连接请求并处理完成后(可能包含接入认证),再将新创建的SocketChannel注册到IO线程池(sub reactor)的某个IO处理线程上并处理编解码和读写工作。Acceptor线程池仅负责客户端的连接与认证,一旦链路连接成功,就将链路注册到后端的sub Reactor的IO线程池中。利用主从Reactor模型可以解决服务端监听线程无法有效处理所有客户连接的性能不足问题,这也是netty推荐使用的线程模型。

    netty的线程模型

    netty的线程模型是可以通过设置启动类的参数来配置的,设置不同的启动参数,netty支持Reactor单线程模型、多线程模型和主从Reactor多线程模型。 

    4. netty的线程模型

    netty的线程模型是可以通过设置启动类的参数来配置的,设置不同的启动参数,netty支持Reactor单线程模型、多线程模型和主从Reactor多线程模型。

    服务端启动时创建了两个NioEventLoopGroup,一个是boss,一个是worker。实际上他们是两个独立的Reactor线程池,一个用于接收客户端的TCP连接,另一个用于处理Io相关的读写操作,或则执行系统的Task,定时Task。

    Boss线程池职责如下: 

    (1)接收客户端的连接,初始化Channel参数 

    (2)将链路状态变更时间通知给ChannelPipeline

    worker线程池作用是: 

    (1)异步读取通信对端的数据报,发送读事件到ChannelPipeline 

    (2)异步发送消息到通信对端,调用ChannelPipeline的消息发送接口 

    (3)执行系统调用Task; 

    (4)执行定时任务Task;

    通过配置boss和worker线程池的线程个数以及是否共享线程池等方式,netty的线程模型可以在单线程、多线程、主从线程之间切换。

    为了提升性能,netty在很多地方都进行了无锁设计。比如在IO线程内部进行串行操作,避免多线程竞争造成的性能问题。表面上似乎串行化设计似乎CPU利用率不高,但是通过调整NIO线程池的线程参数,可以同时启动多个串行化的线程并行运行,这种局部无锁串行线程设计性能更优。 

    NioEventLoop是Netty的Reactor线程,它在Netty Reactor线程模型中的职责如下:

    1. 作为服务端Acceptor线程,负责处理客户端的请求接入2. 作为客户端Connecor线程,负责注册监听连接操作位,用于判断异步连接结果3. 作为IO线程,监听网络读操作位,负责从SocketChannel中读取报文4. 作为IO线程,负责向SocketChannel写入报文发送给对方,如果发生写半包,会自动注册监听写事件,用于后续继续发送半包数据,直到数据全部发送完成

    如下图,是一个NioEventLoop的处理链:

    处理链中的处理方法是串行化执行的

    一个客户端连接只注册到一个NioEventLoop上,避免了多个IO线程并发操作

    3.2.1 Task

    Netty Reactor线程模型中有两种Task:系统Task和定时Task

    系统Task:创建它们的主要原因是,当IO线程和用户线程都在操作同一个资源时,为了防止并发操作时锁的竞争问题,将用户线程封装为一个Task,在IO线程负责执行,实现局部无锁化

    定时Task:主要用于监控和检查等定时动作

    基于以上原因,NioEventLoop不是一个纯粹的IO线程,它还会负责用户线程的调度

    IO线程的分配细节

    线程池对IO线程进行资源管理,是通过EventLoopGroup实现的。线程池平均分配channel到所有的线程(循环方式实现,不是100%准确),

    一个线程在同一时间只会处理一个通道的IO操作,这种方式可以确保我们不需要关心同步问题。

    Selector

    NioEventLoop是Reactor的核心线程,那么它就就必须实现多路复用。

    ioEevntLoopGroup

    EventExecutorGroup:提供管理EevntLoop的能力,他通过next()来为任务分配执行线程,同时也提供了shutdownGracefully这一优雅下线的接口

    EventLoopGroup继承了EventExecutorGroup接口,并新添了3个方法

    EventLoop next()

    ChannelFuture register(Channel channel)

    ChannelFuture register(Channel channel, ChannelPromise promise)

    EventLoopGroup的实现中使用next().register(channel)来完成channel的注册,即将channel注册时就绑定了一个EventLoop,然后EvetLoop将channel注册到EventLoop的Selector上。

    NioEventLoopGroup还有几点需要注意:

    NioEventLoopGroup下默认的NioEventLoop个数为cpu核数 * 2,因为有很多的io处理

    NioEventLoop和java的single线程池在5里差异变大了,它本身不负责线程的创建销毁,而是由外部传入的线程池管理

    channel和EventLoop是绑定的,即一旦连接被分配到EventLoop,其相关的I/O、编解码、超时处理都在同一个EventLoop中,这样可以确保这些操作都是线程安全的

     服务端线程模型

    结合Netty的源码,对服务端创建线程工作流程进行介绍:

    第一步,从用户线程发起创建服务端操作,代码如下:

    通常情况下,服务端的创建是在用户进程启动的时候进行,因此一般由Main函数或者启动类负责创建,服务端的创建由业务线程负责完成。在创建服务端的时候实例化了2个EventLoopGroup,1个EventLoopGroup实际就是一个EventLoop线程组,负责管理EventLoop的申请和释放。

    EventLoopGroup管理的线程数可以通过构造函数设置,如果没有设置,默认取-Dio.netty.eventLoopThreads,如果该系统参数也没有指定,则为可用的CPU内核数 × 2。

    bossGroup线程组实际就是Acceptor线程池,负责处理客户端的TCP连接请求,如果系统只有一个服务端端口需要监听,则建议bossGroup线程组线程数设置为1。

    workerGroup是真正负责I/O读写操作的线程组,通过ServerBootstrap的group方法进行设置,用于后续的Channel绑定。

    逐步debug,发现,HeadContext及TailContext的父类AbstractChannelHandlerContext构造函数,在初始化时,使用group内容为pipeline及channel。

    如图中所示,如果group不为空,group.next()返回的就是bossGroup,它的next方法用于从线程组中获取可用线程.

    reactor 线程的启动

    1)bossGroup初始化时,启动线程,过程如下

    EventLoopGroup bossGroup =new NioEventLoopGroup();

    debug进入NioEventLoopGroup构造函数,继续debug到下一个构造函数:

    此时,线程nThreads数量为0;,继续debug进入下一层构造函数,:

    此时,构造函数中的参数值如上图,继续debug进入下一层构造函数:

    继续debug,此时已知,class NioEventLoopGroupextends MultithreadEventLoopGroup,进入下一层后,跳转到

    MultithreadEventLoopGroup类中的构造函数中,如下图。

    如果没有指定创建的线程数量,则默认创建的线程个数为DEFAULT_EVENT_LOOP_THREADS,该数值为:处理器数量x2

    已知:class MultithreadEventLoopGroupextends MultithreadEventExecutorGroupimplements EventLoopGroup

    此时,跳转到其父类MultithreadEventExecutorGroupimplements构造函数:

     变量children就是用来存放创建的线程的数组,里面每一个元素都通过children[i] = newChild(threadFactory, args)创建。而newChild方法则由子类NioEventLoopGroup实现,debug后跳入NioEventLoopGroup中:

      变量children每个元素的真实类型为NioEventLoop,

    debug后跳转到NioEventLoop类中的构造函数:

    而NioEventLoop的类关系图如下

    此时首先分析第一句:super(parent, threadFactory, false);

    跳转到SingleThreadEventExecutor中:

    在构造函数中,启动线程

    线程启动后,创建任务队列taskQueue:

    boss线程就在此处创建:thread = threadFactory.newThread(new Runnable()

    同时也创建了线程的任务队列,是一个LinkedBlockingQueue结构。

    SingleThreadEventExecutor.this.run()由子类NioEventLoop实现,后面的文章再进行分析

    总结:

    EventLoopGroup bossGroup = new NioEventLoopGroup()发生了以下事情:

          1、 为NioEventLoopGroup创建数量为:处理器个数 x 2的,类型为NioEventLoop的实例。每个NioEventLoop实例 都持有一个线程,以及一个类型为LinkedBlockingQueue的任务队列

          2、线程的执行逻辑由NioEventLoop实现

          3、每个NioEventLoop实例都持有一个selector,并对selector进行优化。

    另分析一下:分析一下selector = openSelector()

    这里对sun.nio.ch.SelectorImpl中的selectedKeys和publicSelectedKeys做了优化,NioEventLoop中的变量selectedKeys的类型是SelectedSelectionKeySet,内部用两个数组存储,初始分配数组大小置为1024避免频繁扩容,当大小超过1024时,对数组进行双倍扩容。如源码所示:

    如上图添加数据,初始分配数组大小置为1024避免频繁扩容,当大小超过1024时,对数组进行双倍扩容doubleCapacityA()、doubleCapacityB()。

    reactor 线程的执行

    参考内容:

    https://www.jianshu.com/p/9acf36f7e025

    https://www.jianshu.com/p/0d0eece6d467

    NioEventLoop中维护了一个线程,线程启动时会调用NioEventLoop的run方法,执行I/O任务和非I/O任务:

    I/O任务

    即selectionKey中ready的事件,如accept、connect、read、write等,由processSelectedKeys方法触发。

    非IO任务

    添加到taskQueue中的任务,如register0、bind0等任务,由runAllTasks方法触发。

    两种任务的执行时间比由变量ioRatio控制,默认为50,则表示允许非IO任务执行的时间与IO任务的执行时间相等。

    初始化时 ioRatio为50:

    剖析一下 NioEventLoop 的run方法:

    hasTasks()方法判断当前taskQueue是否有元素。

    1、 如果taskQueue中有元素,执行 selectNow() 方法,最终执行selector.selectNow(),该方法会立即返回。

    wakenUp 表示是否应该唤醒正在阻塞的select操作

    2、 如果taskQueue没有元素,执行 select(oldWakenUp) 方法,代码如下:

    JDK NIO的BUG,例如臭名昭著的epoll bug,它会导致Selector空轮询,最终导致CPU 100%。若Selector的轮询结果为空,也没有wakeup或新消息处理,则发生空轮询,CPU使用率100%,

    bug:https://bugs.java.com/bugdatabase/view_bug.do?bug_id=6595055

    图A

    NioEventLoop中reactor线程的select操作也是一个for循环,在for循环第一步中,如果发现当前的定时任务队列中有任务的截止事件快到了(<=0.5ms),就跳出循环。此外,跳出之前如果发现目前为止还没有进行过select操作(if (selectCnt == 0)),那么就调用一次selectNow(),该方法会立即返回,不会阻塞

    selectCnt == 0(selectCnt 用来记录selector.select方法的执行次数和标识是否执行过selector.selectNow())

    1、delayNanos(currentTimeNanos):

    计算延迟任务队列中第一个任务的到期执行时间(即最晚还能延迟多长时间执行),默认返回1s。从而生成截止时间点:selectDeadLineNanos;

    每个SingleThreadEventExecutor都持有一个延迟执行任务的优先队列PriorityQueue,名为:delayedTaskQueue,启动线程时,往队列中加入一个任务。

    延迟执行任务的优先队列PriorityQueue   启动线程时,往队列中加入一个任务  

    netty里面定时任务队列是按照延迟时间从小到大进行排序, delayNanos(currentTimeNanos)方法即取出第一个定时任务的延迟时间

    2、如果延迟任务队列中第一个任务的最晚还能延迟执行的时间小于500000纳秒,且selectCnt == 0(selectCnt 用来记录selector.select方法的执行次数和标识是否执行过selector.selectNow()),则执行selector.selectNow()方法并立即返回。

    3、否则执行selector.select(timeoutMillis),阻塞式select操作

    执行到这一步,说明netty任务队列里面队列为空,并且所有定时任务延迟时间还未到(大于0.5ms),于是,在这里进行一次阻塞select操作,截止到第一个定时任务的截止时间

    阻塞select操作结束之后,netty又做了一系列的状态判断来决定是否中断本次轮询,中断本次轮询的条件有

    轮询到IO事件 (selectedKeys != 0)

    oldWakenUp 参数为true

    任务队列里面有任务(hasTasks)

    第一个定时任务即将要被执行 (hasScheduledTasks())

    用户主动唤醒(wakenUp.get())


    如果第一个定时任务的延迟非常长,比如一个小时,那么有没有可能线程一直阻塞在select操作,当然有可能!But,只要在这段时间内,有新任务加入,该阻塞就会被释放

    外部线程调用execute方法添加任务:

    调用wakeup方法唤醒selector阻塞,可以看到,在外部线程添加任务的时候,会调用wakeup方法来唤醒 


    nio bug ,问题是围绕一个最初注册Selector的通道,因为I/O在服务器端关闭(由于早期客户端退出)。但是服务器端只有当它执行I/O(读/写),从而进入IO异常时才能知道这种通道。这种情况下,服务器端(选择器)不知道通道已经关闭(对等复位),从而出现错误操作,继续对 key(selector和channel的配对)进行空轮训,但是其相关的通道已关闭或无效。选择器会一直空轮训,从而导致cpu使用率100%。

    此处解决方式:

    包括上述描述delayNanos(currentTimeNanos)、如果延迟任务队列中第一个任务的最晚还能延迟执行的时间小于500000纳秒,且selectCnt == 0(selectCnt 用来记录selector.select方法的执行次数和标识是否执行过selector.selectNow()),则执行selector.selectNow()方法并立即返回。等,

    run方法继续执行会有以下操作:

    netty 会在每次进行 selector.select(timeoutMillis) 之前记录一下开始时间currentTimeNanos,在select之后记录一下结束时间,判断select操作是否至少持续了timeoutMillis秒。

    如果持续的时间大于等于timeoutMillis,说明就是一次有效的轮询,重置selectCnt标志,否则,表明该阻塞方法并没有阻塞这么长时间,可能触发了jdk的空轮询bug,当空轮询的次数超过一个阀值的时候,默认是512,就开始重建selector。对selector进行rebuild后,需要重新执行方法selectNow,检查是否有已ready的selectionKey。


    rebuildSelector() 

    下面我们简单描述一下netty 通过rebuildSelector来fix空轮询bug的过程,rebuildSelector的操作其实很简单:new一个新的selector,将之前注册到老的selector上的的channel重新转移到新的selector上。

    方法由下面三个图组成

    通过openSelector()方法创建一个新的selector,然后执行一个死循环,只要执行过程中出现过一次并发修改selectionKeys异常,就重新开始转移

    转移步骤为

    1. 拿到有效的key

    2. 取消该key在旧的selector上的事件注册

    3. 将该key对应的channel注册到新的selector上

    4. 重新绑定channel和新的key的关系: selector = newSelector;

    5. 将原有的selector废弃:oldSelector.close();


    Selector BUG出现的原因

    若Selector的轮询结果为空,也没有wakeup或新消息处理,则发生空轮询,CPU使用率100%,

    Netty的解决办法

    1. 对Selector的select操作周期进行统计,每完成一次空的select操作进行一次计数,

    2. 若在某个周期内连续发生N次空轮询,则触发了epoll死循环bug。

    3. 重建Selector,判断是否是其他线程发起的重建请求,若不是则将原SocketChannel从旧的Selector上去除注册,重新注册到新的Selector上,并将原来的Selector关闭。


    processSelectedKeys 

    对selector进行rebuild后,需要重新执行方法selectNow,检查是否有已ready的selectionKey。

    方法selectNow()或select(oldWakenUp)返回后,执行方法processSelectedKeys和runAllTasks。

    processSelectedKeys 用来处理有事件发生的selectkey,

    图中1处理优化过的selectedKeys,2是正常的处理

    selectedKeys 被引用过的地方

    selectedKeys是一个 SelectedSelectionKeySet 类对象,在NioEventLoop 的 openSelector 方法中创建,之后就通过反射将selectedKeys与 sun.nio.ch.SelectorImpl 中的两个field绑定;sun.nio.ch.SelectorImpl 中我们可以看到,这两个field其实是两个HashSet。

    SelectedSelectionKeySet,内部用两个数组存储,初始分配数组大小置为1024避免频繁扩容,当大小超过1024时,对数组进行双倍扩容。源码分析前文有描述。


    processSelectedKeysOptimized

    方法源码如下:两图组成:

    该过程分为以下三个步骤:

    1.取出IO事件以及对应的netty channel类

            拿到当前SelectionKey之后,将selectedKeys[i]置为null,这里简单解释一下这么做的理由:想象一下这种场景,假设一个NioEventLoop平均每次轮询出N个IO事件,高峰期轮询出3N个事件,那么selectedKeys的物理长度要大于等于3N,如果每次处理这些key,不置selectedKeys[i]为空,那么高峰期一过,这些保存在数组尾部的selectedKeys[i]对应的SelectionKey将一直无法被回收,SelectionKey对应的对象可能不大,但是要知道,它可是有attachment的,这里的attachment具体是什么下面会讲到,但是有一点我们必须清楚,attachment可能很大,这样一来,这些元素是GC root可达的,很容易造成gc不掉,内存泄漏就发生了

    2.处理该channel

    拿到对应的attachment之后,netty做了如下判断

    processSelectedKey

    1).对于boss NioEventLoop来说,轮询到的是基本上就是连接事件,后续的事情就通过他的pipeline将连接扔给一个worker NioEventLoop处理

    2).对于worker NioEventLoop来说,轮询到的基本上都是io读写事件,后续的事情就是通过他的pipeline将读取到的字节流传递给每个channelHandler来处理

    3.判断是否该再来次轮询

    netty的reactor线程经历前两个步骤,分别是抓取产生过的IO事件以及处理IO事件,每次在抓到IO事件之后,都会将 needsToSelectAgain 重置为false,那么什么时候needsToSelectAgain会重新被设置成true呢?

    needsToSelectAgain初始化都为false;needsToSelectAgain =false;

    在NioEventLoop类中,只有下面一处将needsToSelectAgain设置为true

    查看cancel方法被调用位置:

    在channel从selector上移除的时候,调用cancel函数将key取消,并且当被去掉的key到达 CLEANUP_INTERVAL 的时候,设置needsToSelectAgain为true,CLEANUP_INTERVAL默认值为256

    也就是说,对于每个NioEventLoop而言,每隔256个channel从selector上移除的时候,就标记 needsToSelectAgain 为true,我们还是跳回到上面这段代码

    每满256次,就会进入到if的代码块,首先,将selectedKeys的内部数组全部清空,方便被jvm垃圾回收,然后重新调用selectAgain重新填装一下 selectionKey

    netty这么做的目的我想应该是每隔256次channel断线,重新清理一下selectionKey,保证现存的SelectionKey及时有效

    netty的reactor线程第二步做的事情为处理IO事件,netty使用数组替换掉jdk原生的HashSet来保证IO事件的高效处理,每个SelectionKey上绑定了netty类AbstractChannel对象作为attachment,在处理每个SelectionKey的时候,就可以找到AbstractChannel,然后通过pipeline的方式将处理串行到ChannelHandler,回调到用户方法。不断地轮询是否有IO事件发生,并且在轮询的过程中不断检查是否有定时任务和普通任务,保证了netty的任务队列中的任务得到有效执行,轮询过程顺带用一个计数器避开了了jdk空轮询的bug。


    reactor线程task的调度

    runAllTasks(longtimeoutNanos);

    代码表示了尽量在一定的时间内,将所有的任务都取出来run一遍。timeoutNanos 表示该方法最多执行这么长时间,reactor线程如果在此停留的时间过长,那么将积攒许多的IO事件无法处理(见reactor线程的前面两个步骤),最终导致大量客户端请求阻塞,因此,默认情况下,netty将控制内部队列的执行时间

    被调用情况:

    NioEventLoop中run方法调用:

    分析源码runAllTasks:


    从scheduledTaskQueue中的任务delayedTask转移定时任务到taskQueue(mpsc queue);

    从scheduledTaskQueue从拉取一个定时任务。首先分析fetchFromDelayedQueue()方法,由父类SingleThreadEventExecutor实现

    功能是将延迟任务队列(delayedTaskQueue)中已经超过延迟执行时间的任务迁移到非IO任务队列(taskQueue)中.然后依次从taskQueue取出任务执行,每执行64个任务,就进行耗时检查,如果已执行时间超过预先设定的执行时间,则停止执行非IO任务,避免非IO任务太多,影响IO任务的执行。

    nanoTime()=System.nanoTime() -START_TIME;

    循环执行任务

    下面为执行任务的核心:

    将已运行任务runTasks加一,每隔0x3F任务,即每执行完64个任务之后,判断当前时间是否超过本次reactor任务循环的截止时间了,如果超过,那就break掉,如果没有超过,那就继续执行。可以看到,netty对性能的优化考虑地相当的周到,假设netty任务队列里面如果有海量小任务,如果每次都要执行完任务都要判断一下是否到截止时间,那么效率是比较低下的

    总结:

    当前reactor线程调用当前eventLoop执行任务,直接执行,否则,添加到任务队列稍后执行

    netty内部的任务分为普通任务和定时任务,分别落地到MpscQueue和PriorityQueue

    netty每次执行任务循环之前,会将已经到期的定时任务从PriorityQueue转移到MpscQueue

    netty每隔64个任务检查一下是否该退出任务循环

    参考:

    https://www.jianshu.com/p/58fad8e42379

    总结:NioEventLoop实现的线程执行逻辑做了以下事情

    先后执行IO任务和非IO任务,两类任务的执行时间比由变量ioRatio控制,默认是非IO任务允许执行和IO任务相同的时间

    如果taskQueue存在非IO任务,或者delayedTaskQueue存在已经超时的任务,则执行非阻塞的selectNow()方法,否则执行阻塞的select(time)方法

    如果阻塞的select(time)方法立即返回0的次数超过某个值(默认为512次),说明触发了epoll的cpu 100% bug,通过对selector进行rebuild解决:即重新创建一个selector,然后将原来的selector中已注册的所有channel重新注册到新的selector中,并将老的selectionKey全部cancel掉,最后将老的selector关闭

    如果select的结果不为0,则依次处理每个ready的selectionKey,根据readyOps的值,进行不同的分发处理,譬如accept、read、write、connect等

    执行完IO任务后,再执行非IO任务,其中会将delayedTaskQueue已超时的任务加入到taskQueue中。每执行64个任务,就进行耗时检查,如果已执行时间超过通过ioRatio和之前执行IO任务的耗时计算出来的非IO任务预计执行时间,则停止执行剩下的非IO任务

    相关文章

      网友评论

        本文标题:Netty源码学习(3)--Reactor模式

        本文链接:https://www.haomeiwen.com/subject/ehdgnxtx.html