下面我们来分析Netty里的又一个重要的组件ChannelPipeline。
每个Channel有一个pipeline,它是一个双向链表,有一个head和tail,netty的事件就是通过ChannelPipeline进行传递的。每个channel内部都会持有一个ChannelPipeline对象pipeline. pipeline默认实现DefaultChannelPipeline内部维护了一个DefaultChannelHandlerContext链表。
ChannelPipeline
当channel完成register、active、read等操作时,会触发pipeline的相应方法。 1、当channel注册到selector时,触发pipeline的fireChannelRegistered方法。 2、当channel的socket绑定完成时,触发pipeline的fireChannelActive方法。 3、当有客户端请求时,触发pipeline的fireChannelRead方法。 4、当本次客户端请求,pipeline执行完fireChannelRead,触发pipeline的fireChannelReadComplete方法。
下面来看看ChannelPipeline的结构
DefaultChannelPipeline
ChannelPipeline是一个接口,默认的实现是DefaultChannelPipeline,每个ChannelPipeline上有ChannelHandlerContext,而ChannelHandlerContext包含了ChannelHandler,每个ChannelHandler负责处理具体的业务逻辑。下面来看看DefaultChannelPipeline有哪些属性
/**
* Head 节点
*/
final AbstractChannelHandlerContext head;
/**
* Tail 节点
*/
final AbstractChannelHandlerContext tail;
/**
* 所属 Channel 对象
*/
private final Channel channel;
/**
* 成功的 Promise 对象
*/
private final ChannelFuture succeededFuture;
1、TailContext实现了ChannelOutboundHandler接口。 2、HeadContext实现了ChannelInboundHandler接口。 3、head和tail形成了一个链表。
对于Inbound的操作,当channel注册到selector时,触发pipeline的fireChannelRegistered,从head开始遍历,找到实现了ChannelInboundHandler接口的handler,并执行其fireChannelRegistered方法。
public final ChannelPipeline fireChannelRegistered() {
AbstractChannelHandlerContext.invokeChannelRegistered(head);
return this;
}
服务启动过程中,ServerBootstrap在init方法中,会给ServerSocketChannel的pipeline添加ChannelInitializer对象,其中ChannelInitializer继承ChannelInboundHandlerAdapter,并实现了ChannelInboundHandler接口,所以当ServerSocketChannel注册到selector之后,会触发其channelRegistered方法。
private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
if (initMap.putIfAbsent(ctx, Boolean.TRUE) == null) { // Guard against re-entrance. 解决并发问题
try {
// 初始化通道
initChannel((C) ctx.channel());
} catch (Throwable cause) {
// 发生异常时,执行异常处理
// Explicitly call exceptionCaught(...) as we removed the handler before calling initChannel(...).
// We do so to prevent multiple calls to initChannel(...).
exceptionCaught(ctx, cause);
} finally {
// 从 pipeline 移除 ChannelInitializer
remove(ctx);
}
return true; // 初始化成功
}
return false; // 初始化失败
}
在initChannel实现中,添加ServerBootstrapAcceptor实例到pipeline中。
ServerBootstrapAcceptor继承自ChannelInboundHandlerAdapter,负责把接收到的客户端socketChannel注册到childGroup中,由childGroup中的eventLoop负责数据处理。
public void channelRead(ChannelHandlerContext ctx, Object msg) {
// 接受的客户端的 NioSocketChannel 对象
final Channel child = (Channel) msg;
// 添加 NioSocketChannel 的处理器
child.pipeline().addLast(childHandler);
// 设置 NioSocketChannel 的配置项
setChannelOptions(child, childOptions, logger);
// 设置 NioSocketChannel 的属性
for (Entry<AttributeKey<?>, Object> e: childAttrs) {
child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
}
try {
// 注册客户端的 NioSocketChannel 到 work EventLoop 中。
childGroup.register(child).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
// 注册失败,关闭客户端的 NioSocketChannel
if (!future.isSuccess()) {
forceClose(child, future.cause());
}
}
});
} catch (Throwable t) {
forceClose(child, t);
}
}
ChannelPipeline的分析就到这里了。
网友评论