美文网首页IT@程序员猿媛SpringBoot精选
Netty中粘包拆包,小试牛刀

Netty中粘包拆包,小试牛刀

作者: 程就人生 | 来源:发表于2019-10-13 18:41 被阅读0次

TCP是个"流"协议,它处于网络体系中的传输层,HTTP处于应用层,相对于HTTP协议更加底层;既然是流协议,又处于底层,所以它不关心上层的业务逻辑,也有可能随时断流(被截断),但是对于上层的业务逻辑来说,这是不允许的;这种情况,就是传说中的粘包、拆包。

粘包和拆包的情况又分为几种:
有两个数据包,每个数据包都完整发送,这是正常的情况;
两个包之间,粘在了一起,不分彼此,这是第一种异常情况,也成为粘包;
第一个包和第二个包的部分粘在一起,第二个包的剩余部分又单独成包,这是第二种异常情况;
第一个包和第二个包的部分粘在一起,第二个包又分为几个包,这是第三种异常情况。

先看一个粘包拆包的例子,首先看server端的代码:

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

/**
 * 服务端启动类
 * @author 程就人生
 * @date 2019年10月13日
 */
public class TestServer {

    public void bind(int port){
               //配置服务端的Nio线程组,boosGroup负责新客户端接入
        EventLoopGroup boosGroup = new NioEventLoopGroup();
               //workerGroup负责I/O消息处理
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try{            
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(boosGroup, workerGroup)
                        //设置为非阻塞
            .channel(NioServerSocketChannel.class)
            .option(ChannelOption.SO_BACKLOG, 1024)
            //采用匿名内部类的方式,声明hanlder
            .childHandler(new ChannelInitializer<SocketChannel>(){
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                                      //事件处理绑定
                      ch.pipeline().addLast(new ServerHandler());
                }               
            });
            //绑定端口
            ChannelFuture channelFuture = serverBootstrap.bind(port).sync();
            //等待服务端监听端口关闭
            channelFuture.channel().closeFuture().sync();
        }catch(Exception e){
            e.printStackTrace();
        }finally{
                        //优雅退出
            boosGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
        
    }
    
    public static void main(String[] argo){
        new TestServer().bind(8080);
    }
}

import java.util.Date;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
/**
 * 服务端模拟
 * @author 程就人生
 * @date 2019年10月13日
 */
public class ServerHandler extends ChannelInboundHandlerAdapter{
    
    private static int counter;
        //I/O消息的接收处理
    @Override
    public void channelRead(ChannelHandlerContext ctx,Object msg){
        try{
            ByteBuf buf = (ByteBuf) msg;
            byte[] req = new byte[buf.readableBytes()];
            buf.readBytes(req);
            String body = new String(req, "UTF-8").substring(0, req.length - System.getProperty("line.separator").length());
            //把接收到的内容输出到控制台
            System.out.println("the server receive:" + body + "the count is " + ++counter);
            String currentTime = "current time:".equals(body)?new Date(System.currentTimeMillis()).toString():"BAD ORDER";
            ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes());
            //返回信息给客户端
            ctx.writeAndFlush(resp);            
        }catch(Exception e){
            e.printStackTrace();
        }
    }
    
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
      //遇到异常时关闭ChannelHandlerContext 
       ctx.close();
    }
}

再看client端的代码:

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
/**
 * 客户端模拟
 * @author 程就人生
 * @date 2019年10月13日
 */
public class TestClient {

    public void connect(int port, String host){
                 //客户端Nio线程组
        EventLoopGroup group = new NioEventLoopGroup();
        try{   
                       Bootstrap bootstrap = new Bootstrap();
                        //线程组设置为非阻塞
            bootstrap.group(group).channel(NioSocketChannel.class)
            .option(ChannelOption.TCP_NODELAY, true)            .handler(new ChannelInitializer<SocketChannel>(){
                @Override
                protected void initChannel(SocketChannel ch) throws Exception { 
                                        //事件处理绑定
                    ch.pipeline().addLast(new ClientHandler());
                }
            });
            
            //建立连接
            ChannelFuture channelFuture = bootstrap.connect(host, port);
            //等待服务端监听端口关闭
            channelFuture.channel().closeFuture().sync();
            
        }catch(Exception e){
            e.printStackTrace();
        }finally{
                        //优雅退出
            group.shutdownGracefully();
        }
    }
    
