
TCP是个"流"协议,它处于网络体系中的传输层,HTTP处于应用层,相对于HTTP协议更加底层;既然是流协议,又处于底层,所以它不关心上层的业务逻辑,也有可能随时断流(被截断),但是对于上层的业务逻辑来说,这是不允许的;这种情况,就是传说中的粘包、拆包。
粘包和拆包的情况又分为几种:
有两个数据包,每个数据包都完整发送,这是正常的情况;
两个包之间,粘在了一起,不分彼此,这是第一种异常情况,也成为粘包;
第一个包和第二个包的部分粘在一起,第二个包的剩余部分又单独成包,这是第二种异常情况;
第一个包和第二个包的部分粘在一起,第二个包又分为几个包,这是第三种异常情况。

先看一个粘包拆包的例子,首先看server端的代码:
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
/**
* 服务端启动类
* @author 程就人生
* @date 2019年10月13日
*/
public class TestServer {
public void bind(int port){
//配置服务端的Nio线程组,boosGroup负责新客户端接入
EventLoopGroup boosGroup = new NioEventLoopGroup();
//workerGroup负责I/O消息处理
EventLoopGroup workerGroup = new NioEventLoopGroup();
try{
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(boosGroup, workerGroup)
//设置为非阻塞
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
//采用匿名内部类的方式,声明hanlder
.childHandler(new ChannelInitializer<SocketChannel>(){
@Override
protected void initChannel(SocketChannel ch) throws Exception {
//事件处理绑定
ch.pipeline().addLast(new ServerHandler());
}
});
//绑定端口
ChannelFuture channelFuture = serverBootstrap.bind(port).sync();
//等待服务端监听端口关闭
channelFuture.channel().closeFuture().sync();
}catch(Exception e){
e.printStackTrace();
}finally{
//优雅退出
boosGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
public static void main(String[] argo){
new TestServer().bind(8080);
}
}
import java.util.Date;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
/**
* 服务端模拟
* @author 程就人生
* @date 2019年10月13日
*/
public class ServerHandler extends ChannelInboundHandlerAdapter{
private static int counter;
//I/O消息的接收处理
@Override
public void channelRead(ChannelHandlerContext ctx,Object msg){
try{
ByteBuf buf = (ByteBuf) msg;
byte[] req = new byte[buf.readableBytes()];
buf.readBytes(req);
String body = new String(req, "UTF-8").substring(0, req.length - System.getProperty("line.separator").length());
//把接收到的内容输出到控制台
System.out.println("the server receive:" + body + "the count is " + ++counter);
String currentTime = "current time:".equals(body)?new Date(System.currentTimeMillis()).toString():"BAD ORDER";
ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes());
//返回信息给客户端
ctx.writeAndFlush(resp);
}catch(Exception e){
e.printStackTrace();
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
//遇到异常时关闭ChannelHandlerContext
ctx.close();
}
}
再看client端的代码:
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
/**
* 客户端模拟
* @author 程就人生
* @date 2019年10月13日
*/
public class TestClient {
public void connect(int port, String host){
//客户端Nio线程组
EventLoopGroup group = new NioEventLoopGroup();
try{
Bootstrap bootstrap = new Bootstrap();
//线程组设置为非阻塞
bootstrap.group(group).channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true) .handler(new ChannelInitializer<SocketChannel>(){
@Override
protected void initChannel(SocketChannel ch) throws Exception {
//事件处理绑定
ch.pipeline().addLast(new ClientHandler());
}
});
//建立连接
ChannelFuture channelFuture = bootstrap.connect(host, port);
//等待服务端监听端口关闭
channelFuture.channel().closeFuture().sync();
}catch(Exception e){
e.printStackTrace();
}finally{
//优雅退出
group.shutdownGracefully();
}
}
public static void main(String[] argo){
new TestClient().connect(8080, "localhost");
}
}
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
public class ClientHandler extends ChannelInboundHandlerAdapter{
private static int counter;
private byte[] req;
public ClientHandler(){
req = "current time:".getBytes();
}
@Override
public void channelActive(ChannelHandlerContext ctx) {
ByteBuf firstMessage = null;
//连接成功后,发送消息,连续发送100次,模拟数据交互的频繁
for(int i = 0;i<100;i++){
firstMessage = Unpooled.buffer(req.length);
firstMessage.writeBytes(req);
ctx.writeAndFlush(firstMessage);
}
}
@Override
public void channelRead(ChannelHandlerContext ctx,Object msg){
try{
ByteBuf buf = (ByteBuf) msg;
byte[] req = new byte[buf.readableBytes()];
buf.readBytes(req);
String body = new String(req, "UTF-8");
System.out.println("now is:" + body + ";the counter is " + ++counter);
}catch(Exception e){
e.printStackTrace();
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
//释放资源
ctx.close();
}
}
启动服务端,再启动客户端,可以看到控制台的输出:


在客户端写个循环,或者说客户端发送的信息多一点,就出现粘包情况了,这个错误可不能出现,必须解决;如何解决粘包呢,可以使用Netty框架中的LineBasedFrameDecoder+StringDecoder(按行切换的文字解码器)来防止粘包,在使用LineBasedFrameDecoder 中有一点特别重要,就是需要在要发送的信息后增加换行符,否则接收端收到信息也不显示;接收方会认为这不是一个整包,不是一个整包,就不会输出,直到接到一个完整的包才输出。
服务端代码改动处第一处:
//采用匿名内部类的方式,声明hanlder
.childHandler(new ChannelInitializer<SocketChannel>(){
@Override
protected void initChannel(SocketChannel ch) throws Exception {
//使用换行符做粘包拆包的处理,所以发送的信息必须加上System.getProperty("line.separator"),否则对方接收不到
ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
//字符串转对象解码器
ch.pipeline().addLast(new StringDecoder()); ch.pipeline().addLast(new ServerHandler());
}
});
服务端改动第二处:
@Override
public void channelRead(ChannelHandlerContext ctx,Object msg){
try{
//此处已经LineBasedFrameDecoder、StringDecoder的解码,可以直接接收
String body = (String) msg;
//把接收到的内容输出
System.out.println("the server receive:" + body + "the count is " + ++counter);
String currentTime = "current time:".equals(body)?new Date(System.currentTimeMillis()).toString():"BAD ORDER";
//发送的消息,必须加换行符,否则客户端接收不到
currentTime = currentTime + System.getProperty("line.separator");
ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes());
//返回信息给客户端
ctx.writeAndFlush(resp);
}catch(Exception e){
e.printStackTrace();
}
}
客户端改动第一处:
.handler(new ChannelInitializer<SocketChannel>(){
@Override
protected void initChannel(SocketChannel ch) throws Exception {
//和服务器端保持一致
ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
ch.pipeline().addLast(new StringDecoder()); ch.pipeline().addLast(new ClientHandler());
}
});
客户端改动第二处:
public ClientHandler(){
//客户端发送信息需要加换行符,特别重要,否则服务端接收不到;
req = ("current time:" + System.getProperty("line.separator")).getBytes();
}
@Override
public void channelActive(ChannelHandlerContext ctx) {
ByteBuf firstMessage = null;
//连接成功后,发送消息,发送100吃,模拟信息发送的频繁
for(int i = 0;i<100;i++){
firstMessage = Unpooled.buffer(req.length);
firstMessage.writeBytes(req);
ctx.writeAndFlush(firstMessage);
}
}
@Override
public void channelRead(ChannelHandlerContext ctx,Object msg){
try{
//此处已经LineBasedFrameDecoder、StringDecoder的解码,可以直接接收
String body = (String) msg;
System.out.println("now is:" + body + ";the counter is " + ++counter);
}catch(Exception e){
e.printStackTrace();
}
}
最后运行测试:


除了使用LineBasedFrameDecoder + StringDecoder(按行切换的文字解码器)解决TCP粘包的问题,还可以使用DelimiterBasedFrameDecoder,这个类可以根据自定义的特殊符号来作为一个包的结束,与LineBasedFrameDecoder的使用一样,只需将要发送消息的结尾加上特殊符号即可;
服务器端代码改动处:
.childHandler(new ChannelInitializer<SocketChannel>(){
@Override
protected void initChannel(SocketChannel ch) throws Exception {
//以特殊符号作为一个包的结束符
ByteBuf delimiter = Unpooled.copiedBuffer("$_".getBytes());
ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, delimiter));
//字符串转对象
ch.pipeline().addLast(new StringDecoder());
ch.pipeline().addLast(new ServerHandler());
}
});
客户端对应的地方和服务端保持一致即可,另外发送的消息由换行符变成了自己定义的$_即可,其他地方无需变动,下面看测试结果:


总结
本文编写了两种解决TCP流传输过程中粘包拆包的处理demo,分别使用了Netty框架中的类,第一个是LineBasedFrameDecoder+StringDecoder按行切换的文字解码器,第二个是DelimiterBasedFrameDecoder+StringDecoder按分隔符切换的文字加码器;这两种解码器是不是有着相似之处呢。至于实现原理,还需要自个儿深入源码去看。
网友评论