本文是Netty文集中“Netty 源码解析”系列的文章。主要对Netty的重要流程以及类进行源码解析,以使得我们更好的去使用Netty。Netty是一个非常优秀的网络框架,对其源码解读的过程也是不断学习的过程。
本文接Netty 源码解析 ——— 服务端启动流程 (上)继续解析Netty服务器启动流程的剩下步骤
重要类介绍
在讲解源码之前,同样我们先对该篇文字这涉及到的几个Netty中的重要的类进行简单的介绍。
ChannelFuture
ChannelFuture是针对于Channel的一个Future对象。表示一个异步的Channel I/O操作结果。
Netty中Channel的所有的I/O操作都是异步的。这意味着任何I/O的调用都会立即的返回,并且它不保证所请求的I/O操作在调用之后会完成。相反,你会得到一个ChannelFuture的实例,它会向你返回一些关于I/O操作的结果或者状态的一些信息。
一个ChannelFuture要么是uncompleted的,要么是completed的。
ChannelFuture提供了各种方法来让你去检测I/O操作是否已经完成,如果未完成可等待完成,并且得到I/O操作的结果。它还可以添加ChannelFutureListener,这样当I/O操作完成后该listener就可以得到通知。
使用addListener(GenericFutureListener)比使用await()更好。因为await()方法的调用在某些特定的环境下可能会导致死锁。
GenericFutureListener:
监听一个Future的执行结果。异步操作的结果会通知给listener,一旦这个listener通过调用Future.addListener(...)方法添加进去之后。
换句话说,我们可以往Future对象注册listener实例,当future里面的操作完成了之后,我们所注册的listener实例对象就会收到相应的通知。
GenericFutureListener的operationComplete方法:
void operationComplete(F future) throws Exception:当与这个Future相关的操作完成的时候,该方法会得到调用。
ChannelFutureListener:
ChannelFutureListener继承了GenericFutureListener类。一个针对ChannelFuture结果的监听器。ChannelFutureListener 将 future 和 callback 组合起来了。
Futures vs ChannelFuture
Future提供了另一个方式去通知应用当一个操作已经完成。Future对象作为一个异步操作结果的占位符返回,异步操作将在未来的某个时刻完成并提供一个可访问的结果。
JDK提供了 interface java.util.concurrent.Future,但JDK对Future的实现需要你手动检查是否一个操作已经完成或者堵塞直到操作完成。这是很笨重的,所以Netty提供了它自己的实现 ———— ChannelFuture,用于一个异步操作执行时。
ChannelFuture提供了一个附加的方法,这个方法允许你注册一个或多个ChannelFutureListenner实例。“operationComplete()”在操作完成时会被回调。监听者能够确定操作是否成功或失败。如果失败了,我们能够恢复错误。简而言之,ChannelFutureListener的通知机制消除了手动检查操作完成的需要。
AdaptiveRecvByteBufAllocator
RecvByteBufAllocator:
分配一个新的接受缓存,该缓存的容量会尽可能的足够大以读入所有的入站数据并且该缓存的容量也尽可能的小以不会浪费它的空间。
AdaptiveRecvByteBufAllocator:
RecvByteBufAllocator会根据反馈自动的增加和减少可预测的buffer的大小。
它会逐渐地增加期望的可读到的字节数如果之前的读取已经完全填充满了分配好的buffer( 也就是,上一次的读取操作,已经完全填满了已经分配好的buffer,那么它就会很优雅的自动的去增加可读的字节数量,也就是自动的增加缓冲区的大小 )。它也会逐渐的减少期望的可读的字节数如果连续两次读操作都没有填充满分配的buffer。否则,它会保持相同的预测。
① 依次往sizeTable添加元素:[16 , (512-16)]之间16的倍数。即,16、32、48...496
② 然后再往sizeTable中添加元素:[512 , 512 * (2^N)),N > 1; 直到数值超过Integer的限制(2^31 - 1);
③ 根据sizeTable长度构建一个静态成员常量数组SIZE_TABLE,并将sizeTable中的元素赋值给SIZE_TABLE数组。注意List是有序的,所以是根据插入元素的顺序依次的赋值给SIZE_TABLE,SIZE_TABLE从下标0开始。
SIZE_TABLE为预定义好的以从小到大的顺序设定的可分配缓冲区的大小值的数组。因为AdaptiveRecvByteBufAllocator作用是可自动适配每次读事件使用的buffer的大小。这样当需要对buffer大小做调整时,只要根据一定逻辑从SIZE_TABLE中取出值,然后根据该值创建新buffer即可。
HandleImpl是AdaptiveRecvByteBufAllocator一个内部类,该处理器类用于提供真实的操作并保留预测最佳缓冲区容量所需的内部信息。
guess()方法用于返回预测的应该创建的buffer容量大小。
每次读取完信息后可调用readComplete()方法来根据本次的读取数据大小以对下一次读操作是应该创建多大容量的buffer做调整。
allocate()方法会返回一个新的buffer用于接受读数据,该buffer的容量会尽可能足够大去读取所有的入站数据并且也会尽可能足够小以至于不会浪费容量。同时还会根据操作系统、平台以及系统参数的设置进行内存的分配(即,可能是堆外内存也可能是堆内内存)
总的来说,AdaptiveRecvByteBufAllocator目前我们只需要知道
① 它可以自动调节每次读操作所需buffer的容量大小。这需要我们在读操作的时候调用AdaptiveRecvByteBufAllocator类的相关方法来收集相关的信息。
② 容量调整逻辑是:a) 如果最后一次读操作读取到的数据大等于分配的buffer的容量大小,则扩大buffer容量的值。再下一次读操作时,就会根据这个扩大的容量值来创建新的buffer来接收数据;b) 如果连续两次读操作读取到的数据都比创建的buffer的容量小的话,则对buffer的容量进行缩减。那么在下一次读操作时就根据这个新的容量大小值进行新的buffer的创建来获取数据;c) 否则,则保持buffer的容量大小不变。
③ 可以通过allocate()方法获取由AdaptiveRecvByteBufAllocator衡量后得到的最理想容量大小的buffer。并且会根据操作系统、平台以及系统参数的设置进行内存的分配(即,可能是堆外内存也可能是对内内存)。
ChannelPipeline
ChannelPipeline是一个ChannelHandler的集合,这些ChannelHandler会处理或拦截一个Channel的入站事件或出站操作。
每一个新的Channel被创建时都会分配一个新的ChannelPipeline。他们的关系是永久不变的,Channel既不能依附于其他ChannelPipeline也不能和当前ChannelPipeline分离。
ChannelPipeline根本上是一系列的ChannelHandlers。ChannelPipeline还提供了方法用于传播事件通过ChannelPipeline本身。
ChannelPipeline是一个实现了拦截过滤器模式的高级形式,它使得用户能够完全控制事件的处理方式以及ChannelPipeline中的ChannelHandler间的交互。
👇的图解描述了ChannelPipeline中的ChannelHandlers是如何处理I/O事件的
ChannelHandler在ChannelPipeline中的顺序是由我们通过ChannelPipeline.add*()方法来确定的。
还有很重要的一点是:ChannelHandlerContext有许多的方法,有些方法也出现在了Channel和ChannelPipeline类中,但是它们有着很重要的不同处。如果你通过一个Channel实例或ChannelPipeline实例调用这些方法,它们将会传播通过整个管道。但相同的方法通过ChannelHandlerContext被调用时,它将从当前关联的ChannelHandler开始并传播给管道中下一个能够处理该事件的ChannelHandler。也就是说,ChannelPipeline和Channel中调用的方法都会通过整个管道;而ChannelHandlerContext调用的方法会从当前ChannelHandler对应方向的下一个ChannelHandler开始执行( 当前ChannelHandler也不会处理该事件 )。
源码解析
ChannelFuture channelFuture = serverBootstrap.bind(5566)
创建一个新的Channel并且绑定到这个端口号上。
① Channel的初始化操作,并且构建了该Channel的ChannelPipeline,然后将该Channel(即,NioServerSocketChannel)注册到EventLoopGroup(即,parentGroup)中的某个EventLoop(即,NioEventLoop)上。
② 在NioServerSocketChannel注册到NioEventLoop成功完成后,将ServerSocketChannel绑定到本地指定端口上。
首先我们对initAndRegister()进行详细的分析
①channel = channelFactory.newChannel():通过channelFactory构建一个Channel实例。
我们在Netty 源码解析 ——— 服务端启动流程 (上)已经分析过,这里的channelFactory实际指向了一个ReflectiveChannelFactory实例,而该实例则会构造一个我们通过ServerBootstrap.channel(class)方法设置的Channel类型的实例。也就是,我们最终会通过ReflectiveChannelFactory实例来构建一个NioServerSocketChannel的实例。
这里ReflectiveChannelFactory是通过反射的机制来去实现NioServerSocketChannel的构建的。
NioServerSocketChannel的构建主要包括:
a) NIO中的ServerSocketChannel创建
b) 相关属性的设置
c) NioServerSocketChannelConfig的构建
a) NIO中的ServerSocketChannel创建
👆这实际上Netty对ServerSocketChannel和SocketChannel(是的,SocketChannel的构建过程也有类似的方法)构建过程的一个优化,详细的说明可以见文章末尾的“附”。
b) 相关属性的设置
构建了Unsafe实例,这里是NioMessageUnsafe对象,该类用于完成Channel真实的I/O操作和传输。
创建了该Channel的ChannelPipeline实例(即,DefaultChannelPipeline)。每一个Channel都有它自己的pipeline。
将a)构建好的ServerSocketChannel实例赋值给ch成员变量
SelectionKey.OP_ACCEPT赋值给成员变量readInterestOp
设置ServerSocketChannel.configureBlocking(false);配置ServerSocketChannel为非阻塞模式,这步很重要。因为只有非阻塞模式Channel才能使用NIO的Selector来实现非阻塞的I/O操作。
这里ChannelPipeline的构建是很重要的一步。每一个新的Channel被创建时都会分配一个新的ChannelPipeline。ChannelPipeline本质上就是一系列的ChannelHandlers。ChannelPipeline还提供了方法用于传播事件通过ChannelPipeline本身。
ChannelPipeline通过两个AbstractChannelHandlerContext对象本身(head、tail)来维护一个双向链表。
b) 根据channel构建SucceededChannelFuture、VoidChannelPromise成员变量
c) 构建ChannelHandler链表头HeadContext和链表尾TailContext,并赋值给成员变量head、tail。将head.next指向tail,将tail.prev指向head。
HeadConext作为链表头,它同时实现了ChannelOutboundHandler和ChannelInboundHandler,也就是说它即会处理入站数据也会处理出站数据、它持有NioMessageUnsafe对象,该类用于完成Channel真实的I/O操作和传输。
TailContext作为ChannelHandler链中的最后一个ChannelHandler,它仅实现了ChannelInboundHandler,因此TailContext是入站事件的最后一个ChannelHandler,它主要完成了一些资源的释放工作。
入站事件会依次被从head ——> ... ——> tail中的所有ChannelInboundHandler处理。
出站事件会依次被从tail ——> ... ——> head中的所有ChannelOutboundHandler处理。
注意,我们程序中通过add*(...)加进来的ChannelHandler都会处于head和tail之间,也就是说链表头是HeadConext,链表尾是TailContext,这是固定不会改变的。
c) NioServerSocketChannelConfig的构建
构建一个AdaptiveRecvByteBufAllocator实例。将构建好的AdaptiveRecvByteBufAllocator实例赋值给rcvBufAllocator成员变量。
将当前的NioServerSocketChannel赋值给成员变量channel
将ServerSocketChannel所关联的ServerSocket赋值给成员变量javaSocket
② init(channel):初始化NioServerSocketChannel。
a) 完成options和attributes相关的设定;这里设定的是NioServerSocketChannel的options和attributes的相关配置。即,我们程序中通过serverBootstrap.option(...)和serverBootstrap.attr(...)设置的配置项。b) p.addLast(new ChannelInitializer<Channel>(){…}):
a) 主要完成了将ChannelInitializer封装成一个DefaultChannelHandlerContext对象加入到ChannelPipeline中的ChannelHandler链表的尾部(即,tail前),此时ChannelPipeline中的ChannelHandler链表为:head ——> 封装了ChannelInitializer的DefaultChannelHandlerContext ——> tail。
b) 因为此时还没完成channel的注册到EventLoop的操作,所以registered成员变量属性为false。如果registered为false,意味着channel还没有被注册到EventLoop中。在这种情况下,我们会将上面封装有ChannelInitializer的context给添加到ChannelPipeline中,同时会添加一个任务,这个任务会调用ChannelHandler.handlerAdded方法,当channel完成注册的时候。
registered表示,一旦Channel被注册到EventLoop上,该成员变量就会被设置为true。并且该成员变量一旦设置为true后就不会再被改变了。
c) newCtx.setAddPending():并使用CAS的方式修改成员变量handlerState的值为ADD_PENDING。 ADD_PENDING状态表示ChannelHandler.handlerAdded方法即将被调用,但当前并不会被调用。
d) callHandlerCallbackLater(newCtx, true):因为此时channel还没有注册到Eventloop中,所以这里会将ChannelInitializer封装为PendingHandlerAddedTask任务,并赋值给成员变量pendingHandlerCallbackHead,它会在channel注册到EventLoop时得以调用,也就是ChannelInitializer的handlerAdded方法会在NioServerSocketChannel注册到NioEventLoop时被调用。
注意,到此为止ChannelInitializer中的initChannel并没有被回调,它是在NioServerSocketChannel注册到EventLoop时才会通过回调handlerAdded来得以调用。下面我们先看看handlerAdded方法的具体实现。 ChannelInitializer的handlerAdded方法👇
initChannel((C) ctx.channel()):调用我们程序实现的initChannel方法。
remove(ctx):将当前的ChannelInitializer从ChannelPipeline中移除。同时会回调ChannelInitializer的handlerRemoved方法,并将ChannelInitializer关联的ChannelHandlerContext的状态置为REMOVE_COMPLETE。REMOVE_COMPLETE状态表示ChannelHandler已经从ChannelPipeline中移除,并且ChannelHandler.handlerRemoved方法已经被调用。
a) 将我们程序设定的handler加到ChannelPipeline中ChannelHandler链的尾部(即,tail之前),这里handler就是我们程序中serverBootstrap.handler(…)中设定的ChannelHandler。。比如,serverBootstrap.handler(new LoggingHandler(LogLevel.INFO))。
b) 往NioServerSocketChannel所关联的EventLoop中添加个task,该task完成将ServerBootstrapAcceptor加入到NioServerSocketChannel所关联的ChannelPipeline中。而ServerBootstrapAcceptor主要用于完成将NioServerSocketChannel所接收到的连接请求NioSocketChannel给注册到childGroup中(即,workerGroup)。
ServerBootstrapAcceptor的read方法
ServerBootstrapAcceptor的channelRead方法是一个很重要的方法,该方法完成了:当NioServerSocketChannel接收了新的客户端连接请求后,将NioSocketChannel注册到childGroup中,而且在注册前会设定好NioSocketChannel的ChannelPipeline中的ChannelHandler链(即,我们程序serverBootstrap.childHandler(…)所设置的值),以及NioSocketChannel的options和attr属性配置(即,我们程序serverBootstrap.childOption(...)、serverBootstrap.childAttr(...)所设置的值)
注意,再次强调,到目前为止上面ChannelInitializer的initChannel以及remove方法都还没得到调用,它们是在NioServerSocketChannel注册到EventLoop时才会得到调用的。
③ChannelFuture regFuture = config().group().register(channel);
config():方法返回一个ServerBootstrapConfig实例。👆可见ServerBootstrapConfig中持有了当前的ServerBootstrap的实例。
最后我们来看register()方法:
继续跟进next()会到👇的流程 super.next()即是调用MultithreadEventExecutorGroup.next() 而这里的chooser,是通过构造方法完成的赋值,这块我们已经在Netty 源码解析 ——— 服务端启动流程 (上)中做了具体的说明chooser = chooserFactory.newChooser(children);
chooserFactory.newChooser(children):会返回一个PowerOfTwoEventExecutorChooser或者GenericEventExecutorChooser的实例对象,根据NioEventLoopGroup中NioEventLoop的个数而定(即,children数组的个数),如果该NioEventLoopGroup的NioEventLoop个数为2的幂次方个,则chooser是一个PowerOfTwoEventExecutorChooser实例;否则,chooser是一个GenericEventExecutorChooser实例。PowerOfTwoEventExecutorChooser和GenericEventExecutorChooser都是通过简单轮询的方式选择下一个EventExecutor。
所以next()会返回一个EventExecutor并向下类型转换为EventLoop对象,这里实际上就是一个NioEventLoop对象。我们接着看NioEventLoop的register()方法又做了什么:
如前面所说,这里的unsafe()方法会返回一个NioMessageUnsafe实例,用于完成Channel真实的I/O操作和传输。Netty 4 中一个Channel在其整个生命周期里只会注册到一个EventLoop上,这样,关于这个Channel的I/O操作都会在这个EventLoop所在的线程上去执行(这么设计是为了防止产生多线程的并发)。确保了,Channel以及Channel上一系列的ChannelHandler里面的回调方法 都会在EventLoop所在的线程里执行。
而这里的register0操作就属于Channel的I/O操作,因此在调用register0操作前,需要判断当前的线程是否是EventLoop所在线程,如果不是的话,就将register0操作封装成一个task提交至EventLoop的taskQueue中,EventLoop的事件循环会从taskQueue中将task取出,然后在EventLoop线程上执行该任务。
eventLoop.inEventLoop():该方法用于判断执行当前方法的线程是否是EventLoop所在的线程。如果是则返回true,否则返回false。
因为调用serverBootstrap.bind(5566)方法的调用者线程并不是EventLoop所在的线程(也就是说,当前将要执行register0(promise)操作的线程不是EventLoop所在的线程),所以会将register操作封装为一个task放入EventLoop的taskQueue中,这些任务会在EventLoop的事件循环中得到执行。
同时在Netty 源码解析 ——— 服务端启动流程 (上)中我们有提到过NioEventLoop的事件循环会在第一个非EventLoop线程提交的任务时得到启动,而这里的注册任务就是第一个非EventLoop线程所提交给NioEventLoop的任务。 总的来说,这段代码完成了将注册操作封装为一个任务提交给NioEventLoop的taskQueue中,然后会创建NioEventLoop所关联的线程并且启动了NioEventLoop的事件循环,事件循环会取出taskQueue中的任务来执行,即,使得我们的任务能在EventLoop线程上执行。
继续深入注册的过程:register0(promise)
② doRegister():完成将当前的ServerSocketChannel注册到NioEventLoop中的Selector上,并将NioServerSocketChannel作为附加属性设置到SelectionKey中。
③ 将成员变量registered置位true,表示已经将Channel注册到NioEventLoop上了。
④ pipeline.invokeHandlerAddedIfNeeded():这一步就会完成我们前面ChannelInitializer封装的PendingHandlerAddedTask任务,即,调用ChannelInitializer的handlerAdded方法。ChannelInitializer的handlerAdded方法就会完成将我们程序serverBootstrap.handler(new LoggingHandler(LogLevel.INFO))定义的LoggingHandler添加到ChannelPipeline中,以及将ServerBootstrapAcceptor添加到ChannelPipeline的操作封装为一个task提交到EventLoop的taskQueue中,随后EventLoop的事件循环就会从taskQueue中取出该任务并执行。接着,ChannelInitializer会将自己从ChannelPipeline中移除。同时会回调ChannelInitializer的handlerRemoved方法,并将ChannelInitializer关联的ChannelHandlerContext的状态置为REMOVE_COMPLETE。REMOVE_COMPLETE状态表示ChannelHandler已经从ChannelPipeline中移除,并且ChannelHandler.handlerRemoved方法已经被调用。
⑤ 标志ChannelPromise(即,DefaultChannelPromise对象)为成功,即,将注册这个异步操作标志为成功完成。
⑥ pipeline.fireChannelRegistered():触发ChannelRegistered事件,该事件会在ChannelPipeline中得以传播。也就是说,ChannelRegistered这个入站事件,它会先被head处理,随后该事件通过一个ChannelHandlerContext来实现传递给ChannelPipeline中的下一个ChannelInboundHandler处理器处理,直到最后被tail所处理。
至此位置,“final ChannelFuture regFuture = initAndRegister();”的流程就分析完了。我们接下来看doBind(final SocketAddress localAddress)另一个重要的步骤
doBind0(regFuture, channel, localAddress, promise):将Channel绑定到本地指定端口上
a) 执行NioServerSocketChannel的绑定操作
b) 注册"绑定操作失败则关闭NioServerSocketChannel"的监听器。
我们对NioServerSocketChannel的绑定操作进行展开:
可见绑定事件会在ChannelPipeline中得以传播,因为bind是一个出站事件,因此它在ChannelPipeline中各个ChannelHandler的执行顺序是:tail ——> ...... ——> head。最终调用的是head的bind方法:
a) promise.setUncancellable():设置bind这个异步操作的状态为不可取消的(UNCANCELLABLE)。
b) doBind(localAddress):完成底层ServerSocket绑定指定端口号的操作。
c) 在ChannelPipeline中触发ChannelActive事件。ChannelActive是一个入站事件,因此它在ChannelPipeline中的传播顺序为:head ——> ...... ——> tail。
我们接着看head中ChannelActive事件的处理方法: head在处理ChannelActive事件时,除了将事件传播给ChannelPipeline中的下一个处理器外,还会执行readIfIsAutoRead()方法。 channel.config()返回的是一个NioServerSocketChannelConfig对象,而其中的autoRead属性默认就为1,因此channel.read()得以执行。
这样一来read事件就会在ChannelPipeline中传播,因为read是一个出站事件,所以它在ChannelPipeline中的传播顺序为:tail ——> ...... ——> head。
最终调用的是head的read方法:
跟进去最终会调用AbstractNioChannel的doBeginRead方法: 这里readInterestOp就是Netty 源码解析 ——— 服务端启动流程 (上)中所提到的,在构建NioServerSocketChannel对象是所传进来的SelectionKey.OP_ACCEPT。将SelectionKey.OP_ACCEPT设置为ServerSocketChannel注册到Selector中所感兴趣的事件。这样一来,NioEventLoop的事件循环中执行select操作时,就会关注该NioServerSocketChannel的SelectionKey.OP_ACCEPT事件,一旦SelectionKey.OP_ACCEPT事件发生时,NioEventLoop就会对该事件进行处理。
d) 标志ChannelPromise(即,DefaultChannelPromise对象)为成功,即,将绑定这个异步操作标志为成功完成。
至此位置,“serverBootstrap.bind(5566)”的操作流程就全部分析完了。
ChannelFuture channelFuture = serverBootstrap.bind(5566).sync(); 的sync()操作
我们继续看serverBootstrap.sync(),即,PendingRegistrationPromise的sync():await(): ① isDone():判断当前这个异步操作是否已经完成。这里就是判断bind这个异步操作是否已经完成,如已经完成则直接返回,如操作还未完成则执行下一步。完成可以是“成功的完成”、“失败的完成”、“取消的完成”。
② 判断当前线程是否被设置了“中断”标志,如果被设置了“中断”标志则抛出InterruptedException异常。否则执行下一步。
③ checkDeadLock():会对潜在的死锁可能性进行检查,如果Netty发现你准备执行Object wait()操作的线程是EventLoop线程的话,则认为该操作可能会导致死锁,就会抛出一个BlockingOperationException异常。
④ 使用同步代码块(其中this为同步监视器,即,PendingRegistrationPromise对象),再次通过isDone()异步操作是否已经完成,如果完成则返回。如果还未完成,则执行incWaiters()方法将waiters成员属性加1,waiters属性表示我们执行Java底层wait()操作被挂起的线程数,它们需要通过执行notifyAll来唤起(注意,在执行waiters加一之前会先判断waiters的值是否等于Short.MAX_VALUE,如果是则会抛出IllegalStateException异常,表示当前有太多的线程被挂起在等待notifyAll())。执行Java底层的wait()方法,线程被挂起,等待唤醒。这里当异步操作完成时,就会触发notifyAll()方法的调用,这样当前的wait()挂起的线程就会被唤醒。最后执行decWaiters()方法将waiters成员属性减1。
我们简单来看看唤醒操作:当waiters>0是会调用notifyAll(),waiters>0说明当前有线程因为wait()被挂起等待被唤醒,所以你会看到在wait()操作前一定会先调用incWaiters(),而在wait()操作后也一定会执行decWaiters()方法。
我们再来看看都有什么操作会调用checkNotifyWaiters():👆从这里就可以验证上面所说的观点,当异步操作完成时如果当前有线程因为wait()操作被挂起,则会触发notifyAll()操作。异步操作的完成包括,“成功的完成”、“失败的完成”、“取消的完成”。
至此为止,整个服务端程序已经启动完毕,并等待这客户端的请求连接以执行连接处理等等。
总结
至此为止,我们整个的启动流程就算是完整的分析完了,内容真不少,还涉及到不少NIO、JUC以及设计模式等的知识。
前面的源码分析充斥了大量的代码片段,这里,我再用全文字对整个启动的流程进行一个概述,以此来做个总结和回顾。
使用Netty实现NIO网络通信的应用的启动流程:
① 构建两个NioEventLoopGroup(非阻塞事件循环组):bossGroup和workGroup。在NIO模式中我们会在构建EventLoopGroup的时候就将EventLoop一并的给构建处理,每个NioEventLoopGroup默认会构建操作系统逻辑CPU数2倍的NioEventLoop(非阻塞事件循环),每个NioEventLoop与一个线程关联,并且NioEventLoop的整个生命周期只会和这个线程绑定。bossGroup用于接收客户端的连接请求,它不处理和客户端的通信,而是将接收到的请求转交给workerGroup,由workerGroup从它的NioEventLoop[]数组中取出(以轮询的方式)一个NioEventLoop,然后将Channel注册到这个NioEventLoop上,与客户端的通信都会在该NioEventLoop所在线程上进行。
每个NioEventLoop都有它们自己的线程、Selector以及事件循环。在构建NioEventLoop对象时就会开启Selector。
② 启动类设置。对handler、childHandler、channel类型、options、attrs、childOptions、childAttrs。其中handler、options、attrs是用于给bossGroup中ServerChannel的属性设置的;而childHandler、childOptions、childAttrs是用于给workerGroup中的child Channel的属性设置的,也就是客户端连接的Channel。
channel用于设置bossGroup中Channel类型(即,NioServerSocketChannel.class),channel(NioServerSocketChannel.class)完成了将NioServerSocketChannel的ChannelFactory赋值给AbstractBootstrap的成员变量channelFactory。该ChannelFactory是一个ReflectiveChannelFactory实例,它能够通过以反射的形式调用Channel默认的构造方法来实例化一个新的Channel。
在服务端的启动设置中,group()、channel()/channelFactory()、childHandler()这三中配置是必须的,否则调用bind操作就抛出IllegalStateException异常。
③ 完成NioServerSocketChannel的构建和初始化操作,并将该NioServerSocketChannel注册到bossGroup中的某个NioEventLoop上,然后执行NioServerSocketChannel绑定指定端口的操作。
首先NioServerSocketChannel的构建操作包含:
通过ReflectiveChannelFactory使用反射的形式构建NioServerSocketChannel对象,设置ServerSocketChannel为非阻塞模式(即,ServerSocketChannel.configureBlocking(false)),并在构建NioServerSocketChannel对象的时候会一并的将于该Channel关联的ChannelPipeline(即,DefaultChannelPipeline对象)给构建出来。
ChannelPipeline是一个ChannelHandler的集合,这些ChannelHandler会处理或拦截一个Channel的入站事件或出站操作。一个事件要么被一个ChannelInboundHandler处理要么被一个ChannelOutboundHandler处理。随后该事件通过一个ChannelHandlerContext来实现传递给ChannelPipeline中的下一个具有一样父类的处理器,即同一个方向的处理器。
在构建DefaultChannelPipeline的时候会将链表头head和链表尾tail给一并创建好,head它同时实现了ChannelOutboundHandler和ChannelInboundHandler,也就是说它即会处理入站数据也会处理出站数据、它持有NioMessageUnsafe对象,该类用于完成Channel真实的I/O操作和传输;tail它仅实现了ChannelInboundHandler,因此TailContext是入站事件的最后一个ChannelHandler,它主要完成了一些资源的释放工作。
NioServerSocketChannel的初始化操作具体指:对进构建的NioServerSocketChannel进行options、attrs属性的设置。往该NioServerSocketChannel的ChannelPipeline中添加了一个ChannelInitializer,并ChannelInitializer封装为PendingHandlerAddedTask任务,它会在channel注册到EventLoop时得以调用。
在将NioServerSocketChannel注册到NioEventLoop时:会先触发NioEventLoop的事件循环的启动,当前的注册操作是一个异步操作,它会在EventLoop线程上执行,并且注册操作一旦开始就不能被取消。注册操作会将当前NioServerSocketChannel所关联的ServerSocketChannel注册到NioEventLoop中的Selector上。接着执行PendingHandlerAddedTask任务,即调用ChannelInitializer的handlerAdded方法,将我们程序设定的handler添加到ChannelPipeline中,以及将ServerBootstrapAcceptor(ServerBootstrapAcceptor的作用是将NioServerSocketChannel所接收到的连接请求NioSocketChannel给注册到childGroup中)添加到ChannelPipeline的操作封装为一个task提交到EventLoop的taskQueue中,随后EventLoop的事件循环就会从taskQueue中取出该任务并执行,最后将ChannelInitializer自己从ChannelPipeline中移除。接着会将当前注册操作的异步结果标识为成功。然后触发ChannelRegistered事件,该事件会在ChannelPipeline中得以传播。
NioServerSocketChannel绑定指定端口:绑定操作时一个出站操作,该操作传播至链表头head时,会执行底层ServerSocket绑定指定端口的操作。然后将SelectionKey.OP_ACCEPT设置为ServerSocketChannel注册到Selector中所感兴趣的事件。这样一来,NioEventLoop的事件循环中执行select操作时,就会关注该NioServerSocketChannel的SelectionKey.OP_ACCEPT事件,一旦SelectionKey.OP_ACCEPT事件发生时,NioEventLoop的事件循环就会处理该事件。最后将当前绑定异步操作标志为成功完成。
到此为止,整个服务端就启动了,也就可以开始接受客户端的请求。
附
关于NioServerSocketChannel的构造方法:
github上valodzka提出了这么个意见:当前SelectorProvider.provider()(该方法包含了同步代码块)会在每一个channel被创建的时候调用。这个会导致不必要的阻塞当应用创建了大量的连接的时候(每秒大约有5000个连接的时候,性能会下降1%)。
可能的解决方法是:存储SelectorProvider.provider()方法的结果,然后直接调用SelectorProvider.openServerSocketChannel(),而不去调用SocketChannel.open()方法。
下面我们用NioSocketChannel的源码来说明这个问题,更有意义。而NioServerSocketChannel的构建逻辑同NioSocketChannel是一样的。
这样每次需要创建一个SocketChannel就需要调用一次SocketChannel.open()方法,也就需要去调用一次SelectorProvider.provider()。从下面SelectorProvider.provider()源码我们可以看见,SelectorProvider.provider()的实现是在一个同步代码块中去实现相应的逻辑。那么,这样一来,当应用需要创建了大量的连接的时候会导致不必要的阻塞。
因此根据valodzka的提议,就有了现在Netty源码中的这种方式来避免这个问题。SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider():将获得的系统默认的选择器提供者作为一个静态常量,然后可以直接使用DEFAULT_SELECTOR_PROVIDER的openSocketChannel()来创建一个SocketChannel实例。这样就避免了无意义的重复调用SelectorProvider.provider()而导致的不必要的锁操作。
后记
本文主要接Netty 源码解析 ——— 服务端启动流程 (上)对Netty服务端的启动流程的剩余不走进行了源码的解析。建议大家可以看看 Netty in action ——— 异步和事件驱动、Netty in action ——— Bootstrapping、Netty in action ——— 事件循环 和 线程模式、Netty in action ——— 传输协议、Netty in Action ——— ChannelHandler 和 ChannelPipeline这几篇文章,这几篇文章是在笔者写源码解析时涉及到的一些知识点的理论性文章,主要来自于《Netty in action》一书。
若文章有任何错误,望大家不吝指教:)
参考
圣思园《精通并发与Netty》
网友评论