在Netty学习笔记(二)中我们介绍了服务端消息接收的处理流程,通过pipeline.fireChannelRead来回调pipeline处理器链中每一个inboundHandler的channelRead方法。
本篇我们将简单分析下消息发送的处理流程。
通常我们会在最后一个inboundHandler的channelRead方法中完成业务逻辑的处理,并将响应消息体调用ChannelHandlerContext.writeAndFlush(response)发送回远端。
相关类的继承结构:
image.png
结合Netty学习笔记(一),pipeline.addLast(ChannelHandler... handlers)最终会通过new DefaultChannelHandlerContext(this, childExecutor(group), name, handler);来对每一个处理器进行包装,并添加到pipeline的处理器链中。
详细代码逻辑:
//AbstractChannelHandlerContext.writeAndFlush
@Override
public ChannelFuture writeAndFlush(Object msg) {
return writeAndFlush(msg, newPromise());
}
@Override
public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
write(msg, true, promise);
return promise;
}
private void write(Object msg, boolean flush, ChannelPromise promise) {
//此处为核心代码
AbstractChannelHandlerContext next = findContextOutbound();
final Object m = pipeline.touch(msg, next);
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
if (flush) {
next.invokeWriteAndFlush(m, promise);
} else {
next.invokeWrite(m, promise);
}
} else {
AbstractWriteTask task;
if (flush) {
task = WriteAndFlushTask.newInstance(next, m, promise);
} else {
task = WriteTask.newInstance(next, m, promise);
}
safeExecute(executor, task, promise, m);
}
}
//outbound调用顺序
private AbstractChannelHandlerContext findContextOutbound() {
AbstractChannelHandlerContext ctx = this;
do {
ctx = ctx.prev;
} while (!ctx.outbound);
return ctx;
}
关于findContextOutbound在Netty学习笔记(二)中已涉及,与inbound的执行顺序相反,依次调用每一个执行器的write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise),最终执行到pipeline处理器器链的HeadContext处理器。
//HeadContext.write
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
//在此处调用unsafe实现将消息发送到远端
unsafe.write(msg, promise);
}
消息发送逻辑比较简单,本篇到此结束。
网友评论