如何在句尾增加特殊符号解决粘包/拆包问题呢?
在 netty 的类库中还有这么两个类, LineEncoder 编码类 和 DelimiterBasedFrameDecoder 解码类。
打开 DelimiterBasedFrameDecoder 源码,该类继承自ByteToMessageDecoder,负责根据数据包末尾的特殊符号来对数据包进行拆分。
在 56 行的示例中,加入了换行符作为每个数据包的分隔符。
在该类中,提供了 7 个成员变量:
-
delimiters:一个或多个 ByteBuf 数组类型的分隔符;
-
maxFrameLength:数据包最大长度;
-
stripDelimiter:是否跳过分隔符;
-
failFast:为true时在没有读取完整个数据包时就报错,否则在读完才报错;
-
discardingTooLongFrame:是否丢弃过长的数据;
-
tooLongFrameLength:数据包最大长度;
-
LineBasedFrameDecoder:换行分隔符解码器,只能设置换行符作为分隔符。
在这 7 个参数中,有 5 个成员可以通过该类的 6 个构造函数分别设置,最终在 166 行的构造函数中完成成员变量的赋值。
在该构造函数中,在168 行对数据包的最大长度进行了判断,最大长度不能设置为小于 0 的数字。在 169 行,分隔符不能设置为 null。在 172 行,分隔符数组不能为空数组。在 176 行,判断分隔符是否为换行符,如果为换行符则使用 LineBasedFrameDecoder 换行符解码类进行解码。在 182 行,对分隔符数组进行遍历,在 183 行对分隔符所在的 ByteBuf 进行不能为 null 和 必须可读的验证。在 184 行将分隔符加入成员变量delimiters数组中。
isLineBased 方法是验证分隔符是否为换行符的验证。
解码的核心逻辑在 decode 方法中。
protected Object decode(ChannelHandlerContext ctx, ByteBuf buffer) throws Exception {
if (lineBasedDecoder != null) {
return lineBasedDecoder.decode(ctx, buffer);
}
// Try all delimiters and choose the delimiter which yields the shortest frame.
int minFrameLength = Integer.MAX_VALUE;
ByteBuf minDelim = null;
for (ByteBuf delim: delimiters) {
int frameLength = indexOf(buffer, delim);
if (frameLength >= 0 && frameLength < minFrameLength) {
minFrameLength = frameLength;
minDelim = delim;
}
}
if (minDelim != null) {
int minDelimLength = minDelim.capacity();
ByteBuf frame;
if (discardingTooLongFrame) {
// We've just finished discarding a very large frame.
// Go back to the initial state.
discardingTooLongFrame = false;
buffer.skipBytes(minFrameLength + minDelimLength);
int tooLongFrameLength = this.tooLongFrameLength;
this.tooLongFrameLength = 0;
if (!failFast) {
fail(tooLongFrameLength);
}
return null;
}
if (minFrameLength > maxFrameLength) {
// Discard read frame.
buffer.skipBytes(minFrameLength + minDelimLength);
fail(minFrameLength);
return null;
}
if (stripDelimiter) {
frame = buffer.readRetainedSlice(minFrameLength);
buffer.skipBytes(minDelimLength);
} else {
frame = buffer.readRetainedSlice(minFrameLength + minDelimLength);
}
return frame;
} else {
if (!discardingTooLongFrame) {
if (buffer.readableBytes() > maxFrameLength) {
// Discard the content of the buffer until a delimiter is found.
tooLongFrameLength = buffer.readableBytes();
buffer.skipBytes(buffer.readableBytes());
discardingTooLongFrame = true;
if (failFast) {
fail(tooLongFrameLength);
}
}
} else {
// Still discarding the buffer since a delimiter is not found.
tooLongFrameLength += buffer.readableBytes();
buffer.skipBytes(buffer.readableBytes());
}
return null;
}
}
/**
* 通过特殊符号获取数据包长度
*/
private static int indexOf(ByteBuf haystack, ByteBuf needle) {
for (int i = haystack.readerIndex(); i < haystack.writerIndex(); i ++) {
int haystackIndex = i;
int needleIndex;
for (needleIndex = 0; needleIndex < needle.capacity(); needleIndex ++) {
if (haystack.getByte(haystackIndex) != needle.getByte(needleIndex)) {
break;
} else {
haystackIndex ++;
if (haystackIndex == haystack.writerIndex() &&
needleIndex != needle.capacity() - 1) {
return -1;
}
}
}
if (needleIndex == needle.capacity()) {
// Found the needle from the haystack!
return i - haystack.readerIndex();
}
}
return -1;
}
在 2 行代码中,判断成员变量 lineBasedDecoder 是否为 null,如果不为 null 则使用该类对数据包进行解码。
在 8 行,对分隔符数组进行遍历。在 9 行,通过 indexOf 方法获取分隔符所在buffer 的索引。在 10 行,如果数据包长大于 0 并且小于 最小数据包长。在 11-12行,设置最小数据包长等于当前数据包长,设置最小分隔符等于当前分隔符。
在 16 行,最小分隔符不为 null 的时候,获取最小分隔符所在 ByteBuf 的所占的空间大小。在 20 行,如果 discardingTooLongFrame 为 true 时,buffer要丢弃 minFrameLength + minDelimLength 个字节数。
在 34 行,最小包长 minFrameLength 大于最大包长 maxFrameLength时,buffer 要丢弃 minFrameLength + minDelimLength 个字节数。
在 41 行,如果 stripDelimiter 为true 时,在 42 行从 buffer 中根据最小数据包长度 minFrameLength 获取缓冲区子区域的一个新的ByteBuf,在 43 行跳过 buffer 中的特殊字符长度 minDelimLength 的字节。stripDelimiter 默认值为 true,也就是在不设置的时候会走这个逻辑。
stripDelimiter 为 false 的时候,在 45 行从buffer 中根据最小数据包长度加最小分隔符长度 minFrameLength + minDelimLength,获取缓冲区子区域的一个新的 frame 并返回。最小包长在 6 行已设置,为 Integer类型的最大值。
一句话,在 9 行获取数据包中最小索引的分隔符,以及最小索引分隔符的长度。在 42 行,根据最小索引的分隔符分离出一个新的ByteBuf ,并跳过索引,返回新的 ByteBuf,并进行下一轮的分割,直到缓冲区的数据处理完毕。
重要的源码已经看过一遍了,接下来看服务器端代码:
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.string.LineEncoder;
import io.netty.handler.codec.string.LineSeparator;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.CharsetUtil;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
/**
* Netty服务器端
* @author 程就人生
* @date 2023年01月06日
* @Description
*
*/
public class TestServer {
public void bind(final int port){
// 配置服务端的Nio线程组,boosGroup负责新客户端接入
EventLoopGroup boosGroup = new NioEventLoopGroup();
// workerGroup负责I/O消息处理
EventLoopGroup workerGroup = new NioEventLoopGroup();
try{
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(boosGroup, workerGroup)
// 线程组设置为非阻塞
.channel(NioServerSocketChannel.class)
//连接缓冲池的大小
.option(ChannelOption.SO_BACKLOG, 1024)
//设置通道Channel的分配器
.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
//设置长连接
.childOption(ChannelOption.SO_KEEPALIVE, true)
// 采用匿名内部类的方式,声明hanlder
.childHandler(new ChannelInitializer<SocketChannel>(){
@Override
protected void initChannel(SocketChannel ch) throws Exception {
// 3.1、以特殊符号作为一个包的结束符
ByteBuf delimiter = Unpooled.copiedBuffer("$_".getBytes());
ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, delimiter));
// ByteBuf 转 字符串
ch.pipeline().addLast(new StringDecoder(CharsetUtil.UTF_8));
// 字符串 转 ByteBuf
ch.pipeline().addLast(new StringEncoder(CharsetUtil.UTF_8));
// 3.2 、每次发出去的数据都增加特殊符号
ch.pipeline().addLast(new LineEncoder(new LineSeparator("$_"), CharsetUtil.UTF_8));
// 事件处理绑定
ch.pipeline().addLast(new ServerHandler());
}
});
// 绑定端口
ChannelFuture channelFuture = serverBootstrap.bind(port).sync();
// 服务端启动监听事件
channelFuture.addListener(new GenericFutureListener<Future<? super Void>>() {
public void operationComplete(Future<? super Void> future) throws Exception {
//启动成功后的处理
if (future.isSuccess()) {
System.out.println("服务器启动成功,Started Successed:" + port);
} else {
System.out.println("服务器启动失败,Started Failed:" + port);
}
}
});
// 等待服务端监听端口关闭
channelFuture.channel().closeFuture().sync();
}catch(Exception e){
e.printStackTrace();
}finally{
// 优雅退出
boosGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
public static void main(String[] argo){
new TestServer().bind(8080);
}
}
/**
* 服务器端handler
* @author 程就人生
* @date 2023年01月06日
* @Description
*
*/
class ServerHandler extends ChannelInboundHandlerAdapter{
// 对接收的消息进行计数
private static int counter;
// I/O消息的接收处理
@Override
public void channelRead(ChannelHandlerContext ctx,Object msg){
try{
// 把接收到的内容输出到控制台
System.out.println("这里是服务器端控制台:" + msg + ",计数:" + ++counter);
// 1.2、发送字符串
String resp = "来自服务器端的消息~!";
// 返回信息给客户端
ctx.writeAndFlush(resp);
}catch(Exception e){
e.printStackTrace();
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
// 遇到异常时关闭ChannelHandlerContext
ctx.close();
}
}
在 54-55 行,加入了 DelimiterBasedFrameDecoder 解码器,设置 $_ 为每个数据包的解码分隔符。
在 63 行,加入了 LineEncoder 编码器,通过这个编码类,可以在每个将要发出去数据包的包尾加入特殊分隔符 $_ ,这样就可以全局控制,随时换分隔符,无需在每个需要发送的数据包后面手动添加了。
客户端代码:
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.string.LineEncoder;
import io.netty.handler.codec.string.LineSeparator;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.CharsetUtil;
/**
* netty客户端
* @author 程就人生
* @date 2023年01月06日
* @Description
*
*/
public class TestClient {
public void connect(int port, String host){
// 客户端Nio线程组
EventLoopGroup group = new NioEventLoopGroup();
try{
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
// 线程组设置为非阻塞
.channel(NioSocketChannel.class)
.option(ChannelOption.SO_KEEPALIVE, true)
.handler(new ChannelInitializer<SocketChannel>(){
@Override
protected void initChannel(SocketChannel ch) throws Exception {
// 3.1、以特殊符号作为一个包的结束符
ByteBuf delimiter = Unpooled.copiedBuffer("$_".getBytes());
ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, delimiter));
// ByteBuf 转 字符串
ch.pipeline().addLast(new StringDecoder(CharsetUtil.UTF_8));
// 字符串 转 ByteBuf
ch.pipeline().addLast(new StringEncoder(CharsetUtil.UTF_8));
// 3.2 、每次发出去的数据都增加特殊符号
ch.pipeline().addLast(new LineEncoder(new LineSeparator("$_"), CharsetUtil.UTF_8));
// 事件处理绑定
ch.pipeline().addLast(new ClientHandler());
}
});
// 建立连接
ChannelFuture channelFuture = bootstrap.connect(host, port);
// 等待服务端监听端口关闭
channelFuture.channel().closeFuture().sync();
}catch(Exception e){
e.printStackTrace();
}finally{
// 优雅退出
group.shutdownGracefully();
}
}
public static void main(String[] argo){
new TestClient().connect(8080, "localhost");
}
}
/**
* 客户端处理handler
* @author 程就人生
* @date 2023年01月06日
* @Description
*
*/
class ClientHandler extends ChannelInboundHandlerAdapter{
// 对接收的消息次数进行计数
private static int counter;
@Override
public void channelActive(ChannelHandlerContext ctx) {
// 1.2、发送字符串
String req = "来自客户端的消息~!";
// 连接成功后,发送消息,连续发送100次,模拟数据交互的频繁
for(int i = 0;i<100;i++){
ctx.writeAndFlush(req);
}
}
@Override
public void channelRead(ChannelHandlerContext ctx,Object msg){
try{
System.out.println("这里是客户端控制台:" + msg + ",计数:" + ++counter);
}catch(Exception e){
e.printStackTrace();
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
//释放资源
ctx.close();
}
}
在 41-52 行的编解码,需要和服务端保持一致。接下来分别运行服务器端和客户端代码。
服务器端控制台输出:
客户端控制台输出:
服务器端控制台和客户端控制台,都打印了100次的数据,没有发生粘包/拆包问题。
如果有两个换行符会是什么情况呢?
服务器端编码调整第一部分:在解码的时候,增加一个特殊分隔符 @@。
ByteBuf delimiter = Unpooled.copiedBuffer("$_".getBytes());
ByteBuf delimiter1 = Unpooled.copiedBuffer("@@".getBytes());
ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, delimiter, delimiter1));
服务器端调整第二部分:在响应客户端的时候,增加特殊字符并增加文字。由于编码器 LineEncoder 只支持一组字符,因此我们只能在发送的文字里加第二组分隔符。
// 1.2、发送字符串
String resp = "来自服务器端的消息~!@@来自服务器端的消息~!";
客户端修改同服务器保持一致。
服务器端运行结果,客户端发了 100 条消息,但是有两个分隔符,每条消息都被拆成了两个数据包,因此最后计数为 200,没问题。
客户端运行结果:服务器收到了 200 条数据,响应客户端 200 条数据,响应客户端的一条数据里又有两个特殊字符,因此客户端收到的数据计数为 400,也没有问题。
以上便是 LineEncoder 编码器和 DelimiterBasedFrameDecoder 解码器的固定搭配。
网友评论