上篇博文我们分析了netty的启动流程。
详细见netty分析(一) -- 服务启动流程。这篇文章,我们来分析下netty的数据处理。
上篇讲到在bossGroup的NioEventLoop中的processSelectedKey函数中会调用unsafe.read()来执行NioServerSocketChannel的的accept操作。
在workerGroup中,NioEventLoop的processSelectedKey函数中会执行socket的数据读取操作,让我们来看一下。
1.读取客户端数据过程
processSelectedKey
private static void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
final NioUnsafe unsafe = ch.unsafe();
if (!k.isValid()) {
// close the channel if the key is not valid anymore
unsafe.close(unsafe.voidPromise());
return;
}
try {
int readyOps = k.readyOps();
// Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
// to a spin loop
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
if (!ch.isOpen()) {
// Connection already closed - no need to handle write.
return;
}
}
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
// Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
ch.unsafe().forceFlush();
}
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
// remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
// See https://github.com/netty/netty/issues/924
int ops = k.interestOps();
ops &= ~SelectionKey.OP_CONNECT;
k.interestOps(ops);
unsafe.finishConnect();
}
} catch (CancelledKeyException e) {
unsafe.close(unsafe.voidPromise());
}
}
NioSocketChannel的read方法
public void read() {
final ChannelConfig config = config();
final ChannelPipeline pipeline = pipeline();
final ByteBufAllocator allocator = config.getAllocator();
final int maxMessagesPerRead = config.getMaxMessagesPerRead();
RecvByteBufAllocator.Handle allocHandle = this.allocHandle;
if (allocHandle == null) {
this.allocHandle = allocHandle = config.getRecvByteBufAllocator().newHandle();
}
if (!config.isAutoRead()) {
removeReadOp();
}
ByteBuf byteBuf = null;
int messages = 0;
boolean close = false;
try {
int byteBufCapacity = allocHandle.guess();
int totalReadAmount = 0;
do {
byteBuf = allocator.ioBuffer(byteBufCapacity);
int writable = byteBuf.writableBytes();
int localReadAmount = doReadBytes(byteBuf);
if (localReadAmount <= 0) {
// not was read release the buffer
byteBuf.release();
close = localReadAmount < 0;
break;
}
pipeline.fireChannelRead(byteBuf);
byteBuf = null;
if (totalReadAmount >= Integer.MAX_VALUE - localReadAmount) {
// Avoid overflow.
totalReadAmount = Integer.MAX_VALUE;
break;
}
totalReadAmount += localReadAmount;
if (localReadAmount < writable) {
// Read less than what the buffer can hold,
// which might mean we drained the recv buffer completely.
break;
}
} while (++ messages < maxMessagesPerRead);
pipeline.fireChannelReadComplete();
allocHandle.record(totalReadAmount);
if (close) {
closeOnRead(pipeline);
close = false;
}
} catch (Throwable t) {
handleReadException(pipeline, byteBuf, t, close);
}
}
}
逻辑还是比较清晰的,核心操作是这两句"int localReadAmount = doReadBytes(byteBuf);"以及"pipeline.fireChannelReadComplete();"。
前者负责将数据从底层读入ByteBuf,后者负责将数据转交pipline处理。
DefaultChannelPipeline的fireChannelRead实现
public ChannelPipeline fireChannelRead(Object msg) {
head.fireChannelRead(msg);
return this;
}
ChannelPipeline
每个socket绑定了一个pipline,pipline内部维护了一个可迭代的ChannelHandlerContext链表,用于处理io数据,以head标识头部,tail标识尾部。
当我们初始化通道,调用pipline的addLast方法塞入一个ChannelHandler时,实际对handler各个方法是否有@Skip注解做了标记,封装成一个ChannelHandlerContext放入链表的尾部。
这样有数据触发的时候,pipline会从链表首部开始迭代,找到对应能处理相应逻辑的handler进行处理。如果当前的handler需要将数据丢给下一层handler进行处理,需要调用ChannelHandlerContext的fireXXX方法将数据传递下去。
进一步查看DefaultChannelHandlerContext的fireChannelRead方法。
@Override
public ChannelHandlerContext fireChannelRead(Object msg) {
DefaultChannelHandlerContext next = findContextInbound(MASK_CHANNEL_READ);
next.invoker.invokeChannelRead(next, msg);
return this;
}
这里的next上下文是根据handler中未携带@Skip的注解来查找最近的链表节点。进一步查看invokeChannelRead方法。
public void invokeChannelRead(final ChannelHandlerContext ctx, final Object msg) {
if (msg == null) {
throw new NullPointerException("msg");
}
if (executor.inEventLoop()) {
invokeChannelReadNow(ctx, msg);
} else {
safeExecuteInbound(new Runnable() {
@Override
public void run() {
invokeChannelReadNow(ctx, msg);
}
}, msg);
}
}
由于当前是workerGroup中的EventLoop线程,走进invokeChannelReadNow。
public static void invokeChannelReadNow(final ChannelHandlerContext ctx, final Object msg) {
try {
ctx.handler().channelRead(ctx, msg);
} catch (Throwable t) {
notifyHandlerException(ctx, t);
}
}
走到这里,终于和用户代码联系起来了。当接收到客户端数据会调用用户设置的ChannelHandler的channelRead方法。这个很重要。下面我们分析下常见的ChannelHandler。
2 常见的ChannelHandler
由于数据处理的复杂性,Netty针对常见的应用场景给我们封装了一系列的ChannelHandler。先介绍ByteToMessageDecoder的子类。
2.1 ByteToMessageDecoder
2.1.1.LineBasedFrameDecoder
LineBasedFrameDecoder用于以换行符做解码分割符的场景。我们来查看其实现。根据上面的分析,当数据来了首先调用到channelRead方法。我们查看其实现
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof ByteBuf) {
RecyclableArrayList out = RecyclableArrayList.newInstance();
try {
ByteBuf data = (ByteBuf) msg;
first = cumulation == null;
if (first) {
cumulation = data;
} else {
if (cumulation.writerIndex() > cumulation.maxCapacity() - data.readableBytes()) {
expandCumulation(ctx, data.readableBytes());
}
cumulation.writeBytes(data);
data.release();
}
callDecode(ctx, cumulation, out);
} catch (DecoderException e) {
throw e;
} catch (Throwable t) {
throw new DecoderException(t);
} finally {
if (cumulation != null && !cumulation.isReadable()) {
cumulation.release();
cumulation = null;
}
int size = out.size();
decodeWasNull = size == 0;
for (int i = 0; i < size; i ++) {
ctx.fireChannelRead(out.get(i));
}
out.recycle();
}
} else {
ctx.fireChannelRead(msg);
}
}
注意:Netty框架并不缓存数据,所以当有未处理完的半包,我们需要自己存起来。
ByteToMessageDecoder如其名,只处理ByteBuf类型的输入数据,内部有个cumulation用于缓存半包(确切的说是若干个已读的全包加一个半包).如果每次解码以后,恰好处理完读入的字节没有剩余半包,那么清空cumulation。读入的新数据会附加到cumulation的尾部,如果cumulation剩下的空间不够写入了,则会对cumulation重新分配内存,新的内存大小正好是需要的字节数。
接着对读入的数据进行分包,详细代码如下:
protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
try {
while(true) {
if (in.isReadable()) {
int outSize = out.size();
int oldInputLength = in.readableBytes();
this.decode(ctx, in, out);
if (!ctx.isRemoved()) {
if (outSize == out.size()) {
if (oldInputLength != in.readableBytes()) {
continue;
}
} else {
if (oldInputLength == in.readableBytes()) {
throw new DecoderException(StringUtil.simpleClassName(this.getClass()) + ".decode() did not read anything but decoded a message.");
}
if (!this.isSingleDecode()) {
continue;
}
}
}
}
return;
}
} catch (DecoderException var6) {
throw var6;
} catch (Throwable var7) {
throw new DecoderException(var7);
}
}
接下来会调用子类的decode方法进行进一步解码,查看LineBasedFrameDecoder的实现:
protected Object decode(ChannelHandlerContext ctx, ByteBuf buffer) throws Exception {
int eol = findEndOfLine(buffer);
int length;
int length;
if (!this.discarding) {
if (eol >= 0) {
length = eol - buffer.readerIndex();
int delimLength = buffer.getByte(eol) == 13 ? 2 : 1;
if (length > this.maxLength) {
buffer.readerIndex(eol + delimLength);
this.fail(ctx, length);
return null;
} else {
ByteBuf frame;
if (this.stripDelimiter) {
frame = buffer.readBytes(length);
buffer.skipBytes(delimLength);
} else {
frame = buffer.readBytes(length + delimLength);
}
return frame;
}
} else {
length = buffer.readableBytes();
if (length > this.maxLength) {
this.discardedBytes = length;
buffer.readerIndex(buffer.writerIndex());
this.discarding = true;
if (this.failFast) {
this.fail(ctx, "over " + this.discardedBytes);
}
}
return null;
}
} else {
if (eol >= 0) {
length = this.discardedBytes + eol - buffer.readerIndex();
length = buffer.getByte(eol) == 13 ? 2 : 1;
buffer.readerIndex(eol + length);
this.discardedBytes = 0;
this.discarding = false;
if (!this.failFast) {
this.fail(ctx, length);
}
} else {
this.discardedBytes = buffer.readableBytes();
buffer.readerIndex(buffer.writerIndex());
}
return null;
}
}
以"\r\n" 或"\n"做分隔符,如果超过了配置的最大长度还没有读到结束符,将调用fireExceptionCaught告知后续节点解码发生异常。
2.1.2.DelimiterBasedFrameDecoder
DelimiterBasedFrameDecoder支持用户以多种特定的分割符来对原始ByteBuf解码。如果是换行符"\n"和"\r\n",且不是DelimiterBasedFrameDecoder的子类,功能和LineBasedFrameDecoder一模一样,内部创建一个LineBasedFrameDecoder对象来处理。
2.1.3.FixedLengthFrameDecoder
定长解码器,没什么好说的。
2.1.4.LengthFieldBasedFrameDecoder
这个比较经常用到,先看构造函数。
public LengthFieldBasedFrameDecoder(
ByteOrder byteOrder, int maxFrameLength, int lengthFieldOffset, int lengthFieldLength,
int lengthAdjustment, int initialBytesToStrip, boolean failFast)
第一个参数是网络传输大小端,默认大端,最后一个参数是解码异常了是否快速报告。先跳过这两个参数,着重讲下其他
-
maxFrameLength 似有协议最大的包长度
-
lengthFieldOffset 私有协议都有一个header,header中有若干个字节是代表数据总长度的,这个字段代表header报文长度字段相对于第一个字节的偏移量。
-
lengthFieldLength 用多少个字节表示Header报文的长度,即从包头开始lengthFieldOffset ~lengthFieldOffset+lengthFieldLength 的数据代表报文总长度。
-
lengthAdjustment header中的报文总长度与body长度的差值,设body+header的实际长度是L,body长度是Lbody那么应该有公式,valueof(lengthFieldOffset ~lengthFieldOffset+lengthFieldLength) - lengthAdjustment= Lbody;
2.2.MessageToMessageDecoder
功能和ByteToMessageDecoder类似,但是支持了泛型的输入数据。
2.3.IdleStateHandler
IdleStateHandler在超过指定的时间通道上未发生读/写/(读和写)事件时,将借助NioEventLoop的定时器功能触发定时任务,满足条件时发出fireUserEventTriggered事件。通常我们可以用这个特性用来控制心跳等逻辑。
网友评论