客户端发送的数据在的服务端读取过程是在NioSocketChannel通道中经过了Pipeline链的处理,在这个过程中会涉及数据粘包半包的问题。为了解决粘包半包问题,可以使用了Netty当中已经写好的的解码器。
github测试代码
NioSocketChannel通道数据写入到ByteBuf缓冲
处理入口和服务端接受客户端连接是在相同的地方,只是实现类不同
NioByteUnsafe#read调用链
io.netty.channel.nio.NioEventLoop#run
io.netty.channel.nio.NioEventLoop#processSelectedKeys
io.netty.channel.nio.NioEventLoop#processSelectedKeysOptimized
io.netty.channel.nio.NioEventLoop#processSelectedKey

readyOps = 1,执行读取的unsafe对象是NioSocketChannel$NioSocketChannelUnsafe
NioByteUnsafe#read
io.netty.channel.nio.AbstractNioByteChannel.NioByteUnsafe#read

在获取ChannelConfig、ChannelPipeline对象之后创建ByteBuf对象,使用的是直接内存不是JVM内的堆内存,这是netty高性能的一个原因。
byteBuf = "PooledUnsafeDirectByteBuf(ridx: 0, widx: 0, cap: 1024)"
此时byteBuf 的读指针ridx: 0,写指针是widx: 0,容量大小是cap: 1024
ByteBuf指针结构

0到readerIndex读指针位置是可丢弃discardable字节
readerIndex读指针位置到writerIndex写指针位置是可读readable字节
writerIndex写指针位置capacity最大容量是可写writable字节
把通道数据写入到byteBuf
1、doReadBytes
io.netty.channel.socket.nio.NioSocketChannel#doReadBytes

2、writeBytes
io.netty.buffer.AbstractByteBuf#writeBytes

in = SocketChannelImpl java.nio.channels.SocketChannel
ByteBuf= "PooledUnsafeDirectByteBuf(ridx: 0, widx: 0, cap: 1024)"
读指针0,写指针0,容量1024
3、setBytes
io.netty.buffer.PooledByteBuf#setBytes


4、ByteBuf指针变化
从通道写到ByteBuf缓冲中,并设置writerIndex的位置(此时,我发送的是:123456789回车)。客户端发送是以"\r\n"结尾(channel.writeAndFlush(bufferedReader.readLine()+"\r\n");)
ByteBuf从通道读取数据后的变化(容量大小cap=1024就是1024个小框框),读指针没有变化ridx: 0,写指针widx: 11

ByteBuf数据在pipeline传播读取
读到ByteBuf后,便把读取到的数据缓冲区在pipeline传播调用ChannelRead方法
1、pipeline链


2、自定义的ServerInitializer的initChannel方法(与pipeline链结构相对应)

3、pipeline链结构

HeadContext和TailContext既是出站又是入站
ChannelOutboundHandler(出站):StringEncoder
ChannelInboundHandler(入站):DelimiterBasedFrameDecoder、StringDecoder、ServerHandler
所以,读取数据是经过入站HandlerContext
粘包和半包
粘包:服务端从 TCP 缓存就可能一下读取了多个包(一个包小于缓存的大小)
半包:收到了全包的一部分(一个包大于缓存的大小)
产生的根本原因就是TCP 是流式协议,消息没有边界。解决就是为消息设置边界
Netty当中的解码器,解决了粘包和半包问题
①、FixedLengthFrameDecoder 基于固定长度的解码器
②、LineBasedFrameDecoder 基于行(\n,\r)的解码器
③、DelimiterBasedFrameDecoder 基于分隔符的解码器
④、LengthFieldBasedFrameDecoder 基于长度的解码器
DelimiterBasedFrameDecoder解码器
ChannelPipeline中HeadContext调用channelRead方法
1、channelRead
io.netty.channel.DefaultChannelPipeline.HeadContext#channelRead
直接向下一个调用

2、fireChannelRead
io.netty.channel.AbstractChannelHandlerContext#fireChannelRead
fireXXX便是查找下一个

io.netty.channel.AbstractChannelHandlerContext#invokeChannelRead
ChannelHandlerContext(DelimiterBasedFrameDecoder 读取缓冲区PooledUnsafeDirectByteBuf(ridx: 0, widx: 11, cap: 1024)数据

ChannelPipeline中DelimiterBasedFrameDecoder调用channelRead方法
从缓冲区读取数据所涉及粘包半包问题,便是通过这些入站实现类处理的
DelimiterBasedFrameDecoder类继承了ByteToMessageDecoder,这个类没有channelRead方法,直接调用父类的
io.netty.handler.codec.ByteToMessageDecoder#channelRead

这个方法主要步骤:
①、定义CodecOutputList变量out
②、把传入的msg赋值到cumulation变量
③、调用callDecode方法,把读取数据添加到out
1、Cumulator#cumulate
io.netty.handler.codec.ByteToMessageDecoder.Cumulator#cumulate
把传入的msg赋值到cumulation变量
in = "PooledUnsafeDirectByteBuf(ridx: 0, widx: 11, cap: 1024)"
cumulation = "EmptyByteBufBE"直接释放cumulation内存

2、ByteToMessageDecoder#callDecode
①、callDecode
io.netty.handler.codec.ByteToMessageDecoder#callDecode
对cumulation变量(in)是否有可读指针(判断逻辑是writerIndex > readerIndex;)

②、decodeRemovalReentryProtection
io.netty.handler.codec.ByteToMessageDecoder#decodeRemovalReentryProtection

③、decode
io.netty.handler.codec.DelimiterBasedFrameDecoder#decode


lineBasedDecoder是创建DelimiterBasedFrameDecoder对象赋的值
3、LineBasedFrameDecoder#decode
io.netty.handler.codec.LineBasedFrameDecoder#decode

1、findEndOfLine获取结束位置eol('\r的位置')
io.netty.handler.codec.LineBasedFrameDecoder#findEndOfLine
找到\r的结束为止

2、length变量是可读长度
3、delimLength限定符长度 eol位置是\r 则是2
4、把buffer数据读取指定length长度数据到frame
5、跳过delimLength字节数据
6、返回frame
4、返回的读取数据decode添加到out
io.netty.handler.codec.DelimiterBasedFrameDecoder#decode

5、channelRead向下传播调用
io.netty.handler.codec.ByteToMessageDecoder#channelRead

总结:
1、读取数据与服务端接受连接是调用相同的方法只是实现类不同
接受连接是NioMessageUnsafe、读取数据NioByteUnsafe
接受连接是boss主Reactor线程,读取数据是work从Reactor线程
2、数据的读取就是从通道读到直接内存缓存区
3、直接内存缓存区数据需要经过Pipeline链的调用读取
4、为了解决TCP的粘包半包需要使用Netty提供或自定义的解码器
网友评论