0、前言
netty作为现在常用的NIO框架,以其强大的健壮性、性能、可定制性、可扩展性在同类框架中首屈一指,大部分常见中间件如果用到了远程通信,大多选择于此。
个人也对其非常感兴趣,遂打算写一个系列,来从源码上聊聊netty。本文是第一篇netty的源码文章,先从netty服务端的启动来聊吧,其中也会对照下jdk nio的知识。
1、相关案例
在写之前,先写明下netty的server启动demo样例、jdk nio的server启动demo样例,帮助大家进行下回顾,也方便大家理解netty如何做的封装。
1.1、jdk nio 样例
这是比较基础的jdk nio server端启动的相关配置
ServerSocketChannel serverChannel = ServerSocketChannel.open();
serverChannel.configureBlocking(false);
int port = 5566;
serverChannel.socket().setSoTimeout(3000);
serverChannel.socket().bind(new InetSocketAddress(port),128);
Selector selector = Selector.open();
serverChannel.register(selector, SelectionKey.OP_ACCEPT);
while(true){
int n = selector.select();
if(n > 0) {
Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
while (iter.hasNext()) {
SelectionKey selectionKey = iter.next();
......
iter.remove();
}
}
}
1.2、netty样例
// 配置服务端的NIO线程组
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) {
ch.pipeline().addLast(
MarshallingCodeCFactory
.buildMarshallingDecoder());
ch.pipeline().addLast(
MarshallingCodeCFactory
.buildMarshallingEncoder());
ch.pipeline().addLast(new SubReqServerHandler());
}
});
// 绑定端口,同步等待成功
ChannelFuture f = b.bind(port).sync();
// 等待服务端监听端口关闭
f.channel().closeFuture().sync();
} finally {
// 优雅退出,释放线程池资源
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
2、netty启动流程
关于NioEventLoopGroup的内容放到下个章节来讲解,本文先讲解下server端启动,可以暂时将NioEventLoopGroup理解为一组线程,管理eventLoop的生命周期,在执行的时候相关操作时,从其中选取一个eventLoop来进行执行。
AbstractBootstrap#bind()[用户代码入口]
AbstractBootstrap#doBind
AbstractBootstrap#initAndRegister()[初始化并注册]
1. 创建服务端Channel
ChannelFactory#newChannel()[创建服务端channel][通过反射创建服务端channel]
NioServerSocketChannel#DEFAULT_SELECTOR_PROVIDER[通过SelectorProvider#provider()生成]
NioServerSocketChannel()[NioServerSocketChannel构造方法]
NioServerSocketChannel#newSocket()[通过jdk来创建底层jdk channel,此处等同于1.1中的ServerSocketChannel.open()]
AbstractChannel(Channel)[AbstractChannel的构造方法,创建id(DefaultChannelId),unsafe(不同Channel不同),pipeline(DefaultChannelPipeline)]
AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp)[AbstractNioChannel的构造方法,设置ch、readInterestOp属性,并设置为非阻塞模式]
ch.configureBlocking(false);[设置channel为非阻塞模式]
AbstractNioMessageChannel(Channel parent, SelectableChannel ch, int readInterestOp)[AbstractNioMessageChannel构造方法,仅做透传]
super(null, channel, Selectioney.OP_ACCEPT)[此处可以看到传入的channel为新创建的serverSocketChannel,readInterestOp为OP_ACCEPT状态]
config = new NioServerSocketChannelConfig(this, javaChannel().socket());[初始化config,tcp参数配置类]
javaChannel().socket()[javaChannel()返回的是jdk原生的channel对象,即上面创建的NioServerSocketChannel对象,等同于1.1中的serverChannel.socket()]
DefaultChannelConfig(Channel)[DefaultChannelConfig的构造方法]
this(channel, new AdaptiveRecvByteBufAllocator());[AdaptiveRecvByteBufAllocator用于构建一个最优大小的缓冲区来接收数据。该缓存的容量会尽可能的足够大以读入所有的入站数据,并且该缓存的容量也尽可能的小以不会浪费它的空间。]
setRecvByteBufAllocator(allocator, channel.metadata());[对allocator的maxMessagesPerRead字段赋值,默认16(设置一个读循环可读取的最大消息个数);校验allocator不同为null]
DefaultServerSocketChannelConfig(ServerSocketChannel channel, ServerSocket javaSocket)[DefaultServerSocketChannelConfig构造方法,设置javaSocket]
2. 初始化服务端Channel
AbstractBootstrap#init(channel);[初始化channel]
AbstractBootstrap#options0();[取到用户自定义的options,此处的option就是1.2的option方法填入]
AbstractBootstrap#setChannelOptions(channel, options, logger);[设置socket、channelConfig的值]
AbstractBootstrap#attrs0();[取到用户自定义的attrs]
channel.attr(key).set(e.getValue());[将自定的value填入channel的AttributeMap中]
channel.pipeline();[获取channelPipeline,默认为DefaultChannelPipeline]
childGroup、childHandler、childOptions、childAttrs进行取值操作
pipeline.addLast(new ChannelInitializer<Channel>() {...}[往pipeline中添加处理器]
handler=config.handler();[获取handler,此处的handler为主handler]
pipeline.addLast(handler); [将handler添加到pipeline末尾]
ch.eventLoop().execute(new Runnable(){...})[通过eventLoop来执行runnable,这里的eventLoop其实是在下面的register才注册上的]
pipeline.addLast(new ServerBootstrapAcceptor(ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));[runnable里的内容,生成一个ServerBootstrapAcceptor,并传入child的信息。最后放到pipeline的尾部,关于ServerBootstrapAcceptor的详细描述看下面]
DefaultChannelPipeline#addLast(executor, null, h); // 将handler添加到pipeline的尾部
newCtx = newContext(group, filterName(name, handler), handler); // 创建DefaultChannelHandlerContext实例
addLast0(newCtx); // 添加操作,将newCtx添加到pipeline的尾部
newCtx.setAddPending(); // 设置handlerState为ADD_PENDING
callHandlerCallbackLater(newCtx, true); // 由于此时还未注册,所以先设置pendingHandlerCallbackHead,后续调用
PendingHandlerCallback task = added ? new PendingHandlerAddedTask(ctx) : new PendingHandlerRemovedTask(ctx); // 生成一个PendingHandlerAddedTask任务
pendingHandlerCallbackHead = task; // 赋值给pendingHandlerCallbackHead
3. 注册selector
config().group().register(channel)[将Channel与eventLoopGroup进行绑定]
config().group()[获取group属性,即eventLoopGroup]
EventLoopGroup#register(channel)[将channel与一个EventLoop绑定]
MultithreadEventLoopGroup#register
chooser.next().register(channel);[通过chooser选出一个EventLoop]
SingleThreadEventLoop#register
register(new DefaultChannelPromise(channel, this));[DefaultChannelPromise是一个可写的future]
promise.channel().unsafe().register(this, promise);
AbstractChannel.AbstractUnsafe#register(EventLoop,ChannelPromise)
AbstractChannel.this.eventLoop = eventLoop;[将eventLoop赋值]
AbstractUnsafe#register0(promise);[此处是在eventLoop中执行,相当于异步执行]
AbstractUnsafe#doRegister();[进行实际注册]
AbstractNioChannel#doRegister[进行实际的注册操作]
javaChannel().register(eventLoop().unwrappedSelector(), 0, this);[将channel注册到selector上,返回SelectionKey,类似于1.1中的serverChannel.register(selector, SelectionKey.OP_ACCEPT);只不过此处的ops为0(即什么事件都不感兴趣),那么此处主要是将netty中的channel绑定到SelectionKey上]
DefaultPipeline#invokeHandlerAddedIfNeeded();[在设置promise成功之前,调用了handlerAdded(...)方法]
DefaultChannelPipeline#callHandlerAddedForAllHandlers[会执行PendingHandlerAddedTask(ChannelInitializer为PendingHandlerAddedTask的子类)的handlerAdded(..)、PendingHandlerRemovedTask的handlerRemoved(..)方法]
AbstractChannel#safeSetSuccess(promise);[将promise设置为完成状态]
DefaultPipeline#fireChannelRegistered();[执行channelRegistered事件]
if(isActive()){[判断jdk中的ServerSocket是否已经绑定;ServerSocket#isBound]
if (firstRegistration) { [第一次注册]
DefaultPipeline#fireChannelActive();[active状态下,只有第一次注册时,才会执行;避免channel多次注销与注册导致触发多次]
} else if (config().isAutoRead()) { [是否自动读,默认值为true]
beginRead();[非第一次注册,且设置了自动读,则我们应该去读,以便处理读入的数据]
}
}
4. 端口绑定
doBind0(regFuture, channel, localAddress, promise);[进行绑定处理,这里可能有些同学有疑问:注册有可能是异步的,那么这里进行端口绑定是否注册完成了呢?请看下面的解释]
channel.eventLoop().execute(new Runnable() {...bind操作...})[在eventLoop中执行bind操作]
channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);[runnable内的内容,进行bind操作;并添加了失败的监听处理器,在失败时关闭channel]
AbstractChannel#pipeline.bind(localAddress);[调用DefaultChannelPipeline的bind操作,关于pipeline的相关内容,另开一篇说明]
tail.bind(localAddress);[从tail进行bind操作]
AbstractChannelHandlerContext#bind(localAddress, newPromise());[进行绑定操作,生成一个DefaultChannelPromise进行传递]
next = findContextOutbound();[找下一个outbound]
next.invokeBind(localAddress, promise);[下一个执行invokeBind()方法]
HeadContext#bind()[最终从tail找到了head]
unsafe.bind(localAddress, promise);[通过AbstractChannel.AbstractUnsafe的bind方法进行绑定]
NioServerSocketChannel#doBind(localAddress);[进行真正的绑定处理了]
javaChannel().socket().bind(localAddress, config.getBacklog());[调用了jdk的绑定操作,等同于1.1中的jdk绑定操作serverChannel.socket().bind(new InetSocketAddress(port),128);]
pipeline.fireChannelActive();[当doBind()之前为非active,bind()之后为active,则调用]
AbstractChannelHandlerContext.invokeChannelActive(head);[触发channelActive事件调用]
next.invokeChannelActive();
DefaultChannelPipeline.HeadContext#channelActive[会执行到该方法]
DefaultChannelPipeline.HeadContext#readIfIsAutoRead[如果是自动读,则进行读取操作,默认是]
AbstractChannel#read();[进行读取操作]
DefaultChannelPipeline#read
AbstractChannelHandlerContext#read
next = findContextOutbound();
next.invokeRead();
DefaultChannelPipeline.HeadContext#read[最终调用到HeadContext的read]
AbstractChannel.AbstractUnsafe#beginRead[开始进行读取操作]
AbstractNioChannel#doBeginRead[真正的进行读取操作]
selectionKey.interestOps(interestOps | readInterestOp);[如果对readInterestOp操作不感兴趣,则在这里设置上,此处的readInterestOp是上面 1.创建服务端 Channel中设置的OP_ACCEPT,所以此处才设置上;如同1.1中的serverChannel.register(selector, SelectionKey.OP_ACCEPT);可以读一个新的连接了]
safeSetSuccess(promise);[将promise置为成功状态]
2.1、ChannelFactory是何时设置的?
在ServerBootstrap#channel(Class<? extends C> channelClass)方法执行时,会将传入的channelClass进行包装,生成new ReflectiveChannelFactory(channelClass),并赋值给channelFactory变量。
ReflectiveChannelFactory只是一个简单的封装类,newChannel()是直接通过反射调用传入的channelClass的无参构造方法初始化。
2.2、SelectorProvider.provider()有什么用?
JDK的NIO中的SocketChannel、ServerSocketChannel和Selector的实例初始化都通过SelectorProvider类实现。
ServerSocketChannel.open();
public static ServerSocketChannel open() throws IOException {
return SelectorProvider.provider().openServerSocketChannel();
}
SocketChannel.open();
public static SocketChannel open() throws IOException {
return SelectorProvider.provider().openSocketChannel();
}
Selector.open();
public static Selector open() throws IOException {
return SelectorProvider.provider().openSelector();
}
2.3、 SelectorProvider.provider()具体做了什么?
public static SelectorProvider provider() {
synchronized (lock) {
if (provider != null)
return provider;
return AccessController.doPrivileged(
new PrivilegedAction<>() {
public SelectorProvider run() {
if (loadProviderFromProperty())
return provider;
if (loadProviderAsService())
return provider;
provider = sun.nio.ch.DefaultSelectorProvider.create();
return provider;
}
});
}
}
1. 如果配置了“java.nio.channels.spi.SelectorProvider”属性,则通过该属性值load对应的SelectorProvider对象,如果构建失败则抛异常。
2. 如果provider类已经安装在了对系统类加载程序可见的jar包中,并且该jar包的源码目录META-INF/services包含有一个java.nio.channels.spi.SelectorProvider提供类配置文件,则取文件中第一个类名进行load以构建对应的SelectorProvider对象,如果构建失败则抛异常。
3. 如果上面两种情况都不存在,则返回系统默认的SelectorProvider,即,sun.nio.ch.DefaultSelectorProvider.create();
4. 随后在调用该方法,即SelectorProvider.provider()。则返回第一次调用的结果。
5. 不同系统对应着不同的sun.nio.ch.DefaultSelectorProvider,主要是其create()方法返回的SelectorProvider不同。
- linux
sun.nio.ch.EPollSelectorProvider
- mac
sun.nio.ch.KQueueSelectorProvider
- windows
sun.nio.ch.WindowsSelectorProvider
2.4、ServerBootstrapAcceptor作用
此图摘自Doug Lee大神的主从Reactor多线程模型:
image.png
ServerBootstrapAcceptor如图中的acceptor,是一个入站处理器,其主要是完成mainReactor和subReactor的转交操作,将mainReactor accept的客户端channel,注册到subReactor上进行处理,从而实现主从Reactor模型的处理。
其内部的具体执行逻辑在后续accept处理的逻辑中会详细讲到。
2.5、注册有可能是异步的,那么这里进行端口绑定是否注册完成了呢?
在AbstractBootstrap#initAndRegister()的方法中返回ChannelFuture,此时并不等同于已经完成注册。
但是,如果此处成功的返回ChannelFuture,且cause为null。
则有如下两种可能:
- 我们是在eventLoop中调用注册的,那么上面的逻辑就相当于串行的,则这个注册已经完成。
所以,此时进行bind()或connect()都是安全的。 - 我们是从其他线程中来调用注册的,那么注册行为会被作为一个任务添加到eventLoop的task队列中进行排队执行。
这个时候执行bind()或connect()也是安全的,因为bind()或connect()也会被作为任务放入到eventLoop的task队列中,队列顺序执行。
3、结语
本章节主要讲解netty server端启动的相关处理逻辑,其中如有错误,欢迎大家留言沟通。
本文中提到了很多eventLoop相关的内容,下章节来看下NioEventLoop的启动,及其所完成的使命。
网友评论