netty作为服务端从ServerBootstrap启动, 本文默认传输层协议为TCP协议。
UML图
bootstrap.png如上图所示,Bootstrap和ServerBoostrap都继承自AbstractBootstrap.
ServerBootstrap
使用ServerBootstrap启动程序如下:
// accept线程池,负责接受新进来的连接,然后将连接注册到工作线程池中去
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
// 工作线程池,负责channel业务数据的读、写
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
// serverSocketChannel上的handler
.handler(new LoggingHandler(LogLevel.INFO))
// socketChannel上的handler
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) {
ChannelPipeline p = ch.pipeline();
if (sslCtx != null) {
p.addLast(sslCtx.newHandler(ch.alloc()));
}
p.addLast(new DiscardServerHandler());
}
});
// 绑定监听端口
// Bind and start to accept incoming connections.
ChannelFuture f = b.bind(PORT).sync();
// Wait until the server socket is closed.
// In this example, this does not happen, but you can do that to gracefully
// shut down your server.
f.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
从上面可以看出,ServerBootstrap的核心操作是bind()方法,该方法新建一个serverSocketChannel,开始监听本地端口。ServerBootstrap接口与Bootstrap属性配置的最大区别是ServerBootstrap还需要设置socketChannel的线程池和handlers。
bind()方法代码如下:
@Override
public ChannelFuture bind(SocketAddress localAddress) {
return super.bind(localAddress);
}
AbstractBootstrap bind()方法如下:
public ChannelFuture bind(SocketAddress localAddress) {
// 监测该有的属性是不是都具备并且合法
validate();
if (localAddress == null) {
throw new NullPointerException("localAddress");
}
return doBind(localAddress);
}
doBind()方法如下:
private ChannelFuture doBind(final SocketAddress localAddress) {
// 创建、初始化、注册channel
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
// 注册失败
if (regFuture.cause() != null) {
return regFuture;
}
// 如果注册完成
if (regFuture.isDone()) {
// At this point we know that the registration was complete and successful.
ChannelPromise promise = channel.newPromise();
doBind0(regFuture, channel, localAddress, promise);
return promise;
} else {
// 如果注册没有完成
// Registration future is almost always fulfilled already, but just in case it's not.
// 由于没有注册完成,所以channel未必有excutor,不能利用
// channel产生promise
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Throwable cause = future.cause();
if (cause != null) {
// Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
// IllegalStateException once we try to access the EventLoop of the Channel.
promise.setFailure(cause);
} else {
// Registration was successful, so set the correct executor to use.
// See https://github.com/netty/netty/issues/2586
promise.registered();
// 绑定
doBind0(regFuture, channel, localAddress, promise);
}
}
});
return promise;
}
}
initAndRegister()方法如下:
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
// 创建channel
channel = channelFactory.newChannel();
// 初始化,为serverSocketChannel增加handler,可选项,参数
init(channel);
} catch (Throwable t) {
if (channel != null) {
// channel can be null if newChannel crashed (eg SocketException("too many open files"))
channel.unsafe().closeForcibly();
}
// as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
}
// 注册到线程中
ChannelFuture regFuture = config().group().register(channel);
if (regFuture.cause() != null) {
if (channel.isRegistered()) {
channel.close();
} else {
channel.unsafe().closeForcibly();
}
}
// If we are here and the promise is not failed, it's one of the following cases:
// 1) If we attempted registration from the event loop, the registration has been completed at this point.
// i.e. It's safe to attempt bind() or connect() now because the channel has been registered.
// 2) If we attempted registration from the other thread, the registration request has been successfully
// added to the event loop's task queue for later execution.
// i.e. It's safe to attempt bind() or connect() now:
// because bind() or connect() will be executed *after* the scheduled registration task is executed
// because register(), bind(), and connect() are all bound to the same thread.
return regFuture;
}
init()方法与Bootstrap中的方法略有不同,代码如下:
@Override
void init(Channel channel) throws Exception {
// 设置选项
final Map<ChannelOption<?>, Object> options = options0();
synchronized (options) {
channel.config().setOptions(options);
}
// 设置属性
final Map<AttributeKey<?>, Object> attrs = attrs0();
synchronized (attrs) {
for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
@SuppressWarnings("unchecked")
AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
channel.attr(key).set(e.getValue());
}
}
ChannelPipeline p = channel.pipeline();
final EventLoopGroup currentChildGroup = childGroup;
final ChannelHandler currentChildHandler = childHandler;
final Entry<ChannelOption<?>, Object>[] currentChildOptions;
final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
synchronized (childOptions) {
currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
}
synchronized (childAttrs) {
currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
}
// 增加初始化handler
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(Channel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
// 增加serverSocketChannel的handler
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}
// 增加最后一个handler,负责将channel注册到work线程
// 这是与Bootstrap init()方法的重要区别
pipeline.addLast(new ServerBootstrapAcceptor(
currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
doBind0()方法如下:
private static void doBind0(
final ChannelFuture regFuture, final Channel channel,
final SocketAddress localAddress, final ChannelPromise promise) {
// This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up
// the pipeline in its channelRegistered() implementation.
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
if (regFuture.isSuccess()) {
// 调用channel的操作进行绑定
channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} else {
promise.setFailure(regFuture.cause());
}
}
});
}
总结: ServerBootstrap绑定一个serverSocketChannel的流程如下:
- 创建一个ServerSocketChannel
- 初始化一个ServerSocketChannel
- 注册ServerSocketChannel
- 调用channel的绑定操作,实现监听一个端口
网友评论