    public static void main(String[] argo){
        new TestClient().connect(8080, "localhost");
    }
}

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

public class ClientHandler extends ChannelInboundHandlerAdapter{
    
    private static int counter;
    
    private byte[] req;
    
    public ClientHandler(){
        req = "current time:".getBytes();
    }
    
    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        ByteBuf firstMessage = null;
        //连接成功后,发送消息,连续发送100次,模拟数据交互的频繁
        for(int i = 0;i<100;i++){
            firstMessage = Unpooled.buffer(req.length);
            firstMessage.writeBytes(req);
            ctx.writeAndFlush(firstMessage);
        }
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx,Object msg){
        try{
            ByteBuf buf = (ByteBuf) msg;
            byte[] req = new byte[buf.readableBytes()];
            buf.readBytes(req);
            String body = new String(req, "UTF-8");
            System.out.println("now is:" + body + ";the counter is " + ++counter);
        }catch(Exception e){
            e.printStackTrace();
        }
        
    }
    
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    //释放资源
       ctx.close();
    }
}

启动服务端,再启动客户端,可以看到控制台的输出:


服务端控制台 客户端控制台

在客户端写个循环,或者说客户端发送的信息多一点,就出现粘包情况了,这个错误可不能出现,必须解决;如何解决粘包呢,可以使用Netty框架中的LineBasedFrameDecoder+StringDecoder(按行切换的文字解码器)来防止粘包,在使用LineBasedFrameDecoder 中有一点特别重要,就是需要在要发送的信息后增加换行符,否则接收端收到信息也不显示;接收方会认为这不是一个整包,不是一个整包,就不会输出,直到接到一个完整的包才输出。

服务端代码改动处第一处

//采用匿名内部类的方式,声明hanlder
.childHandler(new ChannelInitializer<SocketChannel>(){
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    //使用换行符做粘包拆包的处理,所以发送的信息必须加上System.getProperty("line.separator"),否则对方接收不到
                    ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
                    //字符串转对象解码器
                    ch.pipeline().addLast(new StringDecoder());                     ch.pipeline().addLast(new ServerHandler());
                }               
            });

服务端改动第二处:

@Override
    public void channelRead(ChannelHandlerContext ctx,Object msg){
        try{
                        //此处已经LineBasedFrameDecoder、StringDecoder的解码,可以直接接收
            String body = (String) msg;
            //把接收到的内容输出
            System.out.println("the server receive:" + body + "the count is " + ++counter);
            String currentTime = "current time:".equals(body)?new Date(System.currentTimeMillis()).toString():"BAD ORDER";
            //发送的消息,必须加换行符,否则客户端接收不到
            currentTime = currentTime + System.getProperty("line.separator");
            ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes());
            //返回信息给客户端
            ctx.writeAndFlush(resp);            
        }catch(Exception e){
            e.printStackTrace();
        }
    }

客户端改动第一处:

.handler(new ChannelInitializer<SocketChannel>(){
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    //和服务器端保持一致
                    ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
                    ch.pipeline().addLast(new StringDecoder());                 ch.pipeline().addLast(new ClientHandler());
                }
            });

客户端改动第二处:

