EventLoop的类层图
我们来简单讨论一下Netty线程模型的源码。学习一下EventLoopGroup的原理。首先看一下EventLoop的类层结构图:
data:image/s3,"s3://crabby-images/20d14/20d1459f3c8a15dba40021534de51b2b62da1849" alt=""
上面是一个简化的版本,完整的是非常复杂的。从整体上来看,netty针对EventLoop定义了两种类型的接口,一个是EventExecutor和EventExecutorGroup,另一个是EventLoop和EventLoopGroup。
通过这些接口,可以总结出两大功能,第一个就是它是一个事件执行器(EventExecutor),它能够执行各种事件,也能够向它提交事件,它能够执行。第二它是一个EventLoop,能够向它注册handler,进行循环处理。相当于nio中的selector加上循环操作。
对于每个NioEventLoop,它继承了SingleThreadEventLoop,同时SingleThreadEventLoop继承了SingleThreadEventExecutor。
对于每个NioEventLoopGroup,它继承了MultithreadEventLoopGroup,同时MultithreadEventLoopGroup继承了MultithreadEventExecutorGroup。
EventExecutor结构
EventExecutor是一个事件执行器,提供了很多执行任务的方法:
data:image/s3,"s3://crabby-images/17c15/17c15789ebbb29e6134e47ab92f6ced07ec81805" alt=""
比如submit方法,这些接口都继承了ScheduleExecutorService接口,因此它们能够执行定时任务,能够提交任务,还包括schedule方法,其中最重要的方法是next,上面是EventExecutorLoop里面的一些方法。
下面的EventExecutor里面的方法也是一样,next返回自己的类型,表示下一个执行,parent()方法表示它是属于哪一个EventExecutorGroup,因为每个EventExecutor都属于一个EventExecutorGroup,parent()方法能判断它属于哪个组。inEventLoop判断当前方法是不是在当前线程里面执行,还有其他方法等等。
EventExecutor和EventExecutorGroup可以总结如下:
1.EventExecutorGroup里面有一个EventExecutor数组,保存了多个EventExecutor;
2.EventExecutorGroup是不干什么事情的,当收到一个请后,他就调用next()获得一个它里面的EventExecutor,再调用这个executor的方法;
3.EventExecutorChooser.next()定义选择下一个EventExecutor的策略;
我们来看一下选择下一个EventExecutor的策略的具体实现,
data:image/s3,"s3://crabby-images/b0de3/b0de3dc10a8d6705305e5752c3f3cd473b4b60fa" alt=""
查看DefaultEventExecutorChooserFactory的源码,
data:image/s3,"s3://crabby-images/6c4dd/6c4dd1bc91580c1abe4c8e6da84413a2c4b5fc50" alt=""
算法很简单,它有两中选择方式,一种是PowerOfTwoEventExecutorChooser,另一种是GenericEventExecutorChooser。它们的算法很简单,其实就是轮询(round robin),如果是2的指数,就用位来做(PowerOfTwoEventExecutorChooser),如果不是2的指数就取模(GenericEventExecutorChooser),这两种效果是一模一样的。
再来看一下具体的内容,来看MultithreadEventExecutorGroup类方法:
data:image/s3,"s3://crabby-images/7bb62/7bb6210d25760bfa7cc387e111eb92a46d7c3954" alt=""
其中有个children属性,就是一个EventExecutor[]数组,有个chooser属性,就是EventExecutorChooser选择器,所有的方法都是通过next派发给SingleThreadEventExecutor去处理:
data:image/s3,"s3://crabby-images/2776a/2776ab5f7c0ca10957d692f3560aaf396429b0df" alt=""
就是在这里具体处理事情。它里面实现的一个重要的execute(Runnable )方法,就是做线程提交的,一定要关注这个方法。所有的方法都派发给SingleThreadEventExecutor后,它自己也不干,它交给它里面的Executor属性去做,具体的实现一般是ThreadPerTaskExecutor:
data:image/s3,"s3://crabby-images/f4e29/f4e29d46475c1d17de2f8fa717a0f9748b8e9ce4" alt=""
上面可以理解为都是中介,真正干活的就是ThreadPerTaskExecutor。
EventLoop结构
前面的EventExecutor就是执行,EventLoop主要是注册:
data:image/s3,"s3://crabby-images/cae98/cae98043ac08d58c14adb197bd305cea32629ef8" alt=""
EventLoopGroup也是啥都不干,让下面的SingleThreadEventLoop来干。
EventLoop除了注册事件,还包括事件循环,因为netty有很多EventLoop和EventLoopGroup,比如nio对应的是nio的一套,bio对应的是bio的一套,其它非nio的也各自有一套,我们这里只讨论nio这一套。它的事件循环肯定是放到具体的run方法里面的:
data:image/s3,"s3://crabby-images/1f599/1f5990adfe29ffa63d1d6fe0a38ece2d7b0db443" alt=""
NioEventLoop中的run方法实现的是SingleThreadEventExecutor中的run方法,每个EventLoopGroup都包括一系列的EventExecutor,看下图:
data:image/s3,"s3://crabby-images/544dc/544dcc7857b8f8cc8c8583662b023ed13f23edee" alt=""
上图左侧表示一个EventLoopGroup,同样包含了一系列的EventExecutor,刚才在源码中看到,它是一个数组,通过EventExecutorChooser进行选择,每一个都指向一个具体的NioEventLoop。对于每个NioEventLoop,它内部有Selector,有很多通道注册,同时还有个事件循环去处理这些事情(上图右上侧部分)。对于每个通道,它都有一个ChannelPipeline(上图右下侧),里面有个handler链,它默认有两个,一个是头(HeaderContext),一个是尾(TailContext),这是netty自己加的,中间我们可以插入自己的处理器。
上面可以理解为我们平时用的workerGroup的结构,对于bossGroup,大致上和workderGroup差不多,只不过注册的时候,只有一个接收事件:
data:image/s3,"s3://crabby-images/0d393/0d39326e396f7877171455c8f4512854d2e9028f" alt=""
上面Selector对应的只有一个NioServerSocketChannel,里面也是一个事件循环,处理接收事件,它的ChannelPipeline也大致上一样,只不过最尾部的一个东西,肯定是ServerBootstrapAccept,专门负责accept事件。
NioEventLoop创建分析
上面了解了NioEventLoop的类结构,下面来了解一下它的创建过程。我们使用debug的形式查看创建过程,首先在 new 地方打一个断点:
data:image/s3,"s3://crabby-images/15e74/15e74ed98a116fb8fc0875095ccc6de6bb24c17d" alt=""
然后debug启动项目,首先进入的是NioEventLoopGroup中的方法:
1、 io.netty.channel.nio.NioEventLoopGroup#NioEventLoopGroup(int)
data:image/s3,"s3://crabby-images/bcada/bcada7e41e8cf2bbfb676f1496aab394d7cb0842" alt=""
我们在workerGroup处打断点发现,最终进入的也是这个方法,所以在两个地方打断点都可以。这个方法除了传入一个线程数量,还传入了一个Executor对象,前面讨论过,Executor就是最终干活的对象,最终实现是ThreadPerTaskExecutor,这里传的是null,说明我们是不需要传入的,由netty自己创建即可,我们继续往下走:
1、 io.netty.channel.nio.NioEventLoopGroup#NioEventLoopGroup(int)
2、 io.netty.channel.nio.NioEventLoopGroup#NioEventLoopGroup(int, java.util.concurrent.Executor)
data:image/s3,"s3://crabby-images/00eac/00eac5eb8361beddb1b411e3bb85b1c81f7a4290" alt=""
这个方法中又多了一个参数,SelectorProvider定义了创建selector、ServerSocketChannel、SocketChannel等方法,采用Java的 Service Provider Interface (SPI) 方式实现,
data:image/s3,"s3://crabby-images/22a03/22a0386b6a3cda836af190713f02ecb59f1ce36e" alt=""
从代码可以知道,创建过程为
如果provider已经创建,直接返回;
如果定义了java.nio.channels.spi.SelectorProvider属性,则采用该属性定义的类创建SelectorProvider并返回;如果失败,则继续;
采用SPI方法创建SelectorProvider并返回;如果实现,则通过DefaultSelectorProvider创建;
我们继续往下走,
1、 io.netty.channel.nio.NioEventLoopGroup#NioEventLoopGroup(int)
2、 io.netty.channel.nio.NioEventLoopGroup#NioEventLoopGroup(int, java.util.concurrent.Executor)
3、 io.netty.channel.nio.NioEventLoopGroup#NioEventLoopGroup(int, java.util.concurrent.Executor, java.nio.channels.spi.SelectorProvider)
data:image/s3,"s3://crabby-images/6bda7/6bda71deba0a985331c483c1f35388dcc016aa3f" alt=""
这里多了一个参数,继续往下走:
1、 io.netty.channel.nio.NioEventLoopGroup#NioEventLoopGroup(int)
2、 io.netty.channel.nio.NioEventLoopGroup#NioEventLoopGroup(int, java.util.concurrent.Executor)
3、 io.netty.channel.nio.NioEventLoopGroup#NioEventLoopGroup(int, java.util.concurrent.Executor, java.nio.channels.spi.SelectorProvider)
4、 io.netty.channel.nio.NioEventLoopGroup#NioEventLoopGroup(int, java.util.concurrent.Executor, java.nio.channels.spi.SelectorProvider, io.netty.channel.SelectStrategyFactory)
data:image/s3,"s3://crabby-images/d50c5/d50c575dca431d77b375fe31a585653d82f57d55" alt=""
这里多了一个拒绝处理参数,继续往下走:
1、 io.netty.channel.nio.NioEventLoopGroup#NioEventLoopGroup(int)
2、 io.netty.channel.nio.NioEventLoopGroup#NioEventLoopGroup(int, java.util.concurrent.Executor)
3、 io.netty.channel.nio.NioEventLoopGroup#NioEventLoopGroup(int, java.util.concurrent.Executor, java.nio.channels.spi.SelectorProvider)
4、 io.netty.channel.nio.NioEventLoopGroup#NioEventLoopGroup(int, java.util.concurrent.Executor, java.nio.channels.spi.SelectorProvider, io.netty.channel.SelectStrategyFactory)
5、 io.netty.channel.MultithreadEventLoopGroup#MultithreadEventLoopGroup(int, java.util.concurrent.Executor, java.lang.Object...)
data:image/s3,"s3://crabby-images/af075/af075b8c4636a90f99b87c51ff2781ed76cc0fb6" alt=""
从这里可以看到,线程数如果没有定义会使用默认的数值,默认的值就是系统cup*2:
data:image/s3,"s3://crabby-images/78443/784433004175e7bbfbe2279a8a9392ca46911cc2" alt=""
继续往下走,
1、 io.netty.channel.nio.NioEventLoopGroup#NioEventLoopGroup(int)
2、 io.netty.channel.nio.NioEventLoopGroup#NioEventLoopGroup(int, java.util.concurrent.Executor)
3、 io.netty.channel.nio.NioEventLoopGroup#NioEventLoopGroup(int, java.util.concurrent.Executor, java.nio.channels.spi.SelectorProvider)
4、 io.netty.channel.nio.NioEventLoopGroup#NioEventLoopGroup(int, java.util.concurrent.Executor, java.nio.channels.spi.SelectorProvider, io.netty.channel.SelectStrategyFactory)
5、 io.netty.channel.MultithreadEventLoopGroup#MultithreadEventLoopGroup(int, java.util.concurrent.Executor, java.lang.Object...)
6、 io.netty.util.concurrent.MultithreadEventExecutorGroup#MultithreadEventExecutorGroup(int, java.util.concurrent.Executor, java.lang.Object...)
data:image/s3,"s3://crabby-images/4d32b/4d32ba7e44e2aa7ab2d9d7bc4d3e7708d6e1434d" alt=""
这里增加了一个我们上面讨论过的参数类型,DefaultEventExecutorChooserFactory,也就是EventExecutor的选择器,继续往下走:
1、 io.netty.channel.nio.NioEventLoopGroup#NioEventLoopGroup(int)
2、 io.netty.channel.nio.NioEventLoopGroup#NioEventLoopGroup(int, java.util.concurrent.Executor)
3、 io.netty.channel.nio.NioEventLoopGroup#NioEventLoopGroup(int, java.util.concurrent.Executor, java.nio.channels.spi.SelectorProvider)
4、 io.netty.channel.nio.NioEventLoopGroup#NioEventLoopGroup(int, java.util.concurrent.Executor, java.nio.channels.spi.SelectorProvider, io.netty.channel.SelectStrategyFactory)
5、 io.netty.channel.MultithreadEventLoopGroup#MultithreadEventLoopGroup(int, java.util.concurrent.Executor, java.lang.Object...)
6、 io.netty.util.concurrent.MultithreadEventExecutorGroup#MultithreadEventExecutorGroup(int, java.util.concurrent.Executor, java.lang.Object...)
7、 io.netty.util.concurrent.MultithreadEventExecutorGroup#MultithreadEventExecutorGroup(int, java.util.concurrent.Executor, io.netty.util.concurrent.EventExecutorChooserFactory, java.lang.Object...)
data:image/s3,"s3://crabby-images/7a7de/7a7de89174b8f665a767436a8f86f204502ada2a" alt=""
通过上面七步,我们组合齐了创建需要的所有参数,进入了这里,这里是真正创建开始的地方,从代码中看到,前面两个判断,一个是线程数量,另一个是对Executor的创建,使用的是ThreadPerTaskExecutor,
data:image/s3,"s3://crabby-images/94722/94722c70aea7bb86b1743c49f3143e611f480eda" alt=""
里面的execute方法很简单,每次执行的时候都会创建一个新的线程执行命令。
下面的children,就是我们前面提到的MultithreadEventExecutorGroup中的EventExecutor数组,我们指定了多少个线程数,就会创建多少个EventExecutor,来看一下每个EventExecutor的创建过程:
data:image/s3,"s3://crabby-images/7bba2/7bba241f1ab671384a9c8dbb38737779e8292ed5" alt=""
创建的时候调用的是new Child去创建,这个方法真正实现的地方是:
io.netty.channel.nio.NioEventLoopGroup#newChild
data:image/s3,"s3://crabby-images/f9c76/f9c769d39cca145d732c409412a2385bd132d9d1" alt=""
里面new 了一个NioEventLoop,具体的内容如下:
data:image/s3,"s3://crabby-images/f8493/f8493f2ae5b3242d6f304da7e11b2a92067d5a1c" alt=""
我们看到里面创建了一个selector,通过这样children数组就new 完了。
接下来就是创建chooser,作用是选择下一个EventExecutor,前面介绍过根据是否是2的指数的倍数进行选择,这里的断点打在了bossGroup上面,只创建了一个,因此是2的0次方,我们通过调试可以看到它选择的策略:
data:image/s3,"s3://crabby-images/1bda4/1bda45a1aeb93f02318be478ab5675c8456144bd" alt=""
最下面的几行是对children的一些操作,这样整个new过程就结束了。我们再来回顾这张图:
data:image/s3,"s3://crabby-images/ad375/ad3753bc237f82fabebceab2df76f60e18a3941f" alt=""
创建的过程,EventExecutor数组创建好了,每个NioEventLoop的selector也创建了,但是没有看到事件循环线程启动,也没有看到事件注册等内容。所以在这个阶段,无论是boss还是worker都是一样的。从上面可以看出,创建流程是比较简单的。
NioEventLoop启动流程分析
netty是用ServerBootstrap进行启动,启动的代码是这一行:
1、 ChannelFuture f = server.bind(port).sync()
我们可以在这里打断点进行启动流程分析:
data:image/s3,"s3://crabby-images/2daab/2daab50e5045ebb4c13aa99c6fbd3a2a7324fd6e" alt=""
这里一种调用了两个方法,一个是bind方法,一个是sync方法,我们先从bind开始。
1、 ChannelFuture f = server.bind(port).sync()
2、 io.netty.bootstrap.AbstractBootstrap#bind(int)
data:image/s3,"s3://crabby-images/18195/18195b2bcadee2adb93bae7bbe081f30f65c8c31" alt=""
继续往下:
1、 ChannelFuture f = server.bind(port).sync()
2、 io.netty.bootstrap.AbstractBootstrap#bind(int)
3、 io.netty.bootstrap.AbstractBootstrap#bind(java.net.SocketAddress)
data:image/s3,"s3://crabby-images/57107/57107e65a116901c9c4bed4c9fd3daaaabcf9788" alt=""
validate()主要是验证是否为空的:
data:image/s3,"s3://crabby-images/bcda5/bcda517a40700eb5f223e9f9f565f57e38035e2a" alt=""
继续往下走:
1、 ChannelFuture f = server.bind(port).sync()
2、 io.netty.bootstrap.AbstractBootstrap#bind(int)
3、 io.netty.bootstrap.AbstractBootstrap#bind(java.net.SocketAddress)
4、 io.netty.bootstrap.AbstractBootstrap#doBind
data:image/s3,"s3://crabby-images/90b24/90b24a926494e9c7d7c6161ac560d4c79bce01c9" alt=""
这个方法内容比较多,来看第一行代码,执行了一个initAndRegister方法,回想我们自己写nio程序的时候,启动第一件事是把ServerSocketChannel初始化并注册到selector中,initAndRegister方法中也是做了同样的事情:
data:image/s3,"s3://crabby-images/0a9a5/0a9a5130881ed4e80fb64f576e1685a4b578f79b" alt=""
来看这个方法,首先通过 channel =channelFactory.newChannel(); 创建一个channel,我们配置ServerBootstrap的时候,channel传入的是 NioServerSocketChannel,这里通过一个工厂创建它的实例,当然这里用的是反射技术。具体实现代码如下:
io.netty.channel.ReflectiveChannelFactory#newChannel
data:image/s3,"s3://crabby-images/8b7cd/8b7cdd1b6b6c5fba88b81425a233a4bc41a3732c" alt=""
反射执行的时候,最终会走到下面的方法:
io.netty.channel.AbstractChannel#AbstractChannel(io.netty.channel.Channel)
data:image/s3,"s3://crabby-images/f5b3d/f5b3d70b03d0897964558bee7f8a987307a4b68b" alt=""
对于每个channel,都有这些属性,parent,id(自动生成的),unsafe,另外一个非常重要的就是pipeline属性,就是前面我们说的ChannelPipeline。刚开始创建的时候,啥都没有,就有默认的HeaderContext和TailContext:
io.netty.channel.DefaultChannelPipeline#DefaultChannelPipeline
data:image/s3,"s3://crabby-images/b7212/b72121f53e9f42351b9fba027fb5d959e97201cf" alt=""
接下来回到initAndRegister方法,创建完了,我们来看init方法,
io.netty.bootstrap.ServerBootstrap#init
data:image/s3,"s3://crabby-images/9c84e/9c84e6fe3e7e1c6dd9b7981133f0840954d6158c" alt=""
前面是设置options,设置属性的一些内容,接下来, ChannelPipeline p = channel.pipeline(); 就是把pipeline拿出来,然后把childGroup和childHandler都拿出来,接下来是往pipeline里面加东西,
data:image/s3,"s3://crabby-images/335df/335df86677a81f41fd60ca2b66b0e207c0733757" alt=""
就是把我们在ServerBootstrap中配置的handler加进去。这里面先配置了ChannelPipeline,接下来就是ServerBootstrapAcceptor,这里面会把childGroup和childHandler都传进去,之所以会传这两个,是因为这里的ServerBootstrapAcceptor是要把接收到的NioSocketChannel的连接注册上去,所以这里需要有childGroup,而对于每个NioSocketChannel中的pipeline里面有什么东西,这里需要childHandler来看的。做完这些事情,init方法就完成了。
再次回到initAndRegister方法,前面的newChannel和init方法都完成了,下面是注册方法:
ChannelFuture regFuture = config().group().register(channel);
这里先调用config和group方法,返回的就是我们配置的bossGroup,这里如果bossGroup配置多个,也会从中选一个,所以配置多个也是没有用处的(具体理由参考前面一篇文章)。对于boss来说,这里只会初始化注册一次,选择一个。获取group后,调用的是register方法,最终到的方法是:
io.netty.channel.AbstractChannel.AbstractUnsafe#register
data:image/s3,"s3://crabby-images/d32fe/d32feb7a8dd9c5fc057c6a89453260070db53d79" alt=""
网友评论