上一节学习了inbound
事件的传播,充分理解了在pipeline
中是如何向一个个handler
传播事件的,以channelRead
事件也就是读事件为例,研究了其处理逻辑。
本节学习outbound
事件的传播,和inbound
事件有相似之处。以write
事件为例,进行学习研究。
-
ChannelHandler家族
如下图,ChannelHandler家族
分为ChannelInboundHandler
以及ChannelOutboundHandler
,分别定义了入站处理器以及出站处理器,同时也提供了对应的实现。上一节inbound
事件传播也发现确定是inbound事件
还是outbound事件
是由instanceof
关键子判断是否实现了对应的接口。
ChannelHandler家族
对比一下ChannelInboundHandler
和ChannelOutboundHandler
的方法,可以发现,ChannelInboundHandler
的方法以被动触发为主,而ChannelOutboundHandler
的方法则是主动行为。
ChannelInboundHandler | ChannelInboundHandler |
---|---|
channelRegistered (事件触发:注册) |
deregister (注销) |
channelUnregistered (事件触发:注销) |
disconnect (取消连接) |
channelActive (事件触发:激活连接) |
connect (连接) |
channelRead (事件触发:读) |
read (读) |
channelReadComplete (事件触发:读完成) |
write (写) |
userEventTriggered (事件触发:用户事件) |
flush (刷缓存) |
channelWritabilityChanged (事件触发:通道可读状态被修改) |
close (关闭连接) |
exceptionCaught (事件触发:异常) |
bind (绑定端口) |
-
ChannelOutboundHandler
的执行顺序正好和ChannelInboundHandler
相反,是倒序的。
class DataServerInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(
new OutboundHandlerA(),
new OutboundHandlerB(),
new OutboundHandlerC()
);
}
}
class OutboundHandlerA extends ChannelOutboundHandlerAdapter {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
System.out.println("OutboundHandlerA : " + msg);
ctx.write(msg, promise);
}
}
class OutboundHandlerB extends ChannelOutboundHandlerAdapter {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
System.out.println("OutboundHandlerB : " + msg);
ctx.write(msg, promise);
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
ctx.executor().schedule(() -> {
ctx.channel().write("hello world");
}, 3, TimeUnit.SECONDS);
}
}
class OutboundHandlerC extends ChannelOutboundHandlerAdapter {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
System.out.println("OutboundHandlerC : " + msg);
ctx.write(msg, promise);
}
}
data:image/s3,"s3://crabby-images/a51b1/a51b1f79aa863fbd2c1673c1401d80fb263cd1d2" alt=""
改变添加handler
的顺序
class DataServerInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(
new OutboundHandlerB(),
new OutboundHandlerA(),
new OutboundHandlerC()
);
}
}
data:image/s3,"s3://crabby-images/6d377/6d377252f4313098e016c0a2f740706bcf39a26e" alt=""
- 跟踪
ctx.channel().write("hello world");
@Override
public ChannelFuture write(Object msg) {
//从pipeline开始调用
return pipeline.write(msg);
}
@Override
public final ChannelFuture write(Object msg) {
//从尾节点开始传播
return tail.write(msg);
}
@Override
public ChannelFuture write(Object msg) {
//添加一个回调Promise,包装channel和executor
return write(msg, newPromise());
}
最终调用到AbstractChannelHandlerContext#write()
方法,主要是做了两件事
-
findContextOutbound
方法找到下一个ChannelOutboundHandlerContext
- 判断是否需要
flush
,选择执行write
回调方法之后是否执行flush
回调方法
private void write(Object msg, boolean flush, ChannelPromise promise) {
ObjectUtil.checkNotNull(msg, "msg");
try {
if (isNotValidPromise(promise, true)) {
ReferenceCountUtil.release(msg);
// cancelled
return;
}
} catch (RuntimeException e) {
ReferenceCountUtil.release(msg);
throw e;
}
//查找下一个ChannelOutboundHandlerContext
AbstractChannelHandlerContext next = findContextOutbound();
final Object m = pipeline.touch(msg, next);
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
//判断是否刷新
if (flush) {
//执行写并刷新方法
next.invokeWriteAndFlush(m, promise);
} else {
//执行写方法
next.invokeWrite(m, promise);
}
} else {
final AbstractWriteTask task;
if (flush) {
task = WriteAndFlushTask.newInstance(next, m, promise);
} else {
task = WriteTask.newInstance(next, m, promise);
}
if (!safeExecute(executor, task, promise, m)) {
// We failed to submit the AbstractWriteTask. We need to cancel it so we decrement the pending bytes
// and put it back in the Recycler for re-use later.
//
// See https://github.com/netty/netty/issues/8343.
task.cancel();
}
}
}
-
findContextOutbound
方法找到下一个ChannelOutboundHandlerContext
private AbstractChannelHandlerContext findContextOutbound() {
//循环往前查找,通过outbound属性判断
AbstractChannelHandlerContext ctx = this;
do {
ctx = ctx.prev;
} while (!ctx.outbound);
return ctx;
}
- 执行
write
回调方法
private void invokeWrite(Object msg, ChannelPromise promise) {
//判断handler的状态是可以执行回调函数的
if (invokeHandler()) {
invokeWrite0(msg, promise);
} else {
write(msg, promise);
}
}
private void invokeWrite0(Object msg, ChannelPromise promise) {
try {
//执行回调函数write
((ChannelOutboundHandler) handler()).write(this, msg, promise);
} catch (Throwable t) {
notifyOutboundHandlerException(t, promise);
}
}
-
invokeWriteAndFlush
执行完write
回调方法之后执行flush
回调方法
private void invokeWriteAndFlush(Object msg, ChannelPromise promise) {
if (invokeHandler()) {
//执行write
invokeWrite0(msg, promise);
//执行flush
invokeFlush0();
} else {
writeAndFlush(msg, promise);
}
}
private void invokeFlush0() {
try {
//回调flush方法
((ChannelOutboundHandler) handler()).flush(this);
} catch (Throwable t) {
notifyHandlerException(t);
}
}
- 通过跟踪源码,也不难发现无论是从
tail
节点开始还是从当前节点开始调用write
方法,最终都会到head
节点。而头节点正是使用unsafe
来具体完成这些操作的。
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
unsafe.write(msg, promise);
}
@Override
public void flush(ChannelHandlerContext ctx) {
unsafe.flush();
}
网友评论