简介
ChannelPipeline相关的接口和类如下图所示,下面逐一分析:
![](https://img.haomeiwen.com/i2592685/c0e5f6c4ac5a6276.png)
在ChannelPipeline的源码中,netty作者写了很长的一段javadoc对netty的pipeline机制进行介绍,其中最关键的部分可以用这幅图表示:
![](https://img.haomeiwen.com/i2592685/a9590a2eb385f737.png)
可以看到Pipeline由一个Handler的链表组成,Handler分为两种,一种是Inbound,一种是Outbound,对应的接口为ChannelInboundHandler和ChannelOutboundHandler;
ChannelInboundHandler
接口定义如下:
void channelRegistered(ChannelHandlerContext ctx) throws Exception;
void channelUnregistered(ChannelHandlerContext ctx) throws Exception;
void channelActive(ChannelHandlerContext ctx) throws Exception;
void channelInactive(ChannelHandlerContext ctx) throws Exception;
void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception;
void channelReadComplete(ChannelHandlerContext ctx) throws Exception;
void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception;
void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception;
void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception;
Inbound该怎么理解呢,我个人的理解是由操作系统层级传入到应用层级;而Outbound则是相反;因此Inbound对应的通常是一些事件,而Outbound对应的通常是一些操作;
ChannelOutboundHandler
接口定义如下:
void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception;
void connect(
ChannelHandlerContext ctx, SocketAddress remoteAddress,
SocketAddress localAddress, ChannelPromise promise) throws Exception;
void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;
void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;
void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;
void read(ChannelHandlerContext ctx) throws Exception;
void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception;
void flush(ChannelHandlerContext ctx) throws Exception;
DefaultChannelHandlerContext
在前面,我讲到pipeline由一系列的Handler以链表的形式存在,这种说法并不准确,实际上是由ChannelHandlerContext组成的链表;
ChannelHandlerContext的实现类为DefaultChannelHandlerContext,包含如下的成员变量:
volatile AbstractChannelHandlerContext next;
volatile AbstractChannelHandlerContext prev;
private final ChannelHandler handler;
private final boolean inbound;
private final boolean outbound;
private final DefaultChannelPipeline pipeline;
private final String name;
final EventExecutor executor;
private ChannelFuture succeededFuture;
private Runnable invokeChannelReadCompleteTask;
private Runnable invokeReadTask;
private Runnable invokeChannelWritableStateChangedTask;
private Runnable invokeFlushTask;
从上面可以看出:
- 通过next和prev,ChannelHandlerContext组成了一个链表;每个Channel都有自己的ChannelPipeline, 都有自己的ChannelHandlerContext链表;
- 每个ChannelHandlerContext都包含一个ChannelHandler成员,可以看做对Handler的封装;
- 通过布尔值的inbound和outbound变量区分是InboundHandler还是OutboundHandler,具体的判断逻辑见DefaultChannelHandlerContext.isInbound和DefaultChannelHandlerContext.isOutbound方法;
DefaultChannelPipeline
DefaultChannelPipeline是ChannelPipeline的实现类,包含如下成员变量:
final AbstractChannelHandlerContext head;
final AbstractChannelHandlerContext tail;
private final Channel channel;
public final ChannelPipeline fireChannelActive() {
AbstractChannelHandlerContext.invokeChannelActive(head);
return this;
}
public final ChannelFuture write(Object msg) {
return tail.write(msg);
}
- 其中head为内部类HeadContext,tail为TailContext;
- 对于inbound事件,由HeadContext第一个处理,并向后续的InboundHandler进行传递,最后一个处理的为TailContext(实现了ChannelInboundHandler接口);另外要特别注意TailContext的ChannelInboundHandler实现,什么也没做,因此Inbound事件传递到此最终结束;
- 对于outbound事件,由TailContext第一个进行处理,并向后续的OutboundHandler进行传递;最后一个处理的Handler为HeadContext;
纵上所述,Inbound的处理顺序为head到tail,Outbound的处理顺序为tail到header; - 处理Outbound操作最后的ChannelHandlerContext为HeadContext,可以看到实际上最后都是由unsafe来进行处理;
@Override
public void bind(
ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise)
throws Exception {
unsafe.bind(localAddress, promise);
}
@Override
public void connect(
ChannelHandlerContext ctx,
SocketAddress remoteAddress, SocketAddress localAddress,
ChannelPromise promise) throws Exception {
unsafe.connect(remoteAddress, localAddress, promise);
}
@Override
public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
unsafe.disconnect(promise);
}
@Override
public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
unsafe.close(promise);
}
网友评论