美文网首页
Chapter4 TCP粘包/拆包问题的解决之道

Chapter4 TCP粘包/拆包问题的解决之道

作者: YaleWei | 来源:发表于2018-12-07 10:38 被阅读0次

    4.1 TCP粘包/拆包

    TCP是个"流"协议,是没有界限的一串数据。所以一个业务上认为的一个完整的包可能会被拆分成多个包发送,多个完整的包也可能被封装成一个大的数据包发送,这就是TCP粘包/拆包问题。

    4.1.1 TCP粘包/拆包问题说明

    假设客户端分别发送了两个数据包D1和D2给服务端,由于服务端一次读取到字节数不确定,故可能存在以下情况:
    ( 1 )服务端费两次读取到了两个独立的数据包,分别是D1和D2,没有粘包和拆包
    ( 2 )服务端一次性读取到了D1和D2粘合在一起的数据包,这种现象称为粘包
    ( 3 )服务端分两次读取到了两个数据包,第一次读取了完成的D1和D2的部分内容,第二次读取了D2的剩余内容,这种称为拆包
    ( 4 )如果服务端接收窗口非常小,数据包较大,可能发生多次拆包。


    粘包拆包示意图.png
    4.1.3 粘包问题的解决策略

    犹豫底层的TCP无法理解上层业务数据,这个问题只能通过上层的应用协议栈设计来解决,归纳如下 :
    ( 1 )消息定长,例如每个报文大小固定200字节,如果不够,空格补位
    ( 2 )在包尾增加回车换行符进行分割,如FTP协议
    ( 3 )将消息分为消息头和消息体,消息头包含标识消息长度的字段
    ( 4 )更复杂的应用层协议

    4.2 未考虑TCP 粘包导致的异常案例

    下面是一个时间服务器的demo,并没有考虑读半包问题。按照设计初衷,我们的本来意图是读取一条消息后,记一次数,然后发送应答消息给客户端,而且请求消息应该为"QUERY TIME ORDER"才会返回时间,否则返回"BAD ORDER"

    4.2.1 TimeServer 部分代码
    public class TimeServer {
        public void bind(int port) throws Exception {
            //配置服务端的NIO线程组
            //NioEventLoopGroup是个线程组,它包含了一组NIO线程,专门用于网络事件的处理,
            //实际上它们就是Reactor线程组。
            //这里创建两个的原因是一个用于服务端接受客户端的连接,另一个用于进行SocketChannel的网络读写。
            EventLoopGroup bossGroup = new NioEventLoopGroup();
            EventLoopGroup workerGroup = new NioEventLoopGroup();
            try {
                //创建ServerBootstrap对象,它是Netty用于启动NIO服务端的辅助启动类,目的是降低服务端的开发复杂度。
                ServerBootstrap b = new ServerBootstrap();
                //调用ServerBootstrap的group方法,将两个NIO线程组当作入参传递到ServerBootstrap中。
                //接着设置创建的Channel为NioServerSocketChannel,它的功能对应于JDK NIO类库中的ServerSocketChannel类。
                //然后配置NioServerSocketChannel的TCP参数,此处将它的backlog设置为1024,
                //最后绑定I/O事件的处理类ChildChannelHandler,它的作用类似于Reactor模式中的handler类,
                //主要用于处理网络I/O事件,例如记录日志、对消息进行编解码等。
                b.group(bossGroup, workerGroup)
                        .channel(NioServerSocketChannel.class)
                        .option(ChannelOption.SO_BACKLOG, 1024)
                        .childHandler(new ChildChannelHandler());
                //绑定端口,同步等待成功
                //服务端启动辅助类配置完成之后,调用它的bind方法绑定监听端口
                //随后,调用它的同步阻塞方法sync等待绑定操作完成。
                //完成之后Netty会返回一个ChannelFuture,它的功能类似于JDK的java.util.concurrent.Future,主要用于异步操作的通知回调。
                ChannelFuture f = b.bind(port).sync();
    
                //等待服务端监听端口关闭
                //使用f.channel().closeFuture().sync()方法进行阻塞,等待服务端链路关闭之后main函数才退出。
                f.channel().closeFuture().sync();
            } finally {
                //优雅退出,释放线程池资源
                //调用NIO线程组的shutdownGracefully进行优雅退出,它会释放跟shutdownGracefully相关联的资源。
                bossGroup.shutdownGracefully();
                workerGroup.shutdownGracefully();
            }
        }
    
        private class ChildChannelHandler extends ChannelInitializer {
            @Override
            protected void initChannel(Channel channel) throws Exception {
                channel.pipeline().addLast(new TimeServerHandler());
            }
        }
    
        public static void main(String[] args) throws Exception {
            new TimeServer().bind(8080);
        }
    }
    
    public class TimeServerHandler extends ChannelInboundHandlerAdapter {
        private int counter;
    
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            //做类型转换,将msg转换成Netty的ByteBuf对象。
            //ByteBuf类似于JDK中的java.nio.ByteBuffer 对象,不过它提供了更加强大和灵活的功能。
            ByteBuf buf = (ByteBuf) msg;
            //通过ByteBuf的readableBytes方法可以获取缓冲区可读的字节数,
            //根据可读的字节数创建byte数组
            byte[] req = new byte[buf.readableBytes()];
            //通过ByteBuf的readBytes方法将缓冲区中的字节数组复制到新建的byte数组中
            buf.readBytes(req);
            //通过new String构造函数获取请求消息。
            String body = new String(req, "UTF-8").substring(0, req.length - System.getProperty("line.separator").length());
            System.out.println("The time server receive order : " + body + " ; the counter is : " + ++counter);
            //如果是"QUERY TIME ORDER"则创建应答消息,
            String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body) ? new java.util.Date(
                    System.currentTimeMillis()).toString() : "BAD ORDER";
            currentTime = currentTime + System.getProperty("line.separator");
            ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes());
            //通过ChannelHandlerContext的write方法异步发送应答消息给客户端。
            ctx.writeAndFlush(resp);
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            ctx.close();
        }
    }
    
    4.2.2 TimeClient部分代码

    客户端部分代码需要注意的是TimeClientHandler.channelActive方法,在链路建立成功后,循环发送一百条消息,每发送一次刷新一次,保证每条消息写到Channel中,按照我们的设计服务端应该接收到100条查询时间的消息。而在客户端接收消息channelRead方法中,我们每接收一条消息打印一次计数器,我们初衷是看到打印100次系统时间。

    public class TimeClient {
        public void connect(int port, String host) throws Exception {
            // 配置客户端NIO线程组
            //首先创建客户端处理I/O读写的NioEventLoop Group线程组
            EventLoopGroup group = new NioEventLoopGroup();
            try {
                //继续创建客户端辅助启动类Bootstrap,随后需要对其进行配置。
                //与服务端不同的是,它的Channel需要设置为NioSocketChannel
                //然后为其添加handler,此处为了简单直接创建匿名内部类,实现initChannel方法,
                //其作用是当创建NioSocketChannel成功之后,
                //在初始化它的时候将它的ChannelHandler设置到ChannelPipeline中,用于处理网络I/O事件。
                Bootstrap b = new Bootstrap();
                b.group(group).channel(NioSocketChannel.class)
                        .option(ChannelOption.TCP_NODELAY, true)
                        .handler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            protected void initChannel(SocketChannel ch) throws Exception {
                                ch.pipeline().addLast(new TimeClientHandler());
                            }
                        });
    
                // 发起异步连接操作
                //客户端启动辅助类设置完成之后,调用connect方法发起异步连接,
                //然后调用同步方法等待连接成功。
                ChannelFuture f = b.connect(host, port).sync();
    
                // 等待客户端链路关闭
                //当客户端连接关闭之后,客户端主函数退出.
                f.channel().closeFuture().sync();
            } finally {
                // 优雅退出,释放NIO线程组
                //在退出之前,释放NIO线程组的资源。
                group.shutdownGracefully();
            }
        }
    
        public static void main(String[] args) throws Exception {
            int port = 8080;
            if (args !=null && args.length > 0){
                port = Integer.valueOf(args[0]);
            }
            new TimeClient().connect(port,"127.0.0.1");
        }
    }
    
    public class TimeClientHandler extends ChannelInboundHandlerAdapter{
    
        private int counter;
        private byte[] req;
    
        public TimeClientHandler() {
            req = ("QUERY TIME ORDER" + System.getProperty("line.separator")).getBytes();
        }
    
        //当客户端和服务端TCP链路建立成功之后,Netty的NIO线程会调用channelActive方法
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            ByteBuf message = null;
            for (int i = 0; i < 100; i++) {
                message = Unpooled.buffer(req.length);
                message.writeBytes(req);
                ctx.writeAndFlush(message);
            }
        }
    
        //当服务端返回应答消息时,channelRead方法被调用
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            //从Netty的ByteBuf中读取并打印应答消息。
            ByteBuf buf = (ByteBuf) msg;
            byte[] req = new byte[buf.readableBytes()];
            buf.readBytes(req);
            String body = new String(req, "UTF-8");
            System.out.println("Now is : " + body + " ; the counter is : " + ++counter);
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            ctx.close();
        }
    }
    
    4.2.3 运行结果

    运行结果显然不如我们意,发生了粘包现象 ,服务端显示结果接受了两条消息,客户端只接收到了一条返回消息,可见客户端的请求消息和服务端的返回消息都发生了粘包:

    The time server receive order : QUERY TIME ORDER
    QUERY TIME ORDER
    QUERY TIME ORDER
    ......
    QUE ; the counter is : 1
    The time server receive order : Y TIME ORDER
    QUERY TIME ORDER
    QUERY TIME ORDER
    ......
    QUERY TIME ORDER ; the counter is : 2
    

    4.3 利用LineBasedFrameDecoder解决粘包问题

    为了解决半包读写问题,Netty默认提供了多种解码器用于处理半包,下面我们来修正时间服务器

    4.3.1 支持TCP粘包的TimeServer

    直接看代码,只是在ChildChannelHandler2添加LineBasedFrameDecoder和StringDecoder两个解码器

    public class TimeServer {
        public void bind(int port) throws Exception {
            //配置服务端的NIO线程组
            //NioEventLoopGroup是个线程组,它包含了一组NIO线程,专门用于网络事件的处理,
            //实际上它们就是Reactor线程组。
            //这里创建两个的原因是一个用于服务端接受客户端的连接,另一个用于进行SocketChannel的网络读写。
            EventLoopGroup bossGroup = new NioEventLoopGroup();
            EventLoopGroup workerGroup = new NioEventLoopGroup();
            try {
                //创建ServerBootstrap对象,它是Netty用于启动NIO服务端的辅助启动类,目的是降低服务端的开发复杂度。
                ServerBootstrap b = new ServerBootstrap();
                //调用ServerBootstrap的group方法,将两个NIO线程组当作入参传递到ServerBootstrap中。
                //接着设置创建的Channel为NioServerSocketChannel,它的功能对应于JDK NIO类库中的ServerSocketChannel类。
                //然后配置NioServerSocketChannel的TCP参数,此处将它的backlog设置为1024,
                //最后绑定I/O事件的处理类ChildChannelHandler,它的作用类似于Reactor模式中的handler类,
                //主要用于处理网络I/O事件,例如记录日志、对消息进行编解码等。
                b.group(bossGroup, workerGroup)
                        .channel(NioServerSocketChannel.class)
                        .option(ChannelOption.SO_BACKLOG, 1024)
                        .childHandler(new ChildChannelHandler2());
                //绑定端口,同步等待成功
                //服务端启动辅助类配置完成之后,调用它的bind方法绑定监听端口
                //随后,调用它的同步阻塞方法sync等待绑定操作完成。
                //完成之后Netty会返回一个ChannelFuture,它的功能类似于JDK的java.util.concurrent.Future,主要用于异步操作的通知回调。
                ChannelFuture f = b.bind(port).sync();
    
                //等待服务端监听端口关闭
                //使用f.channel().closeFuture().sync()方法进行阻塞,等待服务端链路关闭之后main函数才退出。
                f.channel().closeFuture().sync();
            } finally {
                //优雅退出,释放线程池资源
                //调用NIO线程组的shutdownGracefully进行优雅退出,它会释放跟shutdownGracefully相关联的资源。
                bossGroup.shutdownGracefully();
                workerGroup.shutdownGracefully();
            }
        }
    
        /**
         * 添加LineBasedFrameDecoder和StringDecoder两个解码器
         * 更多请看TimeServerHandler代码修改
         */
        private class ChildChannelHandler2 extends ChannelInitializer<SocketChannel> {
            @Override
            protected void initChannel(SocketChannel channel) throws Exception {
                channel.pipeline().addLast(new LineBasedFrameDecoder(1024));
                channel.pipeline().addLast(new StringDecoder());
                channel.pipeline().addLast(new TimeServerHandler());
            }
        }
    
        public static void main(String[] args) throws Exception {
            new TimeServer().bind(8080);
        }
    }
    
    public class TimeServerHandler extends ChannelInboundHandlerAdapter {
        private int counter;
    
        /**
         * 解决半包问题 不对msg做额外处理和编码
         * @param ctx
         * @param msg
         * @throws Exception
         */
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            String body = (String) msg;
            System.out.println("The time server receive order : " + body + " ; the counter is : " + ++counter);
            String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body) ? new java.util.Date(
                    System.currentTimeMillis()).toString() : "BAD ORDER";
            currentTime = currentTime + System.getProperty("line.separator");
            ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes());
            ctx.writeAndFlush(resp);
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            ctx.close();
        }
    }
    
    4.3.2 支持TCP粘包的TimeClient

    客户端部分代码类似,也是加上两个编码器:

    public class TimeClient {
        public void connect(int port, String host) throws Exception {
            // 配置客户端NIO线程组
            //首先创建客户端处理I/O读写的NioEventLoop Group线程组
            EventLoopGroup group = new NioEventLoopGroup();
            try {
                //继续创建客户端辅助启动类Bootstrap,随后需要对其进行配置。
                //与服务端不同的是,它的Channel需要设置为NioSocketChannel
                //然后为其添加handler,此处为了简单直接创建匿名内部类,实现initChannel方法,
                //其作用是当创建NioSocketChannel成功之后,
                //在初始化它的时候将它的ChannelHandler设置到ChannelPipeline中,用于处理网络I/O事件。
                Bootstrap b = new Bootstrap();
                b.group(group).channel(NioSocketChannel.class)
                        .option(ChannelOption.TCP_NODELAY, true)
                        .handler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            protected void initChannel(SocketChannel ch) throws Exception {
                                //添加LineBasedFrameDecoder和StringDecoder解码器 更多看TimeClentander代码修改
                                ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
                                ch.pipeline().addLast(new StringDecoder());
                                ch.pipeline().addLast(new TimeClientHandler());
                            }
                        });
    
                // 发起异步连接操作
                //客户端启动辅助类设置完成之后,调用connect方法发起异步连接,
                //然后调用同步方法等待连接成功。
                ChannelFuture f = b.connect(host, port).sync();
    
                // 等待客户端链路关闭
                //当客户端连接关闭之后,客户端主函数退出.
                f.channel().closeFuture().sync();
            } finally {
                // 优雅退出,释放NIO线程组
                //在退出之前,释放NIO线程组的资源。
                group.shutdownGracefully();
            }
        }
    
        public static void main(String[] args) throws Exception {
            int port = 8080;
            if (args !=null && args.length > 0){
                port = Integer.valueOf(args[0]);
            }
            new TimeClient().connect(port,"127.0.0.1");
        }
    }
    
    public class TimeClientHandler extends ChannelInboundHandlerAdapter{
    
        private int counter;
        private byte[] req;
    
        public TimeClientHandler() {
            req = ("QUERY TIME ORDER" + System.getProperty("line.separator")).getBytes();
        }
    
        //当客户端和服务端TCP链路建立成功之后,Netty的NIO线程会调用channelActive方法
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            ByteBuf message = null;
            for (int i = 0; i < 100; i++) {
                message = Unpooled.buffer(req.length);
                message.writeBytes(req);
                ctx.writeAndFlush(message);
            }
        }
    
        /**
         * 直接打印msg 不做额外处理
         * @param ctx
         * @param msg
         * @throws Exception
         */
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            //从Netty的ByteBuf中读取并打印应答消息。
            String body = (String) msg;
            System.out.println("Now is : " + body + " ; the counter is : " + ++counter);
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            ctx.close();
        }
    }
    
    4.3.3 运行支持粘包的时间服务器程序

    结果如下,运行结果客户端 服务端 都完全符合预期 :

    The time server receive order : QUERY TIME ORDER ; the counter is : 1
    The time server receive order : QUERY TIME ORDER ; the counter is : 2
    ......
    The time server receive order : QUERY TIME ORDER ; the counter is : 99
    The time server receive order : QUERY TIME ORDER ; the counter is : 100
    
    Now is : Thu Dec 06 18:14:52 CST 2018 ; the counter is : 1
    Now is : Thu Dec 06 18:14:52 CST 2018 ; the counter is : 2
    ......
    Now is : Thu Dec 06 18:14:52 CST 2018 ; the counter is : 99
    Now is : Thu Dec 06 18:14:52 CST 2018 ; the counter is : 100
    
    4.3.4 LineBasedFrameDecoder和StringDecoder原理分析

    LineBasedFrameDecoder工作原理是一次便利ByteBuf中的可读字节,以"\n" 或者"\r\n"为结束位置,中间字节组成一行,是已换行符为结束标志的解码器。还可以设置单行最大长度,超过最大长度还没有换行符,就会抛出异常。

    StringDecoder的功能也很简单,将接收到的对象转换成字符串,然后在调用后面的Handler。LineBasedFrameDecoder+StringDecoder其实就是组合成按行切换的文本解码器。

    另外,Netty还提供了其他解码器,如果发送的消息不是以换行符结束,也可以解决半包问题。

    相关文章

      网友评论

          本文标题:Chapter4 TCP粘包/拆包问题的解决之道

          本文链接:https://www.haomeiwen.com/subject/xnzncqtx.html