源码分析基于netty 4
前面已经说过netty对accept事件的处理,现在来讲讲netty中的read/write过程。
开始前,先重点说说netty的Handler/Pipeline
netty的Handler分为ChannelInboundHandler、ChannelOutboundHandler两大类。ChannelInboundHandler对从客户端发往服务器的报文进行处理,一般用来执行解码、读取客户端数据、进行业务处理等;ChannelOutboundHandler对从服务器发往客户端的报文进行处理,一般用来进行编码、发送报文到客户端。
Handler是netty提供的扩展点,非常重要。通过Handler可以完成通讯报文的解码编码、拦截指定的报文、统一错误处理、请求计数等工作。众多的网络协议都是通过Handler完成的, 如http、自定义rpc等。
Netty中,可以注册多个handler,形成Pipeline。ChannelInboundHandler按照注册的先后顺序执行;ChannelOutboundHandler按照注册的先后顺序逆序执行
netty在ChannelPipeline类的注释中给出了如下示意图
I/O Request
via Channel or
ChannelHandlerContext
|
+---------------------------------------------------+---------------+
| ChannelPipeline | |
| \|/ |
| +---------------------+ +-----------+----------+ |
| | Inbound Handler N | | Outbound Handler 1 | |
| +----------+----------+ +-----------+----------+ |
| /|\ | |
| | \|/ |
| +----------+----------+ +-----------+----------+ |
| | Inbound Handler N-1 | | Outbound Handler 2 | |
| +----------+----------+ +-----------+----------+ |
| /|\ . |
| . . |
| ChannelHandlerContext.fireIN_EVT() ChannelHandlerContext.OUT_EVT()|
| [ method call] [method call] |
| . . |
| . \|/ |
| +----------+----------+ +-----------+----------+ |
| | Inbound Handler 2 | | Outbound Handler M-1 | |
| +----------+----------+ +-----------+----------+ |
| /|\ | |
| | \|/ |
| +----------+----------+ +-----------+----------+ |
| | Inbound Handler 1 | | Outbound Handler M | |
| +----------+----------+ +-----------+----------+ |
| /|\ | |
+---------------+-----------------------------------+---------------+
| \|/
+---------------+-----------------------------------+---------------+
| | | |
| [ Socket.read() ] [ Socket.write() ] |
| |
| Netty Internal I/O Threads (Transport Implementation) |
+-------------------------------------------------------------------+
就是Channel/ChannelHandlerContext writer会依次通过Outbound处理, 最后通过socket write出去
从socket read到的数据, 也会依次交给Inbound处理。
netty中还定义了ChannelInboundInvoker/ChannelOutboundInvoker, Invoker和ChannelHandler接口基本一致,只是参数少了ChannelHandlerContext(Invoker接口少了ChannelHandlerContext上下文)。
ChannelHandlerContext和ChannelPipeline接口都继承了ChannelInboundInvoker和ChannelOutboundInvoker
DefaultChannelPipeline是netty中的核心Pipeline, 聚合了ChannelHandlerContext, ChannelHandlerContext可以看做pipeline的节点。 HeadContext/TailContext(图中没展示)是Pipeline的开始/结束节点。
当触发ChannelPipeline的事件时, netty会将事件委派给ChannelHandlerContext, 再由ChannelHandlerContext委派到ChannelHandler进行处理。
ChannelHandler中有handlerAdded/handlerRemoved方法,当一个ChannelHandler添加/移除Pipeline中,会触发这些事件。
ChannelInitializer是一个特殊的InboundHandler,提供了抽象的initChannel方法,用于提供给用户向pipeline添加自定义的ChannelHandler
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
if (ctx.channel().isRegistered()) {
initChannel(ctx);
}
}
可以看到, ChannelInitializer添加到Pipeline后, 会调用initChannel方法
值得注意的是,DefaultChannelPipeline的handlerAdded是通过task执行的(不过有时也会强行执行)。
可以看一下DefaultChannelPipeline.addLast
@Override
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
final AbstractChannelHandlerContext newCtx;
synchronized (this) {
checkMultiplicity(handler);
newCtx = newContext(group, filterName(name, handler), handler);
addLast0(newCtx);
// 如果registered为false,那channel就没有注册到任何eventloop
// 所以调用callHandlerCallbackLater方法,延时进行
if (!registered) {
newCtx.setAddPending();
callHandlerCallbackLater(newCtx, true);
return this;
}
// 立即执行task
EventExecutor executor = newCtx.executor();
if (!executor.inEventLoop()) {
newCtx.setAddPending();
executor.execute(new Runnable() {
@Override
public void run() {
callHandlerAdded0(newCtx);
}
});
return this;
}
}
callHandlerAdded0(newCtx);
return this;
}
callHandlerCallbackLater就是把task放到pendingHandlerCallbackHead,当loop启动后,会执行这些task
回想一下启动注册过程中调用的AbstractUnsafe.register0方法, 会调用pipeline.invokeHandlerAddedIfNeeded();
, 这里会强制执行pendingHandlerCallbackHead。
先来看一个小栗子
我们将tcp请求的内容通过"|"分割为一个字符串数组,进行逻辑处理后,再将结果数组用"|"合并为一个字符串返回给用户。
负责分割的InboundHandler
public class SplitHandler extends ChannelInboundHandlerAdapter {
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("split in inboundHandler channelRead");
ByteBuf in = (ByteBuf) msg;
byte[] bytes = new byte[in.writerIndex()];
in.readBytes(bytes);
String s = new String(bytes);
System.out.println("read string : " + s);
ctx.fireChannelRead(s.split("\\|"));
}
}
负责合并的OutboundHandler
public class MergeHandler extends ChannelOutboundHandlerAdapter {
public void read(ChannelHandlerContext ctx) throws Exception {
System.out.println("outbound read");
ctx.read();
}
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
System.out.println("outbound write");
String[] arr = (String[])msg;
StringBuilder sb = new StringBuilder();
for (int i = 0; i < arr.length; i++) {
if(i > 0) {
sb.append("|");
}
sb.append(arr[i]);
}
System.out.println("response str : " + sb.toString());
byte[] bytes = sb.toString().getBytes();
ByteBuf byteBuf = ctx.alloc().buffer(bytes.length);
byteBuf.writeBytes(bytes);
ctx.write(byteBuf, promise);
}
}
这里重写read方法, 主要是想关注该方法的触发时机。
简单的逻辑处理:
public class LogicHandler extends ChannelInboundHandlerAdapter {
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("server inboundHandler channelRead");
String[] arr = (String[])msg;
for (int i = 0; i < arr.length; i++) {
String s = arr[i];
System.out.println("split result : " + s);
arr[i] = "ok for " + s;
}
ctx.writeAndFlush(arr);
}
}
server端
ServerBootstrap b = new ServerBootstrap();
b.group(parentGroup, childGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 100)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast(new MergeHandler());
p.addLast(new SplitHandler());
p.addLast(new LogicHandler());
}
});
运行栗子后, 用telnet发送123|456字符串, 得到结果
binecy ~/work/shadowsocks $ telnet 127.0.0.1 8007
Trying 127.0.0.1...
Connected to 127.0.0.1.
Escape character is '^]'.
123|345
ok for 123|ok for 345
服务端输出
split in inboundHandler channelRead
read string : 123|456
logic handle in inboundHandler channelRead
split result : 123
split result : 456
merge in outboundHandler write
response str : ok for 123|ok for 456
fire outboundHandler read
以这个栗子入手,分析netty中read/write过程
注意一下, accept后, netty中pipeline如下:
head > ServerBootstrapAcceptor > MergeHandler > SplitHandler > LogicHandler > tail
read
回顾一下, NioEventLoop中对read事件的处理
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
int readyOps = k.readyOps();
...
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
if (!ch.isOpen()) {
return;
}
}
}
unsafe.read()
这个方法, 会调用到AbstractNioByteChannel.read():
final ChannelPipeline pipeline = pipeline();
final ByteBufAllocator allocator = config.getAllocator();
final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
allocHandle.reset(config);
ByteBuf byteBuf = null;
boolean close = false;
do {
// 申请缓存区空间
byteBuf = allocHandle.allocate(allocator);
// 从socket读取数据到缓存区
allocHandle.lastBytesRead(doReadBytes(byteBuf));
allocHandle.incMessagesRead(1);
// 触发ChannelRead事件
pipeline.fireChannelRead(byteBuf);
byteBuf = null;
} while (allocHandle.continueReading());
allocHandle.readComplete();
// 触发ChannelReadComplete事件
pipeline.fireChannelReadComplete();
pipeline.fireChannelRead
触发read事件,看看DefaultChannelPipeline的处理
public final ChannelPipeline fireChannelRead(Object msg) {
AbstractChannelHandlerContext.invokeChannelRead(head, msg);
return this;
}
很简单, pipeline的事件会委托给ChannelHandlerContext处理, 从head开始处理
invokeChannelRead会调用到AbstractChannelHandlerContext.invokeChannelRead
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeChannelRead(m);
} else {
executor.execute(new Runnable() {
@Override
public void run() {
next.invokeChannelRead(m);
}
});
}
}
private void invokeChannelRead(Object msg) {
if (invokeHandler()) { // 检查handler的状态
try {
((ChannelInboundHandler) handler()).channelRead(this, msg);
} catch (Throwable t) {
notifyHandlerException(t);
}
} else {
fireChannelRead(msg);
}
}
invokeHandler()会检查ChannelHandler是否已经调用了handlerAdded
看看HeadContext.channelRead
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ctx.fireChannelRead(msg);
}
ctx.fireChannelRead调用的是AbstractChannelHandlerContext.fireChannelRead:
public ChannelHandlerContext fireChannelRead(final Object msg) {
invokeChannelRead(findContextInbound(), msg);
return this;
}
findContextInbound会找到下一个InboundHandler
private AbstractChannelHandlerContext findContextInbound() {
AbstractChannelHandlerContext ctx = this;
do {
ctx = ctx.next;
} while (!ctx.inbound);
return ctx;
}
ctx.fireChannelRead(msg);
的作用就是找到下一个InboundHandler,并调用它的fireChannelRead方法, 所以我们重写InboundHandler的fireChannelRead方法,方法最后也要调用ctx.fireChannelRead(msg);
,以免调用链就此断掉, 除非使用write。这里会沿pipeline,依次查找InboundHandler并fireChannelRead方法。
write
下面看看write过程
LogicHandler中调用ctx.writeAndFlush
触发write过程, 调用到AbstractChannelHandlerContext.write:
private void write(Object msg, boolean flush, ChannelPromise promise) {
AbstractChannelHandlerContext next = findContextOutbound();
final Object m = pipeline.touch(msg, next);
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
if (flush) {
next.invokeWriteAndFlush(m, promise);
} else {
next.invokeWrite(m, promise);
}
} else {
AbstractWriteTask task;
if (flush) {
task = WriteAndFlushTask.newInstance(next, m, promise);
} else {
task = WriteTask.newInstance(next, m, promise);
}
safeExecute(executor, task, promise, m);
}
}
findContextOutbound会找到当前节点前一个OutboundHandler
private AbstractChannelHandlerContext findContextOutbound() {
AbstractChannelHandlerContext ctx = this;
do {
ctx = ctx.prev;
} while (!ctx.outbound);
return ctx;
}
next.invokeWriteAndFlush还是调用到AbstractChannelHandlerContext.invokeWriteAndFlush
private void invokeWriteAndFlush(Object msg, ChannelPromise promise) {
if (invokeHandler()) {
invokeWrite0(msg, promise);
invokeFlush0();
} else {
writeAndFlush(msg, promise);
}
}
invokeWrite0也比较简单, 就是调用handler的处理
private void invokeWrite0(Object msg, ChannelPromise promise) {
try {
((ChannelOutboundHandler) handler()).write(this, msg, promise);
} catch (Throwable t) {
notifyOutboundHandlerException(t, promise);
}
}
LogicHandler前一个OutboundHandler是MergeHandler, 所以会调用到MergeHandler.write方法, 进行字符串数组合并。
MergeHandler.write调用ctx.write
会调用HeadContext, 注意, HeadContext既是InboundHandler, 也是OutboundHandler
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
unsafe.write(msg, promise);
}
这里调用了AbstractUnsafe.write, 将数据write到socket中,具体过程这里不再描述。
invokeFlush0();
也是类似的流程, 这里不再复述。
那么OutboundHandler的read事件, 是在哪里触发的呢? 其实是在fireChannelReadComplete中
pipeline.fireChannelReadComplete(); 会调用到DefaultChannelPipeline.channelReadComplete
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelReadComplete();
readIfIsAutoRead();
}
private void readIfIsAutoRead() {
if (channel.config().isAutoRead()) {
channel.read();
}
}
如果配置为AutoRead, 就会调用channel.read(), 进而调用 pipeline.read(), 最终就会触发MergeHandler.read方法。
到这里, netty启动, accept, read/write的一个完整流程都讲完了。
netty是非常优秀的框架, 模块化做到很好, 对jdk的future, buffer这些模块都做了扩展,还自行进行了内存管理。
对netty流程熟悉后, 就可以继续学习netty的这些闪光点, 网上也有很多优秀的教程了。
下面是一些非常优秀的netty博客:
Netty源码分析-占小狼
Netty那点事-黄亿华
Netty系列之Netty线程模型-李林锋
Netty系列之Netty高性能之道-李林锋
Netty_in_Action-译文
网友评论