美文网首页转载部分
Netty-TCP拆包/粘包

Netty-TCP拆包/粘包

作者: Cool_Pomelo | 来源:发表于2020-04-17 19:35 被阅读0次

    Netty-TCP拆包/粘包

    TCP拆包/粘包

    TCP 是一个面向字节流的协议,它的性质是流式的,所以它并没有分段。就像水流一样,你没法知道什么时候开始,什么时候结束。所以它会根据当前的套接字缓冲区的情况进行拆包或是粘包。

    粘包问题图示:

    图1.png

    客户端发送两个数据包D1&D2给服务端,因为服务端一次读取的字节数是不确定的,所以可能出现:

    • 正常情况,服务端分两次读取到了两个独立的数据包
    • 服务端一次收到两个数据包,两个粘合在了一起,出现粘包现象

    • 服务端分两次读取到了两个数据包,第一次读取到完整的D1包&部分D2包的内容,第二次读取到了D2剩余内容,出现拆包现象

    • 服务端分两次读取到了两个数据包,第一次读取D1的部分,第二次读取了D1剩余内容以及完整D2

    TCP拆包/粘包发生原因

    图示:

    图2.png

    三个原因:

    • 应用程序write写入的字节大小大于套接口发送缓冲区的大小

    • 进行MSS大小的TCP分段

    • 以太网帧的payload大于MTU进行IP分片

    例子

    未考虑TCP粘包的情况

    
    /**
     * Demo client for the "sticky packet not handled" scenario.
     *
     * <p>The pipeline deliberately contains only {@link ClientHandler}: no frame decoder
     * and no string codec are installed, so raw bytes flow straight through.
     */
    public class Client {

        /**
         * Connects to the given endpoint and blocks until the channel is closed.
         *
         * @param host remote host name or address
         * @param port remote TCP port
         * @throws InterruptedException if the calling thread is interrupted while waiting
         */
        public void connect(String host, int port) throws InterruptedException {
            EventLoopGroup group = new NioEventLoopGroup();
            try {
                Bootstrap bootstrap = new Bootstrap();
                bootstrap.group(group);
                bootstrap.channel(NioSocketChannel.class);
                bootstrap.handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        // Only the business handler; framing is intentionally absent here.
                        ch.pipeline().addLast(new ClientHandler());
                    }
                });

                // Wait for the connection, then park until the channel is closed.
                bootstrap.connect(host, port).sync().channel().closeFuture().sync();
            } finally {
                group.shutdownGracefully();
            }
        }

        /** Starts the client against localhost:9988. */
        public static void main(String[] args) throws Exception {
            Client client = new Client();
            client.connect("localhost", 9988);
        }
    }
    
    
    /**
     * Client-side handler for the "no framing" demo.
     *
     * <p>When the channel becomes active it sends 100 newline-terminated messages, each
     * written and flushed separately, and prints every message received from the server.
     */
    public class ClientHandler extends ChannelInboundHandlerAdapter {

        /** Number of inbound messages seen so far on this channel. */
        private int count = 0;

        /**
         * Sends the 100 demo messages as soon as the connection is established.
         *
         * @param ctx the handler context used to write to the channel
         * @throws Exception if encoding or writing fails
         */
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            for (int i = 0; i < 100; i++) {
                String msg = "hello from client " + i + "\n";
                // Encode explicitly as UTF-8: the server handler decodes the bytes as UTF-8,
                // so relying on the platform default charset could produce mismatched text.
                byte[] payload = msg.getBytes("UTF-8");
                ByteBuf buf = Unpooled.buffer(payload.length);
                buf.writeBytes(payload);
                // One flush per message on purpose, to show that separate writes may
                // still arrive merged or split on the receiving side.
                ctx.writeAndFlush(buf);
            }

            System.out.println("out");
        }

        /**
         * Prints an inbound message together with the running message count.
         *
         * <p>Without a {@code StringDecoder} in the pipeline the message arrives as a
         * {@link ByteBuf}; it is decoded as UTF-8 and then released. If a string decoder
         * is installed upstream, the message is already a {@link String}.
         *
         * @param ctx the handler context
         * @param msg the inbound message ({@code ByteBuf} or {@code String})
         * @throws Exception if decoding fails
         */
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg)
                throws Exception {
            String body;
            if (msg instanceof ByteBuf) {
                ByteBuf in = (ByteBuf) msg;
                try {
                    byte[] data = new byte[in.readableBytes()];
                    in.readBytes(data);
                    body = new String(data, "UTF-8");
                } finally {
                    // This handler consumes the buffer, so it must release it.
                    in.release();
                }
            } else {
                body = (String) msg;
            }
            count++;
            System.out.println("client read msg:{}, count:{}   " + body + "    " + count);
        }

        /**
         * Prints the failure and closes the channel.
         *
         * @param ctx   the handler context
         * @param cause the exception raised in the pipeline
         */
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
                throws Exception {
            cause.printStackTrace();
            ctx.close();
        }
    }
    
    
    /**
     * Demo server for the "sticky packet not handled" scenario: each accepted channel
     * gets only a {@link ServerHandler}, with no frame decoder in front of it.
     */
    public class Server {

        /**
         * Binds to the given port and blocks until the server channel is closed.
         *
         * @param port local TCP port to listen on
         * @throws Exception if binding fails or the thread is interrupted while waiting
         */
        public void bind(int port) throws Exception {
            EventLoopGroup bossGroup = new NioEventLoopGroup();
            EventLoopGroup workerGroup = new NioEventLoopGroup();
            try {
                ServerBootstrap bootstrap = new ServerBootstrap();
                bootstrap.group(bossGroup, workerGroup);
                bootstrap.channel(NioServerSocketChannel.class);
                bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    public void initChannel(SocketChannel ch) throws Exception {
                        // Raw bytes go directly to the business handler.
                        ch.pipeline().addLast(new ServerHandler());
                    }
                });

                // Bind and start accepting incoming connections.
                ChannelFuture bindFuture = bootstrap.bind(port).sync();
                System.out.println("server bind port:{}    " + port);

                // Block until the server socket is closed.
                bindFuture.channel().closeFuture().sync();
            } finally {
                bossGroup.shutdownGracefully();
                workerGroup.shutdownGracefully();
            }
        }

        /** Starts the server on port 9988. */
        public static void main(String[] args) throws Exception {
            Server server = new Server();
            server.bind(9988);
        }
    }
    
    
    /**
     * Server-side handler for the "no framing" demo: prints the content of every
     * {@link ByteBuf} delivered by a read, delimited by banner lines, so that merged
     * or split messages become visible in the output.
     */
    public class ServerHandler extends ChannelInboundHandlerAdapter {

        /**
         * Decodes the readable bytes of the inbound buffer as UTF-8 and prints them.
         *
         * @param ctx the handler context
         * @param msg the inbound message; expected to be a {@link ByteBuf} because no
         *            decoder precedes this handler in the demo pipeline
         * @throws Exception if decoding fails
         */
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            ByteBuf buf = (ByteBuf) msg;
            try {
                byte[] req = new byte[buf.readableBytes()];
                buf.readBytes(req);
                String body = new String(req, "UTF-8");

                System.out.println("============package=====================");
                System.out.println(body);
                System.out.println("============package=====================");
            } finally {
                // The message is consumed here and not passed on, so it must be released;
                // ChannelInboundHandlerAdapter does not release inbound messages itself.
                buf.release();
            }
        }

        /**
         * Prints the failure and closes the channel.
         *
         * @param ctx   the handler context
         * @param cause the exception raised in the pipeline
         */
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
                throws Exception {
            cause.printStackTrace();
            ctx.close();
        }

    }
    
    
    
    
    
    
    

    运行输出:

    
    ============package=====================
    hello from client 0
    hello from client 1
    hello from client 2
    hello from client 3
    hello from client 4
    hello from client 5
    hello from client 6
    hello from client 7
    hello from client 8
    hello from client 9
    hello from client 10
    hello from client 11
    hello from client 12
    hello from client 13
    hello from client 14
    hello from client 15
    hello from client 16
    hello from client 17
    hello from client 18
    hello from client 19
    hello from client 20
    hello from client 21
    hello from client 22
    hello from client 23
    hello from client 24
    hello from client 25
    hello from client 26
    hello from client 27
    hello from client 28
    hello from client 29
    hello from client 30
    hello from client 31
    hello from client 32
    hello from client 33
    hello from client 34
    hello from client 35
    hello from client 36
    hello from client 37
    hello from client 38
    hello from client 39
    hello from client 40
    hello from client 41
    hello from client 42
    hello from client 43
    hello from client 44
    hello from client 45
    hello from client 46
    hello from client 47
    hello from client 48
    hello
    ============package=====================
    ============package=====================
     from client 49
    hello from client 50
    hello from client 51
    hello from client 52
    hello from client 53
    hello from client 54
    hello from client 55
    hello from client 56
    hello from client 57
    hello from client 58
    hello from client 59
    hello from client 60
    hello from client 61
    hello from client 62
    hello from client 63
    hello from client 64
    hello from client 65
    hello from client 66
    hello from client 67
    hello from client 68
    hello from client 69
    hello from client 70
    hello from client 71
    hello from client 72
    hello from client 73
    hello from client 74
    hello from client 75
    hello from client 76
    hello from client 77
    hello from client 78
    hello from client 79
    hello from client 80
    hello from client 81
    hello from client 82
    hello from client 83
    hello from client 84
    hello from client 85
    hello from client 86
    hello from client 87
    hello from client 88
    hello from client 89
    hello from client 90
    hello from client 91
    hello from client 92
    hello from client 93
    hello from client 94
    hello from client 95
    hello from client 96
    hello from client 97
    hello from client 98
    hello from client 99
    
    ============package=====================
    

    客户端发送的100条消息被当成了两个数据包进行处理,说明发生了粘包现象(同时第49条消息被拆到了前后两个数据包中,即拆包现象)

    使用LineBasedFrameDecoder + StringDecoder 解决问题

    LineBasedFrameDecoder

    文档:

    public class LineBasedFrameDecoder
    extends ByteToMessageDecoder
    

    在行尾拆分接收到的ByteBuf的解码器,"\n"和"\r\n"都会被处理。字节流应采用UTF-8或ASCII字符编码。当前实现直接将字节强制转换为字符,然后将该字符与少数低范围的ASCII字符(例如'\n'或'\r')进行比较。UTF-8不会使用低范围[0..0x7F]的字节值来表示多字节码点,因此该实现完全支持UTF-8。

    LineBasedFrameDecoder 的工作原理是它依次遍历 ByteBuf 中的可读字节,判断看是否有 "\n" 或者 "\r\n",如果有,就以此位置为结束位置,从可读索引到结束位置区间的字节就组成了一行。它是以回车换行符为结束标记的解码器,支持配置单行的最大长度,如果连续读取到最大长度后仍然没有发现换行符,会抛出异常,同时忽略掉之前读取到的异常码流。

    StringDecoder

    public class StringDecoder
    extends MessageToMessageDecoder<ByteBuf>
    

    将收到的ByteBuf解码为字符串。 请注意,如果使用的是基于流的传输方式(例如TCP/IP),则此解码器必须与适当的ByteToMessageDecoder(例如DelimiterBasedFrameDecoder或LineBasedFrameDecoder)一起使用。 TCP/IP套接字中基于文本的行协议的典型设置为:

     ChannelPipeline pipeline = ...;
    
     // Decoders
     pipeline.addLast("frameDecoder", new LineBasedFrameDecoder(80));
     pipeline.addLast("stringDecoder", new StringDecoder(CharsetUtil.UTF_8));
    
     // Encoder
     pipeline.addLast("stringEncoder", new StringEncoder(CharsetUtil.UTF_8));
     
    and then you can use a String instead of a ByteBuf as a message:
     void channelRead(ChannelHandlerContext ctx, String msg) {
         ch.write("Did you say '" + msg + "'?\n");
     }
    
    

    StringEncoder

    public class StringEncoder
    extends MessageToMessageEncoder<java.lang.CharSequence>
    
    

    将请求的字符串编码为ByteBuf。 TCP/IP套接字中基于文本的行协议的典型设置为:

     ChannelPipeline pipeline = ...;
    
     // Decoders
     pipeline.addLast("frameDecoder", new LineBasedFrameDecoder(80));
     pipeline.addLast("stringDecoder", new StringDecoder(CharsetUtil.UTF_8));
    
     // Encoder
     pipeline.addLast("stringEncoder", new StringEncoder(CharsetUtil.UTF_8));
     
    and then you can use a String instead of a ByteBuf as a message:
     void channelRead(ChannelHandlerContext ctx, String msg) {
         ch.write("Did you say '" + msg + "'?\n");
     }
    
    

    应用:

    /**
     * Demo server with line-based framing: every accepted channel decodes inbound bytes
     * into newline-delimited frames, converts each frame to a {@code String}, and hands
     * it to {@link ServerHandler}.
     */
    public class Server {

        /**
         * Binds to the given port and blocks until the server channel is closed.
         *
         * @param port local TCP port to listen on
         * @throws Exception if binding fails or the thread is interrupted while waiting
         */
        public void bind(int port) throws Exception {
            EventLoopGroup bossGroup = new NioEventLoopGroup();
            EventLoopGroup workerGroup = new NioEventLoopGroup();
            try {
                ServerBootstrap bootstrap = new ServerBootstrap();
                bootstrap.group(bossGroup, workerGroup);
                bootstrap.channel(NioServerSocketChannel.class);
                bootstrap.childHandler(newChildInitializer());

                // Bind and start accepting incoming connections.
                ChannelFuture bound = bootstrap.bind(port).sync();
                System.out.println("server bind port:{}    " + port);

                // Block until the server socket is closed.
                bound.channel().closeFuture().sync();
            } finally {
                bossGroup.shutdownGracefully();
                workerGroup.shutdownGracefully();
            }
        }

        /** Builds the per-connection pipeline: line framing, string decoding, then the handler. */
        private ChannelInitializer<SocketChannel> newChildInitializer() {
            return new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    ChannelPipeline pipeline = ch.pipeline();
                    // Frames are split at line endings; 1024 is the maximum frame length.
                    pipeline.addLast(new LineBasedFrameDecoder(1024));
                    pipeline.addLast(new StringDecoder());
                    pipeline.addLast(new ServerHandler());
                }
            };
        }

        /** Starts the server on port 9988. */
        public static void main(String[] args) throws Exception {
            Server server = new Server();
            server.bind(9988);
        }
    }
    
    /**
     * Server-side handler for the framed demo: each inbound message is expected to be
     * a {@code String} and is printed between two banner lines.
     */
    public class ServerHandler extends ChannelInboundHandlerAdapter {

        private int count = 0;

        /**
         * Prints one decoded message surrounded by banner lines and blank lines.
         *
         * @param ctx the handler context
         * @param msg the inbound message, cast to {@code String}
         */
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            final String line = (String) msg;
            final String banner = "============package=====================";

            System.out.println();
            System.out.println(banner);
            System.out.println(line);
            System.out.println(banner);
            System.out.println();
        }

        /**
         * Prints the failure and closes the channel.
         *
         * @param ctx   the handler context
         * @param cause the exception raised in the pipeline
         */
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
                throws Exception {
            cause.printStackTrace();
            ctx.close();
        }
    }
    
    
    /**
     * Client-side handler for the framed demo: sends 100 newline-terminated messages
     * once the channel is active and prints each {@code String} message it receives.
     */
    public class ClientHandler extends ChannelInboundHandlerAdapter {

        /** Number of inbound messages seen so far on this channel. */
        private int count = 0;

        /**
         * Writes and flushes the 100 demo messages, one buffer per message.
         *
         * @param ctx the handler context used to write to the channel
         */
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            int index = 0;
            while (index < 100) {
                // The trailing "\n" is the line delimiter the receiver splits on.
                byte[] bytes = ("hello from client " + index + "\n").getBytes();
                ctx.writeAndFlush(Unpooled.buffer(bytes.length).writeBytes(bytes));
                index++;
            }
        }

        /**
         * Prints an inbound message together with the running message count.
         *
         * @param ctx the handler context
         * @param msg the inbound message, cast to {@code String}
         */
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg)
                throws Exception {
            String text = (String) msg;
            count += 1;
            System.out.println("client read msg:{}, count:{}   " + text + "    " + count);
        }

        /**
         * Prints the failure and closes the channel.
         *
         * @param ctx   the handler context
         * @param cause the exception raised in the pipeline
         */
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
                throws Exception {
            cause.printStackTrace();
            ctx.close();
        }
    }
    
    
    
    /**
     * Demo client with line-based framing: inbound bytes are split into lines and
     * decoded to {@code String} before reaching {@link ClientHandler}.
     */
    public class Client {

        /**
         * Connects to the given endpoint and blocks until the channel is closed.
         *
         * @param host remote host name or address
         * @param port remote TCP port
         * @throws InterruptedException if the calling thread is interrupted while waiting
         */
        public void connect(String host, int port) throws InterruptedException {
            EventLoopGroup group = new NioEventLoopGroup();
            try {
                Bootstrap bootstrap = new Bootstrap();
                bootstrap.group(group);
                bootstrap.channel(NioSocketChannel.class);
                bootstrap.handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ChannelPipeline pipeline = ch.pipeline();
                        // Frames are split at line endings; 1024 is the maximum frame length.
                        pipeline.addLast(new LineBasedFrameDecoder(1024));
                        pipeline.addLast(new StringDecoder());
                        pipeline.addLast(new ClientHandler());
                    }
                });

                // Wait for the connection, then park until the channel is closed.
                bootstrap.connect(host, port).sync().channel().closeFuture().sync();
            } finally {
                group.shutdownGracefully();
            }
        }

        /** Starts the client against localhost:9988. */
        public static void main(String[] args) throws Exception {
            Client client = new Client();
            client.connect("localhost", 9988);
        }
    }
    
    
    
    

    相关文章

      网友评论

        本文标题:Netty-TCP拆包/粘包

        本文链接:https://www.haomeiwen.com/subject/rkhuvhtx.html