第二次看netty源码,对netty的理解也更深入了点,修改了不少文章内容。
后面有时间再分析一下netty的buffer,codec等内容。
源码分析基于netty 4.1
前面已经说过netty对accept事件的处理,现在来讲讲netty中的read/write过程。
Pipeline
DefaultChannelPipeline是一个netty处理io事件的默认通道,通道中的每个节点都是AbstractChannelHandlerContext,
AbstractChannelHandlerContext.next指向下一个AbstractChannelHandlerContext,prev指向前一个AbstractChannelHandlerContext。
Pipeline是标准的责任链。
AbstractChannelHandlerContext.handler()方法返回一个ChannelHandler,ChannelInboundHandler/ChannelOutboundHandler都继承自这个接口,
我们继承这两个接口的适配类ChannelInboundHandlerAdapter/ChannelInboundHandlerAdapter,编写具体的业务逻辑。
DefaultChannelPipeline固定有两个节点head/tail,addLast会把节点添加到tail前。
read
回顾一下, NioEventLoop中对read事件的处理
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
int readyOps = k.readyOps();
...
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
if (!ch.isOpen()) {
return;
}
}
}
ch是NioSocketChannel对象, ch.unsafe()返回NioSocketChannel.NioSocketChannelUnsafe,
unsafe.read()
会调用到NioSocketChannelUnsafe父类NioByteUnsafe.read():
final ChannelPipeline pipeline = pipeline();
final ByteBufAllocator allocator = config.getAllocator();
final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
allocHandle.reset(config);
ByteBuf byteBuf = null;
boolean close = false;
do {
// 申请缓存区空间
byteBuf = allocHandle.allocate(allocator);
// 从socket读取数据到缓存区
allocHandle.lastBytesRead(doReadBytes(byteBuf));
allocHandle.incMessagesRead(1);
// 触发ChannelRead事件
pipeline.fireChannelRead(byteBuf);
byteBuf = null;
} while (allocHandle.continueReading());
allocHandle.readComplete();
// 触发ChannelReadComplete事件
pipeline.fireChannelReadComplete();
从socket读取数据到byteBuf中,再调用DefaultChannelPipeline.fireChannelRead。
public final ChannelPipeline fireChannelRead(Object msg) {
AbstractChannelHandlerContext.invokeChannelRead(head, msg);
return this;
}
调用AbstractChannelHandlerContext静态方法invokeChannelRead,参数是head和msg
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeChannelRead(m);
} else {
...
}
}
private void invokeChannelRead(Object msg) {
if (invokeHandler()) { // 检查handler的状态
try {
((ChannelInboundHandler) handler()).channelRead(this, msg);
} catch (Throwable t) {
notifyHandlerException(t);
}
} else {
fireChannelRead(msg);
}
}
invokeHandler()会检查ChannelHandler是否已经调用了handlerAdded
handler()返回一个ChannelHandler,这里再转化为ChannelInboundHandler,并调用它的channelRead。(HeadContext.handler返回this,HeadContext同时实现了ChannelOutboundHandler/ChannelInboundHandler)。
看看HeadContext.channelRead
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ctx.fireChannelRead(msg);
}
ctx.fireChannelRead调用的是AbstractChannelHandlerContext.fireChannelRead:
public ChannelHandlerContext fireChannelRead(final Object msg) {
invokeChannelRead(findContextInbound(), msg);
return this;
}
findContextInbound会找到下一个ChannelInboundHandler
private AbstractChannelHandlerContext findContextInbound() {
AbstractChannelHandlerContext ctx = this;
do {
ctx = ctx.next;
} while (!ctx.inbound);
return ctx;
}
ctx.fireChannelRead(msg);
的作用就是找到下一个ChannelInboundHandler,并调用它的fireChannelRead方法, 这里会调用到我们实现的ChannelInboundHandler接口,并调用我们重写的fireChannelRead方法,进行逻辑处理。
我们重写的fireChannelRead方法最后要调用ctx.fireChannelRead(msg)
,这样会调用到AbstractChannelHandlerContext.fireChannelRead
, 它会找到下一个InboundHandler并调用fireChannelRead方法,这个数据才能在通道中继续流转(除非调用write相关方法)。
write
下面看看write过程
我们可以通过ChannelHandlerContext .writeAndFlush
写入结果给客户, 它会调用AbstractChannelHandlerContext.write:
private void write(Object msg, boolean flush, ChannelPromise promise) {
AbstractChannelHandlerContext next = findContextOutbound();
final Object m = pipeline.touch(msg, next);
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
if (flush) {
next.invokeWriteAndFlush(m, promise);
} else {
next.invokeWrite(m, promise);
}
} else {
...
}
}
findContextOutbound会找到当前节点前一个OutboundHandler(write和read的方向相反,这里向前找OutboundHandler)
private AbstractChannelHandlerContext findContextOutbound() {
AbstractChannelHandlerContext ctx = this;
do {
ctx = ctx.prev;
} while (!ctx.outbound);
return ctx;
}
next.invokeWriteAndFlush还是调用到AbstractChannelHandlerContext.invokeWriteAndFlush
private void invokeWriteAndFlush(Object msg, ChannelPromise promise) {
if (invokeHandler()) {
invokeWrite0(msg, promise);
invokeFlush0();
} else {
writeAndFlush(msg, promise);
}
}
invokeWrite0也比较简单, 就是调用handler的处理
private void invokeWrite0(Object msg, ChannelPromise promise) {
try {
((ChannelOutboundHandler) handler()).write(this, msg, promise);
} catch (Throwable t) {
notifyOutboundHandlerException(t, promise);
}
}
这里会调用到我们实现的ChannelOutboundHandler,并调用我们重写的write方法,实现业务逻辑。
最后会调用到HeadContext.write, 注意, HeadContext既是InboundHandler, 也是OutboundHandler
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
unsafe.write(msg, promise);
}
这里调用了AbstractUnsafe.write, 将数据write到socket中,具体过程这里不再描述。
invokeFlush0();
也是类似的流程, 这里不再复述。
那么ChannelOutboundHandler的read事件, 是在哪里触发的呢? 其实是在fireChannelReadComplete中
pipeline.fireChannelReadComplete(); 会调用到DefaultChannelPipeline.channelReadComplete
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelReadComplete();
readIfIsAutoRead();
}
private void readIfIsAutoRead() {
if (channel.config().isAutoRead()) {
channel.read();
}
}
如果配置为AutoRead, 就会调用channel.read(), 进而调用 pipeline.read(), 最终就会触发ChannelOutboundHandler.read方法。
到这里, netty启动, accept, read/write的一个完整流程都讲完了。
netty是非常优秀的框架, 模块化做到很好, 对jdk的future, buffer这些模块都做了扩展,还自行进行了内存管理。
对netty流程熟悉后, 就可以继续学习netty的这些闪光点, 网上也有很多优秀的教程了。
下面是一些非常优秀的netty博客:
Netty源码分析-占小狼
Netty那点事-黄亿华
Netty系列之Netty线程模型-李林锋
Netty系列之Netty高性能之道-李林锋
Netty_in_Action-译文
Netty 源码分析之 一 揭开 Bootstrap 神秘的红盖头 (服务器端)
网友评论