上一节总结了channel代码的架构,了解了从鼎城channel接口的定义以及一层一层最后区分开客户端channel和服务端channel。从中也可以体会到抽象和集成的特点。
本节回顾一下在服务端启动初始化的时候ServerBootstrap#init()
,主要做了一些参数的配置。其中对于childGroup
,childOptions
,childAttrs
,childHandler
等参数被进行了单独配置。作为参数和ServerBootstrapAcceptor
一起,被当作一个特殊的handle
,封装到pipeline
中。ServerBootstrapAcceptor
中的eventLoop
为workGroup
。
@Override
void init(Channel channel) throws Exception {
//配置AbstractBootstrap.option
final Map<ChannelOption<?>, Object> options = options0();
synchronized (options) {
setChannelOptions(channel, options, logger);
}
//配置AbstractBootstrap.attr
final Map<AttributeKey<?>, Object> attrs = attrs0();
synchronized (attrs) {
for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
@SuppressWarnings("unchecked")
AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
channel.attr(key).set(e.getValue());
}
}
//配置pipeline
ChannelPipeline p = channel.pipeline();
//获取ServerBootstrapAcceptor配置参数
final EventLoopGroup currentChildGroup = childGroup;
final ChannelHandler currentChildHandler = childHandler;
final Entry<ChannelOption<?>, Object>[] currentChildOptions;
final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
synchronized (childOptions) {
currentChildOptions = childOptions.entrySet().toArray(newOptionArray(0));
}
synchronized (childAttrs) {
currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(0));
}
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) throws Exception {
final ChannelPipeline pipeline = ch.pipeline();
//配置AbstractBootstrap.handler
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
//配置ServerBootstrapAcceptor,作为Handle紧跟HeadContext
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}
可见,整个服务端pipeline
的结构如下图所示。bossGroup控制IO事件的检测与处理,整个bossGroup对应的pipeline
只包括头(HeadContext
)尾(TailContext
)以及中部的ServerBootstrap.ServerBootstrapAcceptor
。
当新连接接入的时候AbstractNioMessageChannel.NioMessageUnsafe#read()
方法被调用,最终调用fireChannelRead()
,方法来触发下一个Handler
的channelRead
方法。而这个Handler
正是ServerBootstrapAcceptor
//传递到下一个Handle: ServerBootstrapAcceptor#channelRead()方法
//readBuf.get(i) 获取临时存储的一个channel
pipeline.fireChannelRead(readBuf.get(i));
接着查看ServerBootstrapAcceptor
,它是ServerBootstrap
的内部类,同时继承自ChannelInboundHandlerAdapter
。也是一个ChannelInboundHandler
。其channelRead
主要做了以下几件事。
- 为客户端
channel
的pipeline
添加childHandler
- 设置客户端TCP相关属性
childOptions
和自定义属性childAttrs
- workGroup选择NioEventLoop并注册Selector
private static class ServerBootstrapAcceptor extends ChannelInboundHandlerAdapter {
private final EventLoopGroup childGroup;
private final ChannelHandler childHandler;
private final Entry<ChannelOption<?>, Object>[] childOptions;
private final Entry<AttributeKey<?>, Object>[] childAttrs;
private final Runnable enableAutoReadTask;
ServerBootstrapAcceptor(
final Channel channel, EventLoopGroup childGroup, ChannelHandler childHandler,
Entry<ChannelOption<?>, Object>[] childOptions, Entry<AttributeKey<?>, Object>[] childAttrs) {
this.childGroup = childGroup;
this.childHandler = childHandler;
this.childOptions = childOptions;
this.childAttrs = childAttrs;
// Task which is scheduled to re-enable auto-read.
// It's important to create this Runnable before we try to submit it as otherwise the URLClassLoader may
// not be able to load the class because of the file limit it already reached.
//
// See https://github.com/netty/netty/issues/1328
enableAutoReadTask = new Runnable() {
@Override
public void run() {
channel.config().setAutoRead(true);
}
};
}
@Override
@SuppressWarnings("unchecked")
public void channelRead(ChannelHandlerContext ctx, Object msg) {
//该channel为客户端接入时创建的channel
final Channel child = (Channel) msg;
//添加childHandler
child.pipeline().addLast(childHandler);
//设置TCP相关属性:childOptions
setChannelOptions(child, childOptions, logger);
//设置自定义属性:childAttrs
for (Entry<AttributeKey<?>, Object> e: childAttrs) {
child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
}
try {
//选择NioEventLoop并注册Selector
childGroup.register(child)
.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
forceClose(child, future.cause());
}
}
});
} catch (Throwable t) {
forceClose(child, t);
}
}
private static void forceClose(Channel child, Throwable t) {
child.unsafe().closeForcibly();
logger.warn("Failed to register an accepted channel: {}", child, t);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
final ChannelConfig config = ctx.channel().config();
if (config.isAutoRead()) {
// stop accept new connections for 1 second to allow the channel to recover
// See https://github.com/netty/netty/issues/1328
config.setAutoRead(false);
ctx.channel().eventLoop().schedule(enableAutoReadTask, 1, TimeUnit.SECONDS);
}
// still let the exceptionCaught event flow through the pipeline to give the user
// a chance to do something with it
ctx.fireExceptionCaught(cause);
}
}
- 为客户端
channel
的pipeline
添加childHandler
回顾引导的代码,在配置childHandler的时候,使用了ChannelInitializer
的一个自定义实例。并且覆盖了其initChannel
方法,改方法获取到pipeline
并添加具体的Handler
。
public class Server {
private static final int PORT = 1111;
public static void main(String[] args) throws Exception {
NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
bossGroup.setIoRatio(100);
NioEventLoopGroup workerGroup = new NioEventLoopGroup();
ServerBootstrap serverBootstrap = new ServerBootstrap();
try {
serverBootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_REUSEADDR, true)
.childOption(ChannelOption.TCP_NODELAY, true)
.attr(AttributeKey.newInstance("attr"), "attr")
.childAttr(AttributeKey.newInstance("childAttr"), "childAttr")
.handler(new DataServerInitializer())
.childHandler(new DataServerInitializer());
ChannelFuture future = serverBootstrap.bind(PORT).sync();
future.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
}
class DataServerInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline()
.addLast( new DataServerHandler());
}
}
查看ChannelInitializer
具体的添加逻辑,handlerAdded
方法。其实在initChannel
逻辑中,首先是回调到用户代码执行initChannel
,用户代码执行添加Handler
的添加操作,之后将ChannelInitializer
自己从pipeline
中删除。
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
if (ctx.channel().isRegistered()) {
// This should always be true with our current DefaultChannelPipeline implementation.
// The good thing about calling initChannel(...) in handlerAdded(...) is that there will be no ordering
// surprises if a ChannelInitializer will add another ChannelInitializer. This is as all handlers
// will be added in the expected order.
//初始化Channel
if (initChannel(ctx)) {
// We are done with init the Channel, removing the initializer now.
removeState(ctx);
}
}
}
private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
if (initMap.add(ctx)) { // Guard against re-entrance.
try {
//回调到用户代码
initChannel((C) ctx.channel());
} catch (Throwable cause) {
// Explicitly call exceptionCaught(...) as we removed the handler before calling initChannel(...).
// We do so to prevent multiple calls to initChannel(...).
exceptionCaught(ctx, cause);
} finally {
ChannelPipeline pipeline = ctx.pipeline();
if (pipeline.context(this) != null) {
//删除本身
pipeline.remove(this);
}
}
return true;
}
return false;
}
- workGroup选择NioEventLoop并注册Selector
这个过程在之前学习的《注册Selector》过程一样,不同的地方是unsafe实例不一样。
@Override
public ChannelFuture register(final ChannelPromise promise) {
ObjectUtil.checkNotNull(promise, "promise");
//服务端unsafe是NioMessageUnsafe,客户端的是NioByteUnsafe
promise.channel().unsafe().register(this, promise);
return promise;
}
网友评论