TCP粘包和拆包

作者: 乙腾 | 来源:发表于2021-03-05 10:26 被阅读0次

    TCP的粘包和拆包

    粘包和拆包现象

    image.png

    客户端给服务端发送数据可能存在的场景:

    1.无拆包粘包

    服务端分两次读取到了两个独立的数据包,分别是D1和D2。

    2.粘包

    服务端一次接受到了一个数据包,但是这个数据包包含两个完整的消息D1和D2,D1和D2粘合在一起,称之为TCP粘包。(这里需要注意:一定是一个数据包包含两个完整的消息,才是粘包)

    3.拆包

    拆包有两种情况:

    3.1

    服务端分两次读取到了数据包,第一次读取到了完整的D1包和D2包的部分内容,第二次读取到了D2包的剩余内容。

    3.2

    服务端分两次读取到了数据包,第一次读取到了D1包的部分内容D1_1,第二次读取到了D1包的剩余部分内容D1_2和完整的D2包。

    notice:

    拆包

    一个消息被拆分为多个部分,分散在多个数据包中传输。

    粘包

    多个完整的消息封装在了一个数据包中传输。

    产生拆包和粘包的原因

    粗略说明:

    由于TCP无消息保护边界,需要在接收端处理消息边界问题

    TCP是面向连接的,面向流的,提供高可靠性服务。收发两端(客户端和服务器端)都要有一一成对的socket,因此,发送端为了将多个发给接收端的包,更有效的发给对方,使用了优化方法(Nagle算法),将多次间隔较小且数据量小的数据,合并成一个大的数据块,然后进行封包。这样做虽然提高了效率,但是接收端就难于分辨出完整的数据包了,因为面向流的通信是无消息保护边界的,由于TCP无消息保护边界,需要在接收端处理消息边界问题,也就是我们所说的粘包、拆包问题。

    详细说明:

    tcp是以流的方式进行传输的,传输的最小单元为一个报文段(segment)。tcp 通讯协议中的最大传输单元为MTU,一般是1500比特(187.5byte),超过这个量要分为多个报文段,其中可以传输的最大报文长度为mss,一般MSS=1500- 20(IP Header) -20 (TCP Header) = 1460比特,即180多个字节。

    tcp为提高性能,发送端会将需要发送的数据发送到缓冲区,等待缓冲区满了之后,再将缓冲中的数据发送到接收方。同理,接收方也有缓冲区这样的机制,来接收数据。

    发生TCP粘包、拆包主要是由于下面一些原因:

    1. 应用程序写入的数据大于套接字缓冲区大小,这将会发生拆包。
    2. 应用程序写入数据小于套接字缓冲区大小,网卡将应用多次写入的数据发送到网络上,这将会发生粘包。
    3. 进行mss(最大报文长度)大小的TCP分段,当TCP报文长度-TCP头部长度>mss的时候将发生拆包。
    4. 接收方法不及时读取套接字缓冲区数据,这将发生粘包。

    解决方案

    自定义协议+编解码器

    服务器端每次读取数据长度的问题, 这个问题解决,就不会出现服务器多读或少读数据的问题,从而避免的TCP 粘包、拆包 。

    image.png

    每次在数据包前面加上一个固定的长度,来标识本次消息的长度。

    code

    客户端代码

    MessageDecoder

    package com.pl.netty.protocoltcp.decoder;
    
    import com.pl.netty.protocoltcp.message.MessageProtocol;
    import io.netty.buffer.ByteBuf;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.handler.codec.ReplayingDecoder;
    import java.util.List;
    
    /**
     * <p>
     *
     * @Description: TODO
     * </p>
     * @ClassName MessageDecoder  消息解码器
     * @Author pl
     * @Date 2021/3/5
     * @Version V1.0.0
     */
    public class MessageDecoder extends ReplayingDecoder<Void> {
        @Override
        protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf in, List<Object> out) throws Exception {
            System.out.println("MessageDecoder 被调用");
            //关键点在这里
            //现获取int,判断本次数据的字节长度
            int length = in.readInt();
            //读取到字节长度后,从byteBuf中读取指定长度的字节
            byte[] content = new byte[length];
            in.readBytes(content);
            //封装成 MessageProtocol 对象,放入 out,传递到下一个Handler
            MessageProtocol messageProtocol = new MessageProtocol();
            messageProtocol.setLen(length);
            messageProtocol.setContent(content);
            out.add(messageProtocol);
        }
    }
    

    Client

    package com.pl.netty.protocoltcp;
    
    import com.pl.netty.protocoltcp.encoder.MessageEncoder;
    import com.pl.netty.protocoltcp.message.MessageProtocol;
    import io.netty.bootstrap.Bootstrap;
    import io.netty.channel.Channel;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelPipeline;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;
    import io.netty.handler.codec.string.StringDecoder;
    import io.netty.handler.codec.string.StringEncoder;
    import io.netty.util.CharsetUtil;
    
    import java.util.Scanner;
    
    /**
     * <p>
     *
     * @Description: TODO
     * </p>
     * @ClassName Client
     * @Author pl
     * @Date 2021/2/16
     * @Version V1.0.0
     */
    public class Client {
    
        //属性
        private final String host;
        private final int port;
    
        public Client(String host, int port) {
            this.port = port;
            this.host = host;
        }
    
        public void run() throws InterruptedException {
            NioEventLoopGroup eventExecutors = new NioEventLoopGroup();
            try {
                Bootstrap bootstrap = new Bootstrap()
                        .group(eventExecutors)
                        .channel(NioSocketChannel.class)
                        .handler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            protected void initChannel(SocketChannel ch) throws Exception {
                                ChannelPipeline pipeline = ch.pipeline();
                                //加入Handler
                           //     pipeline.addLast("encoder", new MessageEncoder());
                                pipeline.addLast("encoder",new MessageEncoder());
                                pipeline.addLast(new GroupChatClientHandler());
                            }
                        });
                ChannelFuture channelFuture = bootstrap.connect(host, port).sync();
                //得到channel
                Channel channel = channelFuture.channel();
                System.out.println("--------" + channel.localAddress() + "---------");
                //客户端需要输入信息,创建一个扫描器
    
            } finally {
                eventExecutors.shutdownGracefully();
            }
        }
    
        public static void main(String[] args) throws InterruptedException {
            new Client("127.0.0.1", 7000).run();
        }
    }
    

    GroupChatClientHandler

    package com.pl.netty.protocoltcp;
    
    import com.pl.netty.protocoltcp.message.MessageProtocol;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.SimpleChannelInboundHandler;
    import io.netty.util.CharsetUtil;
    
    /**
     * <p>
     *
     * @Description: TODO
     * </p>
     * @ClassName GroupChatClientHandler
     * @Author pl
     * @Date 2021/2/16
     * @Version V1.0.0
     */
    public class GroupChatClientHandler extends SimpleChannelInboundHandler<MessageProtocol> {
    
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            for (int i = 0; i < 3; i++) {
                String message = "Server" + i;
                byte[] content = message.getBytes(CharsetUtil.UTF_8);
                int length = content.length;
                //创建协议包对象
                MessageProtocol messageProtocol = new MessageProtocol();
                messageProtocol.setLen(length);
                messageProtocol.setContent(content);
                ctx.writeAndFlush(messageProtocol);
            }
        }
    
    
        @Override
        protected void channelRead0(ChannelHandlerContext channelHandlerContext, MessageProtocol messageProtocol) throws Exception {
    
        }
    }
    

    服务端代码

    MessageEncoder

    package com.pl.netty.protocoltcp.encoder;
    
    import com.pl.netty.protocoltcp.message.MessageProtocol;
    import io.netty.buffer.ByteBuf;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.handler.codec.MessageToByteEncoder;
    
    /**
     * <p>
     *
     * @Description: TODO
     * </p>
     * @ClassName MessageEncoder  消息编码器
     * @Author pl
     * @Date 2021/3/5
     * @Version V1.0.0
     */
    public class MessageEncoder extends MessageToByteEncoder<MessageProtocol> {
        @Override
        protected void encode(ChannelHandlerContext ctx, MessageProtocol msg, ByteBuf out) throws Exception {
            System.out.println("MessageEncoder 方法被调用");
            //编码器的关键,每次传输消息数据的时候,先给定一个固定长度
            out.writeInt(msg.getLen());
            out.writeBytes(msg.getContent());
        }
    }
    
    

    Server

    package com.pl.netty.protocoltcp;
    
    import com.pl.netty.protocoltcp.decoder.MessageDecoder;
    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.ChannelPipeline;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    
    /**
     * <p>
     *
     * @Description: TODO
     * </p>
     * @ClassName Server
     * @Author pl
     * @Date 2021/2/16
     * @Version V1.0.0
     */
    public class Server {
        private int port;
    
        public Server(int port) {
            this.port = port;
        }
    
        public void run(){
            //bossGroup 用于接收连接
            NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
            //workerGroup 用于具体的处理
            NioEventLoopGroup workerGroup = new NioEventLoopGroup();
            try {
                // Bootstrap 类是客户端程序的启动引导类,ServerBootstrap 是服务端启动引导类
                ServerBootstrap serverBootstrap = new ServerBootstrap();
                serverBootstrap.group(bossGroup,workerGroup)
                        .channel(NioServerSocketChannel.class)
                        .option(ChannelOption.SO_BACKLOG,128)
                        .childOption(ChannelOption.SO_KEEPALIVE, true)
                        .childHandler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            protected void initChannel(SocketChannel socketChannel) throws Exception {
                                ChannelPipeline pipeline = socketChannel.pipeline();
                                pipeline.addLast("decoder", new MessageDecoder());
                               // pipeline.addLast("decoder",new MessageEncoder());
                                pipeline.addLast(new MssgHandler());
                            }
                        });
                System.out.println("netty 服务端启动");
                //Netty 中所有的 IO 操作都是异步的,不能立刻得知消息是否被正确处理。但是可以过一会等它执行完成或者直接注册一个监听,具体的实现就是通过 Future 和 ChannelFutures,他们可以注册一个监听,当操作执行成功或失败时监听会自动触发注册的监听事件
                ChannelFuture channelFuture = serverBootstrap.bind(port).sync();
                //监听关闭事件
                channelFuture.channel().closeFuture().sync();
            }catch (InterruptedException e) {
                e.printStackTrace();
            }finally {
                bossGroup.shutdownGracefully();
                workerGroup.shutdownGracefully();
            }
        }
    
        public static void main(String[] args) {
            Server server = new Server(7000);
            server.run();
        }
    }
    

    MssgHandler

    package com.pl.netty.protocoltcp;
    
    import com.pl.netty.protocoltcp.message.MessageProtocol;
    import io.netty.channel.Channel;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.SimpleChannelInboundHandler;
    import io.netty.channel.group.ChannelGroup;
    import io.netty.channel.group.DefaultChannelGroup;
    import io.netty.util.CharsetUtil;
    import io.netty.util.concurrent.GlobalEventExecutor;
    
    import java.text.SimpleDateFormat;
    import java.util.Date;
    import java.util.UUID;
    
    /**
     * <p>
     *
     * @Description: TODO
     * </p>
     * @ClassName GroupchatServerHandler  继承 SimpleChannelInboundHandler 入栈
     * @Author pl
     * @Date 2021/2/16
     * @Version V1.0.0
     */
    public class MssgHandler extends SimpleChannelInboundHandler<MessageProtocol> {
    
        private int count;
    
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, MessageProtocol msg) throws Exception {
            //接收到数据,并处理
            int len = msg.getLen();
            byte[] content = msg.getContent();
            System.out.println("服务器第 " + (++count) +" 次接收到信息如下:");
            System.out.println("长度:" + len);
            System.out.println("内容:" + new String(content, CharsetUtil.UTF_8));
            //回复消息
            String response = UUID.randomUUID().toString();
            int length = response.getBytes(CharsetUtil.UTF_8).length;
            MessageProtocol messageProtocol = new MessageProtocol();
            messageProtocol.setLen(length);
            messageProtocol.setContent(response.getBytes());
            ctx.writeAndFlush(messageProtocol);
    
        }
    }
    

    输出

    image.png

    相关文章

      网友评论

        本文标题:TCP粘包和拆包

        本文链接:https://www.haomeiwen.com/subject/iflrqltx.html