前言
本文将围绕 Netty 服务端相关TCP的属性配置以及 NioServerSocketChannel 如何被封装到Netty 的事件循环处理器上进行工作等方面进行讲解。
正文
ServerBootstrap
![](https://img.haomeiwen.com/i10132821/75d8352acaef7fe7.png)
ServerBootstrap 是服务端启动的核心类,提供了一系列初始化Channel和属性配置的相关方法。具体来看其相关方法
服务端启动的示例代码
NioEventLoopGroup mainGroup = new NioEventLoopGroup(1);
NioEventLoopGroup subGroup = new NioEventLoopGroup(Runtime.getRuntime().availableProcessors());
ServerBootstrap b = new ServerBootstrap();
b.group(mainGroup, subGroup)
.option(ChannelOption.TCP_NODELAY, true)
.attr(AttributeKey.valueOf("name"), "zhangsan")
.channel(NioServerSocketChannel.class)
.handler(new LoggingHandler(LogLevel.DEBUG))
.childHandler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) throws Exception {
// do
ch.pipeline().addLast(new StringDecoder())
.addLast(new EchoServerHandler())
.addLast(new StringEncoder())
;
}
});
ChannelFuture future = null;
try {
future = b.bind(8086).sync();
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
mainGroup.shutdownGracefully().sync();
subGroup.shutdownGracefully().sync();
}
- 创建服务端与客户端使用的线程池
- 创建服务端启动类 ServerBootstrap
- 定义相关配置
option、attr 方法将服务端相关的分别配置保存到AbstractBoostrap 的 options、attrs 中 , .channel(Class class) 方法用于关联相关的Channel并将分配一个默认工厂类来创建关联的Channel
public B channel(Class<? extends C> channelClass) {
return channelFactory(new ReflectiveChannelFactory<C>(
ObjectUtil.checkNotNull(channelClass, "channelClass")
));
}
public B channelFactory(ChannelFactory<? extends C> channelFactory) {
//...
this.channelFactory = channelFactory;
return self();
}
而 ReflectiveChannelFactory 则提供一个newChannel() 方法来实例传入的 Class
@Override
public T newChannel() {
try {
return clazz.newInstance();
} catch (Throwable t) {
throw new ChannelException("Unable to create Channel from class " + clazz, t);
}
}
紧接着调用 channelHandler 为连入的客户端绑定 ChannelHandler
- 将所有属性配置好之后,使用 bind 方法绑定服务端的网络地址,进而开始真正的启动
AbstractBootstrap.java
public ChannelFuture bind(int inetPort) {
return bind(new InetSocketAddress(inetPort));
}
private ChannelFuture doBind(final SocketAddress localAddress) {
// 初始化 Channel 并注册到 Selector 多路复用器上
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
if (regFuture.cause() != null) {
return regFuture;
}
if (regFuture.isDone()) {
// At this point we know that the registration was complete and successful.
ChannelPromise promise = channel.newPromise();
doBind0(regFuture, channel, localAddress, promise);
return promise;
} else {
// Registration future is almost always fulfilled already, but just in case it's not.
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Throwable cause = future.cause();
if (cause != null) {
// Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
// IllegalStateException once we try to access the EventLoop of the Channel.
promise.setFailure(cause);
} else {
// Registration was successful, so set the correct executor to use.
// See https://github.com/netty/netty/issues/2586
promise.registered();
doBind0(regFuture, channel, localAddress, promise);
}
}
});
return promise;
}
}
1.首先我们可以看到 bind 是一个重载的方法,最终会封装成SocketAddress
2.dobBind 中 initAndRegister方法用来初始化 ServerSocketChannel 并将其注册到 Selector 多路复用器上
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
channel = channelFactory.newChannel();
init(channel);
} catch (Throwable t) {
if (channel != null) {
channel.unsafe().closeForcibly();
return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
}
return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
}
ChannelFuture regFuture = config().group().register(channel);
if (regFuture.cause() != null) {
if (channel.isRegistered()) {
channel.close();
} else {
channel.unsafe().closeForcibly();
}
}
return regFuture;
}
3.channelFactory.newChannel() 为通过反射得到的 Channel 对象,在启动示例代码中 .channel(NioServerSocketChannel.class)
处设置的channelFactory
4.将得到的 Channel 进行初始化,即调用 init 方法。而 init 是一个抽象方法,委托给子类 ServerBootstrap 来实现
@Override
void init(Channel channel) throws Exception {
final Map<ChannelOption<?>, Object> options = options0();
synchronized (options) {
setChannelOptions(channel, options, logger);
}
final Map<AttributeKey<?>, Object> attrs = attrs0();
synchronized (attrs) {
for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
@SuppressWarnings("unchecked")
AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
channel.attr(key).set(e.getValue());
}
}
ChannelPipeline p = channel.pipeline();
final EventLoopGroup currentChildGroup = childGroup;
final ChannelHandler currentChildHandler = childHandler;
final Entry<ChannelOption<?>, Object>[] currentChildOptions;
final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
synchronized (childOptions) {
currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
}
synchronized (childAttrs) {
currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
}
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(Channel ch) throws Exception {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}
将启动之前设置的 options 与 attrs 属性设置到 Channel 中,然后获取 Channel 的 ChannelPipeline ,用来设置服务端的 ChannelHandler ,这里可以看到 ServerBootstrapAcceptor 将所有客户端需要用到的配置封装该类,目的就是在服务端与客户端建立好连接之后,能够为新建立的连接分配 ChannelHandler 和 TCP相关的属性
接着回到 initAndRegister() 方法中,当channel 初始化之后,便开始注册了。
( config().group().register(channel);
)
config() 方法返回的是在ServerBootstrap 中默认的一个配置类 ServerBootstrapConfig ,其中持有ServerBoostrap的引用
private final ServerBootstrapConfig config = new ServerBootstrapConfig(this);
group()方法则实际上是在调用ServerBoostrap 的group() 方法,用来返回启动之前配置的线程池
register 方法为线程池 EventLoopGroup 接口的方法,该方法将会默认采用轮询的策略选择一个可用的线程来调用 register 方法
最终来到 SingleThreadEventLoop 的 register(Channel channel) 方法
@Override
public ChannelFuture register(Channel channel) {
return register(new DefaultChannelPromise(channel, this));
}
@Override
public ChannelFuture register(final ChannelPromise promise) {
ObjectUtil.checkNotNull(promise, "promise");
promise.channel().unsafe().register(this, promise);
return promise;
}
这里将channel 封装到 ChannelPromise(用于事件的回调通知) 中,再将其作为参数传入 promise.channel().unsafe().register(this, promise);
channel 的内部类 unsafe 的 register 方法中(channel 实例化时将会同时初始化unsafe 与 pipeline)
参考 AbstractChannel.java
protected AbstractChannel(Channel parent) {
this.parent = parent;
id = newId();
unsafe = newUnsafe();
pipeline = newChannelPipeline();
}
// NioServerSocketChannel 的构造函数传入 感兴趣的事件 SelectionKey.OP_ACCEPT
public NioServerSocketChannel(ServerSocketChannel channel) {
super(null, channel, SelectionKey.OP_ACCEPT);
config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}
进入 AbstractUnsafe 的 register方法,这里是服务端真正注册的地方
@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
//......
// 设置负责 NioServerSocketChannel 的线程
AbstractChannel.this.eventLoop = eventLoop;
//如果与工作线程在同一线程,则直接注册
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
try {
//否则提交任务到该工作线程中执行
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
logger.warn(
"Force-closing a channel whose registration task was not accepted by an event loop: {}",
AbstractChannel.this, t);
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
}
private void register0(ChannelPromise promise) {
try {
// 检查是否 promise 已经设置取消注册了
// call was outside of the eventLoop
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
boolean firstRegistration = neverRegistered;
//真正注册方法
doRegister();
neverRegistered = false;
registered = true;
// 设置回调事件的注册状态为成功
safeSetSuccess(promise);
//唤起 registered 事件在pipeline中传递
pipeline.fireChannelRegistered();
if (isActive()) {
if (firstRegistration) {
//唤起 active 事件在pipeline中传递
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
//唤起 read 事件在pipeline中传递
beginRead();
}
}
} catch (Throwable t) {
// Close the channel directly to avoid FD leak.
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
进行一些状态及参数的检查,在 register0 方法中 调用 doRegister 方法,该方法同样委托给子类实现
AbstractChannel.java
@Override
protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
try {
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
return;
} catch (CancelledKeyException e) {
if (!selected) {
// Force the Selector to select now as the "canceled" SelectionKey may still be
// cached and not removed because no Select.select(..) operation was called yet.
eventLoop().selectNow();
selected = true;
} else {
// We forced a select operation on the selector before but the SelectionKey is still cached
// for whatever reason. JDK bug ?
throw e;
}
}
}
}
可以看到在该方法里使用 javaChannel() 来获取原生的ServerSocketChannel 来将感兴趣的事件注册到 Selector 上,这里是注册的 0 ,表示暂时不能接收连接,只是一个保留的状态,那么真正注册的地方在哪里?
答案是 AbstractNioChannel 的 doBeginRead() 中,这里的 readInterestOp 是在实例 Channel 时传入的
@Override
protected void doBeginRead() throws Exception {
// Channel.read() or ChannelHandlerContext.read() was called
final SelectionKey selectionKey = this.selectionKey;
if (!selectionKey.isValid()) {
return;
}
readPending = true;
final int interestOps = selectionKey.interestOps();
if ((interestOps & readInterestOp) == 0) {
selectionKey.interestOps(interestOps | readInterestOp);
}
}
背景
DefaultChannelPipeline 中保存着一个双向链表,默认 HeadContext 作为该链表的头部引用 ,用变量 head表示 。AbstractChannel的register0 中 pipeline.fireChannelActive(); 执行时, 该head 将会被触发 channelActive 方法
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelActive();
readIfIsAutoRead();
}
private void readIfIsAutoRead() {
if (channel.config().isAutoRead()) {
channel.read();
}
}
在 channelActive 方法中会调用readIfIsAutoRead方法,
在 readIfIsAutoRead 根据 ChannelConfig 配置判断是否开启自动执行所有 Channelhandler 的read方法(默认开启)
@Override
public void read(ChannelHandlerContext ctx) {
unsafe.beginRead();
}
@Override
public final void beginRead() {
assertEventLoop();
if (!isActive()) {
return;
}
try {
doBeginRead();
} catch (final Exception e) {
invokeLater(new Runnable() {
@Override
public void run() {
pipeline.fireExceptionCaught(e);
}
});
close(voidPromise());
}
}
其中就包括了 HeadContext, HeadContexrt 中的read方法将去调用 unsafe的beginRead(); 方法,最后进入到 doBeginRead,便将感兴趣的事件注册为SelectionKey.OP_ACCEPT
写到最后:
如有差错,请指出,望谅解;如有问题,请评论;望支持
网友评论