public ClientHandler(){
        //客户端发送信息需要加换行符,特别重要,否则服务端接收不到;
        req = ("current time:" + System.getProperty("line.separator")).getBytes();
    }
    
    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        ByteBuf firstMessage = null;
        //连接成功后,发送消息,发送100吃,模拟信息发送的频繁
        for(int i = 0;i<100;i++){
            firstMessage = Unpooled.buffer(req.length);
            firstMessage.writeBytes(req);
            ctx.writeAndFlush(firstMessage);
        }
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx,Object msg){
        try{
                        //此处已经LineBasedFrameDecoder、StringDecoder的解码,可以直接接收
            String body = (String) msg;
            System.out.println("now is:" + body + ";the counter is " + ++counter);
        }catch(Exception e){
            e.printStackTrace();
        }
        
    }

最后运行测试:

服务端测试结果图
客户端测试结果图

除了使用LineBasedFrameDecoder + StringDecoder(按行切换的文字解码器)解决TCP粘包的问题,还可以使用DelimiterBasedFrameDecoder,这个类可以根据自定义的特殊符号来作为一个包的结束,与LineBasedFrameDecoder的使用一样,只需将要发送消息的结尾加上特殊符号即可;

服务器端代码改动处:

.childHandler(new ChannelInitializer<SocketChannel>(){
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    //以特殊符号作为一个包的结束符
                    ByteBuf delimiter = Unpooled.copiedBuffer("$_".getBytes());
                    ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, delimiter));
                    //字符串转对象
                    ch.pipeline().addLast(new StringDecoder()); 
                    ch.pipeline().addLast(new ServerHandler());
                }               
            });

客户端对应的地方和服务端保持一致即可,另外发送的消息由换行符变成了自己定义的$_即可,其他地方无需变动,下面看测试结果:


服务端测试结果
客户端测试结果

总结
本文编写了两种解决TCP流传输过程中粘包拆包的处理demo,分别使用了Netty框架中的类,第一个是LineBasedFrameDecoder+StringDecoder按行切换的文字解码器,第二个是DelimiterBasedFrameDecoder+StringDecoder按分隔符切换的文字加码器;这两种解码器是不是有着相似之处呢。至于实现原理,还需要自个儿深入源码去看。

相关文章

  • Netty系列(3)TCP的粘包拆包问题及方案

    1.概述 1.1 粘包拆包问题描述 1.2 粘包拆包产生的原因 1.3 粘包拆包问题的解决思路 2.Netty中粘...

  • Netty中粘包拆包,小试牛刀

    TCP是个"流"协议,它处于网络体系中的传输层,HTTP处于应用层,相对于HTTP协议更加底层;既然是流协议,又处...

  • Netty 权威指南笔记(三):TCP 粘包和拆包

    Netty 权威指南笔记(三):TCP 粘包和拆包 什么是 TCP 粘包和拆包? TCP 是一个“流”协议,所谓“...

  • Netty 粘包拆包

    粘包示例 服务端 客户端 自定义协议处理粘包问题

  • Netty-TCP拆包/粘包

    Netty-TCP拆包/粘包 TCP拆包/粘包 TCP 是一个面向字节流的协议,它是性质是流式的,所以它并没有分段...

  • Netty粘拆包

    本文的示例代码参考NettySticky 目录 准备 NettyStartupProtocolClientServ...

  • netty------拆包粘包

    在上篇博客中介绍了netty的helloworld,本篇来介绍netty的拆包粘包问题。 TCP是一个 流 的协议...

  • Netty的拆包粘包

    包是什么 包的定义:客户端和服务端在发送数据的过程中,每次会以一个数据包的形式进行数据传输,比如登录会发送一个包含...

  • Netty TCP 粘包 & 拆包

    粘包/拆包 TCP 的一个数据包可能包含一个、多个或者不足一个应用层数据包,程序需要按照一整个应用包进行处理,这就...

  • netty粘包和拆包

    粘包和拆包是TCP网络编程中不可避免的,无论是服务端还是客户端,当我们读取或者发送消息的时候,都需要考虑TCP底层...

网友评论

    本文标题:Netty中粘包拆包,小试牛刀

    本文链接:https://www.haomeiwen.com/subject/xlmhmctx.html