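This section begins the study of the pipeline source code. Recall from the earlier chapters that both server-side and client-side channels create their pipeline when they are initialized, so the entry point for pipeline initialization lies in the channel-creation process.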
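protected AbstractChannel(Channel parent) {
    // parent is the server channel that created this client channel
    // (the server channel itself is created via reflection during server startup)
    this.parent = parent;
    id = newId();
    unsafe = newUnsafe();
    // initialize the pipeline
    pipeline = newChannelPipeline();
}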
During initialization the newly created channel is passed in, and a DefaultChannelPipeline is instantiated.
protected DefaultChannelPipeline newChannelPipeline() {
    // "this" is the channel
    return new DefaultChannelPipeline(this);
}
Instantiating DefaultChannelPipeline involves three main steps:
- Save the channel
- Create the head node (HeadContext) and the tail node (TailContext)
- Build the doubly linked list
protected DefaultChannelPipeline(Channel channel) {
    // save the channel
    this.channel = ObjectUtil.checkNotNull(channel, "channel");
    succeededFuture = new SucceededChannelFuture(channel, null);
    voidPromise = new VoidChannelPromise(channel, true);
    // create the head and tail nodes
    tail = new TailContext(this);
    head = new HeadContext(this);
    // link them into a doubly linked list
    head.next = tail;
    tail.prev = head;
}
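The head/tail wiring above can be illustrated with a small stand-alone model. This is only a sketch, not Netty's code: `PipelineSketch`, `Node`, `addLast`, and `names` are hypothetical names, and the splice in `addLast` is an assumption about how a node would be placed just before the tail sentinel.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of the head/tail sentinel layout; not Netty's actual classes.
final class PipelineSketch {
    static final class Node {
        final String name;
        Node prev;
        Node next;

        Node(String name) {
            this.name = name;
        }
    }

    final Node head = new Node("head");
    final Node tail = new Node("tail");

    PipelineSketch() {
        // Same wiring as the constructor above: an empty pipeline is head <-> tail.
        head.next = tail;
        tail.prev = head;
    }

    // Hypothetical addLast: splice the new node in just before the tail sentinel.
    void addLast(String name) {
        Node node = new Node(name);
        Node prev = tail.prev;
        node.prev = prev;
        node.next = tail;
        prev.next = node;
        tail.prev = node;
    }

    // Walks the list from head to tail and collects the node names.
    List<String> names() {
        List<String> out = new ArrayList<>();
        for (Node n = head; n != null; n = n.next) {
            out.add(n.name);
        }
        return out;
    }

    public static void main(String[] args) {
        PipelineSketch p = new PipelineSketch();
        System.out.println(p.names()); // [head, tail]
        p.addLast("decoder");
        p.addLast("business");
        System.out.println(p.names()); // [head, decoder, business, tail]
    }
}
```

Because the two sentinels always exist, inserting a node never has to handle an empty list or a null neighbour.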
- Data node
ChannelHandlerContext extends AttributeMap so that it can store information, and implements ChannelInboundInvoker and ChannelOutboundInvoker so that it can propagate events.
public interface ChannelHandlerContext extends AttributeMap, ChannelInboundInvoker, ChannelOutboundInvoker {

    // Component accessors: channel, executor (the eventLoop), handler, pipeline, and the context itself

    /**
     * Return the {@link Channel} which is bound to the {@link ChannelHandlerContext}.
     */
    Channel channel();

    /**
     * Returns the {@link EventExecutor} which is used to execute an arbitrary task.
     */
    EventExecutor executor();

    /**
     * The unique name of the {@link ChannelHandlerContext}.The name was used when then {@link ChannelHandler}
     * was added to the {@link ChannelPipeline}. This name can also be used to access the registered
     * {@link ChannelHandler} from the {@link ChannelPipeline}.
     */
    String name();

    /**
     * The {@link ChannelHandler} that is bound this {@link ChannelHandlerContext}.
     */
    ChannelHandler handler();

    /**
     * Return {@code true} if the {@link ChannelHandler} which belongs to this context was removed
     * from the {@link ChannelPipeline}. Note that this method is only meant to be called from with in the
     * {@link EventLoop}.
     */
    boolean isRemoved();

    // Event propagation methods

    @Override
    ChannelHandlerContext fireChannelRegistered();

    @Override
    ChannelHandlerContext fireChannelUnregistered();

    @Override
    ChannelHandlerContext fireChannelActive();

    @Override
    ChannelHandlerContext fireChannelInactive();

    @Override
    ChannelHandlerContext fireExceptionCaught(Throwable cause);

    @Override
    ChannelHandlerContext fireUserEventTriggered(Object evt);

    @Override
    ChannelHandlerContext fireChannelRead(Object msg);

    @Override
    ChannelHandlerContext fireChannelReadComplete();

    @Override
    ChannelHandlerContext fireChannelWritabilityChanged();

    @Override
    ChannelHandlerContext read();

    @Override
    ChannelHandlerContext flush();

    /**
     * Return the assigned {@link ChannelPipeline}
     */
    ChannelPipeline pipeline();

    // Memory allocation

    /**
     * Return the assigned {@link ByteBufAllocator} which will be used to allocate {@link ByteBuf}s.
     */
    ByteBufAllocator alloc();

    // Attribute storage methods

    /**
     * @deprecated Use {@link Channel#attr(AttributeKey)}
     */
    @Deprecated
    @Override
    <T> Attribute<T> attr(AttributeKey<T> key);

    /**
     * @deprecated Use {@link Channel#hasAttr(AttributeKey)}
     */
    @Deprecated
    @Override
    <T> boolean hasAttr(AttributeKey<T> key);
}
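The interface re-declares the fire methods it already inherits from the invoker interfaces. One visible effect is that the return type narrows to ChannelHandlerContext, which allows chained calls that stay on the context type. The sketch below shows that Java mechanism (covariant return types) with stand-in interfaces; `InboundInvoker`, `Context`, `SimpleContext`, and `fireRead` are hypothetical names, not Netty types.

```java
// Simplified stand-ins for ChannelInboundInvoker and ChannelHandlerContext; all names are hypothetical.
final class CovariantReturnSketch {
    interface InboundInvoker {
        InboundInvoker fireRead(Object msg);
    }

    interface Context extends InboundInvoker {
        String name();

        // Narrowing the return type keeps context-only methods reachable after a fire call.
        @Override
        Context fireRead(Object msg);
    }

    static final class SimpleContext implements Context {
        private final String name;
        int fired; // counts fireRead calls so the effect is observable

        SimpleContext(String name) {
            this.name = name;
        }

        @Override
        public String name() {
            return name;
        }

        @Override
        public Context fireRead(Object msg) {
            fired++;
            return this;
        }
    }

    static String fireTwiceAndGetName(SimpleContext ctx) {
        // name() is reachable only because fireRead returns Context rather than InboundInvoker.
        return ctx.fireRead("a").fireRead("b").name();
    }

    public static void main(String[] args) {
        SimpleContext ctx = new SimpleContext("decoder");
        System.out.println(fireTwiceAndGetName(ctx) + " fired=" + ctx.fired); // decoder fired=2
    }
}
```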
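This data node has an abstract implementation, AbstractChannelHandlerContext, with several important member fields. When it is instantiated, the fields below are initialized, except for the linked-list pointers.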
// pointers that form the linked list
volatile AbstractChannelHandlerContext next;
volatile AbstractChannelHandlerContext prev;
// flags that mark the type of the handler
private final boolean inbound;
private final boolean outbound;
// references to the key components
private final DefaultChannelPipeline pipeline;
final EventExecutor executor;

AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor, String name,
                              boolean inbound, boolean outbound) {
    this.name = ObjectUtil.checkNotNull(name, "name");
    this.pipeline = pipeline;
    this.executor = executor;
    this.inbound = inbound;
    this.outbound = outbound;
    // Its ordered if its driven by the EventLoop or the given Executor is an instanceof OrderedEventExecutor.
    ordered = executor == null || executor instanceof OrderedEventExecutor;
}
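To see why each context carries an inbound and an outbound flag, consider how an event could pick its next stop: inbound events walk forward through `next` and skip contexts that are not inbound, while outbound events walk backward through `prev` and skip contexts that are not outbound. The following is a minimal model of that idea under those assumptions; `DirectionSketch`, `nextInbound`, `prevOutbound`, and the sample handler names are hypothetical, and Netty's own lookup code may differ between versions.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of direction-aware traversal; not Netty's actual classes.
final class DirectionSketch {
    static final class Ctx {
        final String name;
        final boolean inbound;
        final boolean outbound;
        Ctx prev;
        Ctx next;

        Ctx(String name, boolean inbound, boolean outbound) {
            this.name = name;
            this.inbound = inbound;
            this.outbound = outbound;
        }
    }

    // Builds head <-> decoder <-> encoder <-> business <-> tail and returns the nodes in order.
    static Ctx[] samplePipeline() {
        Ctx[] ctxs = {
            new Ctx("head", true, true),
            new Ctx("decoder", true, false),
            new Ctx("encoder", false, true),
            new Ctx("business", true, false),
            new Ctx("tail", true, false)
        };
        for (int i = 0; i + 1 < ctxs.length; i++) {
            ctxs[i].next = ctxs[i + 1];
            ctxs[i + 1].prev = ctxs[i];
        }
        return ctxs;
    }

    // Next context after "from" that handles inbound events, or null past the end.
    static Ctx nextInbound(Ctx from) {
        Ctx c = from;
        do {
            c = c.next;
        } while (c != null && !c.inbound);
        return c;
    }

    // Previous context before "from" that handles outbound events, or null past the start.
    static Ctx prevOutbound(Ctx from) {
        Ctx c = from;
        do {
            c = c.prev;
        } while (c != null && !c.outbound);
        return c;
    }

    static List<String> inboundPath(Ctx head) {
        List<String> path = new ArrayList<>();
        for (Ctx c = nextInbound(head); c != null; c = nextInbound(c)) {
            path.add(c.name);
        }
        return path;
    }

    static List<String> outboundPath(Ctx tail) {
        List<String> path = new ArrayList<>();
        for (Ctx c = prevOutbound(tail); c != null; c = prevOutbound(c)) {
            path.add(c.name);
        }
        return path;
    }

    public static void main(String[] args) {
        Ctx[] p = samplePipeline();
        System.out.println(inboundPath(p[0]));             // [decoder, business, tail]
        System.out.println(outboundPath(p[p.length - 1])); // [encoder, head]
    }
}
```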
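- Tail node DefaultChannelPipeline.TailContext
TailContext is a ChannelInboundHandler, so it can handle inbound events, most of which are read events. It is also a data node (AbstractChannelHandlerContext), which makes it somewhat special. Most of its handler methods are either empty or delegate to an onUnhandled... hook.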
final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler {

    TailContext(DefaultChannelPipeline pipeline) {
        super(pipeline, null, TAIL_NAME, true, false);
        setAddComplete();
    }

    @Override
    public ChannelHandler handler() {
        return this;
    }

    @Override
    public void channelRegistered(ChannelHandlerContext ctx) { }

    @Override
    public void channelUnregistered(ChannelHandlerContext ctx) { }

    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        onUnhandledInboundChannelActive();
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) {
        onUnhandledInboundChannelInactive();
    }

    @Override
    public void channelWritabilityChanged(ChannelHandlerContext ctx) {
        onUnhandledChannelWritabilityChanged();
    }

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) { }

    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) { }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
        onUnhandledInboundUserEventTriggered(evt);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        onUnhandledInboundException(cause);
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        onUnhandledInboundMessage(msg);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) {
        onUnhandledInboundChannelReadComplete();
    }
}
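Following the implementations of exceptionCaught and channelRead shows what happens when an event reaches the tail. If a msg arrives (a read event fired channelRead) and no InboundHandler earlier in the pipeline handled it, TailContext#channelRead() is invoked, which logs the discarded message at debug level and then releases it.
Exception handling in exceptionCaught follows the same pattern, except that the unhandled exception is logged at warn level.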
protected void onUnhandledInboundException(Throwable cause) {
    try {
        logger.warn(
                "An exceptionCaught() event was fired, and it reached at the tail of the pipeline. " +
                        "It usually means the last handler in the pipeline did not handle the exception.",
                cause);
    } finally {
        ReferenceCountUtil.release(cause);
    }
}

protected void onUnhandledInboundMessage(Object msg) {
    try {
        logger.debug(
                "Discarded inbound message {} that reached at the tail of the pipeline. " +
                        "Please check your pipeline configuration.", msg);
    } finally {
        ReferenceCountUtil.release(msg);
    }
}
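The "report and release" fallback can be modelled without Netty. In the sketch below, `CountedMessage` is a hypothetical stand-in for a reference-counted message, and the boolean-returning handlers are an assumption made for brevity (true means "pass it on", false means "consumed and released"). It illustrates the contract only: whoever consumes a message releases it, and the tail releases whatever nobody consumed.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

// Simplified model of the tail's "log and release" fallback; not Netty's actual classes.
final class TailReleaseSketch {
    // Hypothetical stand-in for a reference-counted message.
    static final class CountedMessage {
        final String payload;
        private int refCnt = 1;

        CountedMessage(String payload) {
            this.payload = payload;
        }

        int refCnt() {
            return refCnt;
        }

        void release() {
            if (refCnt == 0) {
                throw new IllegalStateException("already released");
            }
            refCnt--;
        }
    }

    // Each handler returns true to pass the message on, or false after consuming and releasing it.
    static String dispatch(List<Predicate<CountedMessage>> handlers, CountedMessage msg) {
        for (Predicate<CountedMessage> handler : handlers) {
            if (!handler.test(msg)) {
                return "handled";
            }
        }
        // Nothing consumed the message, so the tail reports it and then releases it.
        try {
            return "discarded at tail: " + msg.payload;
        } finally {
            msg.release();
        }
    }

    public static void main(String[] args) {
        Predicate<CountedMessage> passThrough = m -> true;
        Predicate<CountedMessage> consumer = m -> {
            m.release();
            return false;
        };

        CountedMessage unhandled = new CountedMessage("ping");
        System.out.println(dispatch(Arrays.asList(passThrough, passThrough), unhandled));
        System.out.println("refCnt=" + unhandled.refCnt());

        CountedMessage handled = new CountedMessage("pong");
        System.out.println(dispatch(Arrays.asList(passThrough, consumer), handled));
        System.out.println("refCnt=" + handled.refCnt());
    }
}
```

In both runs the count ends at zero: in the first the tail releases the message, in the second the consuming handler does.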
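- Head node DefaultChannelPipeline.HeadContext
This node is both a ChannelOutboundHandler and a ChannelInboundHandler, and it is also a data node (AbstractChannelHandlerContext).
It holds the unsafe, which performs the actual I/O operations such as read, write, connect, and bind.
Recall the earlier study of how a client channel's read events are handled: they all end up in the corresponding methods here.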
final class HeadContext extends AbstractChannelHandlerContext
        implements ChannelOutboundHandler, ChannelInboundHandler {

    private final Unsafe unsafe;

    HeadContext(DefaultChannelPipeline pipeline) {
        super(pipeline, null, HEAD_NAME, true, true);
        unsafe = pipeline.channel().unsafe();
        setAddComplete();
    }

    @Override
    public ChannelHandler handler() {
        return this;
    }

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) {
        // NOOP
    }

    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) {
        // NOOP
    }

    @Override
    public void bind(
            ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) {
        unsafe.bind(localAddress, promise);
    }

    @Override
    public void connect(
            ChannelHandlerContext ctx,
            SocketAddress remoteAddress, SocketAddress localAddress,
            ChannelPromise promise) {
        unsafe.connect(remoteAddress, localAddress, promise);
    }

    @Override
    public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) {
        unsafe.disconnect(promise);
    }

    @Override
    public void close(ChannelHandlerContext ctx, ChannelPromise promise) {
        unsafe.close(promise);
    }

    @Override
    public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) {
        unsafe.deregister(promise);
    }

    @Override
    public void read(ChannelHandlerContext ctx) {
        unsafe.beginRead();
    }

    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
        unsafe.write(msg, promise);
    }

    @Override
    public void flush(ChannelHandlerContext ctx) {
        unsafe.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        ctx.fireExceptionCaught(cause);
    }

    @Override
    public void channelRegistered(ChannelHandlerContext ctx) {
        invokeHandlerAddedIfNeeded();
        ctx.fireChannelRegistered();
    }

    @Override
    public void channelUnregistered(ChannelHandlerContext ctx) {
        ctx.fireChannelUnregistered();
        // Remove all handlers sequentially if channel is closed and unregistered.
        if (!channel.isOpen()) {
            destroy();
        }
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        ctx.fireChannelActive();
        // With autoRead enabled this ends in unsafe.beginRead(), which registers read interest
        // with the Selector so that the NioEventLoop polls for the event
        readIfIsAutoRead();
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) {
        ctx.fireChannelInactive();
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ctx.fireChannelRead(msg);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) {
        ctx.fireChannelReadComplete();
        readIfIsAutoRead();
    }

    private void readIfIsAutoRead() {
        if (channel.config().isAutoRead()) {
            channel.read();
        }
    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
        ctx.fireUserEventTriggered(evt);
    }

    @Override
    public void channelWritabilityChanged(ChannelHandlerContext ctx) {
        ctx.fireChannelWritabilityChanged();
    }
}
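The head's role as the last stop for outbound operations can be sketched as follows. The model assumes that a write starts at the tail end of the pipeline, passes through the outbound handlers in reverse order, and is finally handed to the unsafe by the head. `HeadOutboundSketch`, `FakeUnsafe`, and the two sample handlers are hypothetical; this is not Netty's implementation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.function.UnaryOperator;

// Simplified model of an outbound write ending at the head; not Netty's actual classes.
final class HeadOutboundSketch {
    // Hypothetical stand-in for Unsafe: it only records what it was asked to write.
    static final class FakeUnsafe {
        final List<String> written = new ArrayList<>();

        void write(Object msg) {
            written.add(String.valueOf(msg));
        }
    }

    // outboundHandlers is given in pipeline order (head side first, tail side last).
    static void write(List<UnaryOperator<Object>> outboundHandlers, FakeUnsafe unsafe, Object msg) {
        Object current = msg;
        // Outbound events travel from the tail side toward the head.
        for (int i = outboundHandlers.size() - 1; i >= 0; i--) {
            current = outboundHandlers.get(i).apply(current);
        }
        // The head is the final outbound handler: it delegates to the unsafe.
        unsafe.write(current);
    }

    public static void main(String[] args) {
        UnaryOperator<Object> framer = m -> "frame(" + m + ")";
        UnaryOperator<Object> upperCaser = m -> m.toString().toUpperCase(Locale.ROOT);
        List<UnaryOperator<Object>> handlers = Arrays.asList(framer, upperCaser);

        FakeUnsafe unsafe = new FakeUnsafe();
        write(handlers, unsafe, "hi");
        System.out.println(unsafe.written); // [frame(HI)]
    }
}
```

The handler closest to the tail runs first, so the message is upper-cased before it is framed, and only the final result reaches the unsafe.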