在进行 TCP 编程时,无论是客户端还是服务器端,我们都需要考虑 TCP 的粘包/拆包问题。
TCP 是流协议,所谓流就是没有界限的一串数据。TCP 底层是不知道上层业务逻辑含义的,只会根据 TCP 缓冲区的实际情况进行包的划分。一个完善的数据包可能会被 TCP 拆分成多个包进行发送,也可能会被 TCP 封装成一个大的数据包进行发送,这就是粘包/拆包的问题。
粘包/拆包发生的原因,大致有以下几个:
-
应用程序 write 写入的字节大小大于套接口发送缓冲区的大小;
-
进行 MSS 大小的 TCP 分段;
-
以太网帧的 payload 大于 MTU 进行 IP 分片;
MSS(Maximum Segment Size,最大报文长度),是TCP协议定义的一个选项,MSS 选项用于在 TCP 连接建立时,收发双方协商通信时每一个报文段所能承载的最大数据长度。
MTU(Maximum Transmission Unit,最大传输单元)用来通知对方所能接受数据服务单元的最大尺寸,说明发送方能够接受的有效载荷大小。
由于底层的 TCP 是无法理解上层的业务数据,也无法保证数据包不被拆分和重组,因此粘包/拆包的问题,只能在上层的应用协议栈进行解决。主流协议的解决方案主要有:
-
消息定长,每个报文的大小固定;
-
在句尾增加换行符或其他特殊符号进行分割;
-
将消息分为消息头和消息体,消息头中有消息总长度;
-
设计更复杂的应用层协议。
先来看一个粘包/拆包的小例子,客户端依次给服务器发送100条数据,服务器端应该接收到100条数据,服务器端接收到100条数据后要回应客户端100条数据,客户端也应该接收到100条数据,而实际上呢,却没有。
服务器端代码:
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.CharsetUtil;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
/**
* Netty服务器端
* @author 程就人生
* @date 2023年01月03日
* @Description
*
*/
public class TestServer {
public void bind(final int port){
EventLoopGroup boosGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try{
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(boosGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childHandler(new ChannelInitializer<SocketChannel>(){
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new StringDecoder(CharsetUtil.UTF_8));
ch.pipeline().addLast(new StringEncoder(CharsetUtil.UTF_8));
ch.pipeline().addLast(new ServerHandler());
}
});
ChannelFuture channelFuture = serverBootstrap.bind(port).sync();
channelFuture.addListener(new GenericFutureListener<Future<? super Void>>() {
public void operationComplete(Future<? super Void> future) throws Exception {
if (future.isSuccess()) {
System.out.println("服务器启动成功,Started Successed:" + port);
} else {
System.out.println("服务器启动失败,Started Failed:" + port);
}
}
});
channelFuture.channel().closeFuture().sync();
}catch(Exception e){
e.printStackTrace();
}finally{
boosGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
public static void main(String[] argo){
new TestServer().bind(8080);
}
}
/**
* 服务器端handler
* @author 程就人生
* @date 2023年01月03日
* @Description
*
*/
class ServerHandler extends ChannelInboundHandlerAdapter{
private static int counter;
@Override
public void channelRead(ChannelHandlerContext ctx,Object msg){
try{
System.out.println("这里是服务器端控制台:" + msg + "计数:" + ++counter);
String resp = "Server msg~!";
ctx.writeAndFlush(resp);
}catch(Exception e){
e.printStackTrace();
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}
第28-29行代码,创建服务器端的 NIO 线程组,其中 boosGroup 线程组专门负责接收客户端的连接,workerGroup 线程组处理 I/O 消息的读写。ServerBootstrap 为辅助启动类。在第 33 行设置为非阻塞模式,第34 行设置 backlog 为 1KM,第35行-36行设置通道 Channel 的分配器,第37 行设置为长连接,第 38 行采用匿名内部类的方式声明 handler。
在41行设置字符串解码器 StringDecoder,将接收到的 Bytebuf 解码为 String字符串。在42 行为字符串编码器 StringEncoder,将要发送的 String 字符串编码为 Bytebuf ,以便于在通道Channel中传输。StringDecoder 和 StringEncoder 经常成对出现,有了这两个类,我们就可以把 String 字符串交给它们,由它们来完成和 Bytebuf 之间的转换。
在46 行绑定端口,在 47 行启动监听事件绑定。在 56 行同步服务器通道关闭。在60-61行服务器优雅关闭。
在80行,定义静态变量,对接收数据进行计数。在85-87行,打印客户端发来的消息,并给客户端进行响应。在95行,遇到异常时关闭连接。
客户端代码:
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.CharsetUtil;
/**
* netty客户端
* @author 程就人生
* @date 2023年01月03日
* @Description
*
*/
public class TestClient {
public void connect(int port, String host){
EventLoopGroup group = new NioEventLoopGroup();
try{
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.SO_KEEPALIVE, true)
.handler(new ChannelInitializer<SocketChannel>(){
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new StringDecoder(CharsetUtil.UTF_8));
ch.pipeline().addLast(new StringEncoder(CharsetUtil.UTF_8));
ch.pipeline().addLast(new ClientHandler());
}
});
ChannelFuture channelFuture = bootstrap.connect(host, port);
channelFuture.channel().closeFuture().sync();
}catch(Exception e){
e.printStackTrace();
}finally{
group.shutdownGracefully();
}
}
public static void main(String[] argo){
new TestClient().connect(8080, "localhost");
}
}
/**
* 客户端处理handler
* @author 程就人生
* @date 2023年01月03日
* @Description
*
*/
class ClientHandler extends ChannelInboundHandlerAdapter{
private static int counter;
@Override
public void channelActive(ChannelHandlerContext ctx) {
String req = "Client msg~!";
for(int i = 0;i<100;i++){
ctx.writeAndFlush(req);
}
}
@Override
public void channelRead(ChannelHandlerContext ctx,Object msg){
try{
System.out.println("这里是客户端控制台:" + msg + ";计数:" + ++counter);
}catch(Exception e){
e.printStackTrace();
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}
在45行,通过 main 方法传入端口号和 host 。第 25 行创建客户端 NIO 线程组,在29行将线程组设置为非阻塞模式。第 30 行,保持长链接为 true。第 31 行采用匿名内部类的方式声明handler,34 行设置字符串解码器,35 行设置字符串编码器,36 行设置 I/O 消息处理 handler。第 39 行连接服务器端端口号和 host,第 40 行同步关闭事件。
第62行,定义静态变量,对服务器发过来的数据进行计数。在62-68行,连接成功后,连续给服务器端发送100条消息。第75行,对服务器端发过来的信息继续计数。
接下来分别运行服务器端和客户端,服务器端控制台输出:
客户端输出:
通过控制台,只看到两个数据包,服务器端粘包/拆包了,因为服务器端的缓冲区大小为1KB,而发送的数据很小,TCP 底层把他们封装成了一个包进行发送,因此出现了粘包。在第一个包和第二个包之间,还出现了拆包情况,一个数据包被放到了两个缓冲区。毫无疑问,这不是我们想要的结果。
使用第 1 种解决方案,通过定长来控制粘包/拆包的问题。在服务器端、客户端采用匿名内部类的方式声明 handler 的地方,分别加入 FixedLengthFrameDecoder 类,服务器端和客户端发出去的消息长度刚好12个字符,这里就把FixedLengthFrameDecoder 中的固定长度设置为12。服务器端代码调整:
.childHandler(new ChannelInitializer<SocketChannel>(){
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new FixedLengthFrameDecoder(12));
ch.pipeline().addLast(new StringDecoder(CharsetUtil.UTF_8));
ch.pipeline().addLast(new StringEncoder(CharsetUtil.UTF_8));
ch.pipeline().addLast(new ServerHandler());
}
});```
客户端代码调整:
.handler(new ChannelInitializer<SocketChannel>(){
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new FixedLengthFrameDecoder(12));
ch.pipeline().addLast(new StringDecoder(CharsetUtil.UTF_8));
ch.pipeline().addLast(new StringEncoder(CharsetUtil.UTF_8));
ch.pipeline().addLast(new ClientHandler());
}
});
分别运行服务器端代码和客户端代码,服务器端控制台运行结果如下图所示:
![](https://img.haomeiwen.com/i3816895/78711df13ae98ea0.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
客户端运行结果如下图所示:![](https://img.haomeiwen.com/i3816895/b4ce45b32037735b.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
服务器端控制台、客户端控制台都收到了100条数据,并且没有再发生粘包/拆包的情况。
查看 FixedLengthFrameDecoder 的源码,这个解码类就是把接收到的 Bytebuf 按照固定的长度进行分割,因此接收到的都是等长的数据包。 FixedLengthFrameDecoder 单独使用,无需对应的编码类。
![](https://img.haomeiwen.com/i3816895/414d990317e26d86.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
当然这个解码类也有它的局限,在实际的业务场景中,不可能都是等长的数据包,数据包的长度很可能经常变化,这样这个解码器就不能满足我们的需求了。
网友评论