美文网首页Java进阶之路IT@程序员猿媛
Netty的拆包和粘包解决方案

Netty的拆包和粘包解决方案

作者: 叫我不矜持 | 来源:发表于2019-05-02 21:54 被阅读3次

    概念

    TCP是一个“流”协议,所谓流,就是没有界限的一长串二进制数据。TCP作为传输层协议并不不了解上层业务数据的具体含义,它会根据TCP缓冲区的实际情况进行数据包的划分,所以在业务上认为是一个完整的包,可能会被TCP拆分成多个包进行发送,也有可能把多个小的包封装成一个大的数据包发送,这就是所谓的TCP粘包和拆包问题。

    当数据被TCP拆分成多个包进行发送,在另一端接收的时候,需要把多次获取的结果粘在一起,变成我们可以理解的信息。这种情况需要我们做粘包处理。

    当TCP把多个小的包封装成一个大的数据包发送,在另一端接受的时候,需要我们做拆包处理。

    一.黏包/拆包问题的解决方案

    由于底层的TCP无法理解上层的业务数据,所以在底层是无法保证数据包不被拆分和重组的,这个问题只能通过上层的应用协议栈设计来解决。业界的主流协议的解决方案,可以归纳如下:

    1. 消息定长,报文大小固定长度,例如每个报文的长度固定为200字节,如果不够空位补空格;
    2. 包尾添加特殊分隔符,例如每条报文结束都添加回车换行符(例如FTP协议)或者指定特殊字符作为报文分隔符,接收方通过特殊分隔符切分报文区分;
    3. 将消息分为消息头和消息体,消息头中包含表示信息的总长度(或者消息体长度)的字段;
    4. 更复杂的自定义应用层协议。

    Netty提供了多个解码器,可以进行分包的操作,分别是:

    • LineBasedFrameDecoder
    • DelimiterBasedFrameDecoder(添加特殊分隔符报文来分包)
    • FixedLengthFrameDecoder(使用定长的报文来分包)
    • LengthFieldBasedFrameDecoder

    其实上述的粘包和拆包的问题,归根结底的解决方案就是发送端给远程端一个标记,告诉远程端,每个信息的结束标志是什么,这样,远程端获取到数据后,根据跟发送端约束的标志,将接收的信息分切或者合并成我们需要的信息,这样我们就可以获取到正确的信息了。

    二.Netty解码器介绍

    1. LineBasedFrameDecoder解码器
    LineBasedFrameDecoder是回车换行解码器,如果用户发送的消息以回车换行符作为消息结束的标识,则可以直接使用Netty的LineBasedFrameDecoder对消息进行解码,只需要在初始化Netty服务端或者客户端时将LineBasedFrameDecoder正确的添加到ChannelPipeline中即可,不需要自己重新实现一套换行解码器。

    new LineBasedFrameDecoder(2048) 配合 System.getProperty("line.separator")使用。

    2. DelimiterBasedFrameDecoder解码器
    DelimiterBasedFrameDecoder是分隔符解码器,用户可以指定消息结束的分隔符,它可以自动完成以分隔符作为码流结束标识的消息的解码。回车换行解码器实际上是一种特殊的DelimiterBasedFrameDecoder解码器。

    new DelimiterBasedFrameDecoder(2019, Unpooled.copiedBuffer("$$__".getBytes())) 按照$$__切分

    3. FixedLengthFrameDecoder解码器
    FixedLengthFrameDecoder是固定长度解码器,它能够按照指定的长度对消息进行自动解码,开发者不需要考虑TCP的粘包/拆包等问题,非常实用。

    对于定长消息,如果消息实际长度小于定长,则往往会进行补位操作,它在一定程度上导致了空间和资源的浪费。但是它的优点也是非常明显的,编解码比较简单,因此在实际项目中仍然有一定的应用场景。

    new FixedLengthFrameDecoder(200) 告诉Netty,获取的帧数据有200个字节就切分一次

    4. LengthFieldBasedFrameDecoder解码器
    大多数的协议,协议头中会携带长度字段,用于标识消息体或者整包消息的长度,例如SMPP、HTTP协议等。由于基于长度解码需求的通用性,以及为了降低用户的协议开发难度,Netty提供了LengthFieldBasedFrameDecoder,自动屏蔽TCP底层的拆包和粘包问题,只需要传入正确的参数,即可轻松解决“读半包“问题。

    Message 实体类,发送的数据都封装在这里

    public class Message {
    
        private final Charset charset = Charset.forName("utf-8");
    
        private byte magicType;
        private byte type;//消息类型  0xAF 表示心跳包    0xBF 表示超时包  0xCF 业务信息包
        private long requestId; //请求id
        private int length;
        private String body;
    
        public Message(){
    
        }
    
        public Message(byte magicType, byte type, long requestId, byte[] data) {
            this.magicType = magicType;
            this.type = type;
            this.requestId = requestId;
            this.length = data.length;
            this.body = new String(data, charset);
        }
    
        public Message(byte magicType, byte type, long requestId, String body) {
            this.magicType = magicType;
            this.type = type;
            this.requestId = requestId;
            this.length = body.getBytes(charset).length;
            this.body = body;
        }
        ...setter/getter
    }
    
    

    MessageDecoder 消息解码

    public class MessageDecoder extends LengthFieldBasedFrameDecoder {
        private Logger logger = LoggerFactory.getLogger(getClass());
    
        //头部信息的大小应该是 byte+byte+int = 1+1+8+4 = 14
        private static final int HEADER_SIZE = 14;
    
        public MessageDecoder(int maxFrameLength, int lengthFieldOffset, int lengthFieldLength) {
            super(maxFrameLength, lengthFieldOffset, lengthFieldLength);
        }
    
        @Override
        protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
            if (in == null) {
                return null;
            }
    
            if (in.readableBytes() <= HEADER_SIZE) {
                return null;
            }
    
            in.markReaderIndex();
    
            byte magic = in.readByte();
            byte type = in.readByte();
            long requestId = in.readLong();
            int dataLength = in.readInt();
    
            // FIXME 如果dataLength过大,可能导致问题
            if (in.readableBytes() < dataLength) {
                in.resetReaderIndex();
                return null;
            }
    
            byte[] data = new byte[dataLength];
            in.readBytes(data);
    
            String body = new String(data, "UTF-8");
            Message msg = new Message(magic, type, requestId, body);
            return msg;
        }
    }
    
    

    Message消息编码

    public class MessageEncoder extends MessageToByteEncoder<Message> {
        private final Charset charset = Charset.forName("utf-8");
    
        @Override
        protected void encode(ChannelHandlerContext ctx, Message msg, ByteBuf out) throws Exception {
    
            //
            out.writeByte(msg.getMagicType());
            out.writeByte(msg.getType());
            out.writeLong(msg.getRequestId());
    
            byte[] data = msg.getBody().getBytes(charset);
            out.writeInt(data.length);
            out.writeBytes(data);
        }
    }
    
    

    NettyServer

    public class NettyServer {
        private Logger logger = LoggerFactory.getLogger(this.getClass());
    
        public void bind(int port) throws InterruptedException {
    
            EventLoopGroup bossGroup = new NioEventLoopGroup(1);
            EventLoopGroup workerGroup = new NioEventLoopGroup();
            try {
                ServerBootstrap b = new ServerBootstrap();
                b.group(bossGroup, workerGroup)
                        .channel(NioServerSocketChannel.class)
                        .option(ChannelOption.SO_BACKLOG, 128)
                        .childHandler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            public void initChannel(SocketChannel ch) throws Exception {
    
                                ChannelPipeline p = ch.pipeline();
                                p.addLast(new MessageDecoder(1<<20, 10, 4));
                                p.addLast(new MessageEncoder());
                                p.addLast(new ServerHandler());
                            }
                        });
    
                // Bind and start to accept incoming connections.
                ChannelFuture future = b.bind(port).sync(); // (7)
    
                logger.info("server bind port:{}", port);
    
                // Wait until the server socket is closed.
                future.channel().closeFuture().sync();
    
            } finally {
                bossGroup.shutdownGracefully();
                workerGroup.shutdownGracefully();
            }
        }
    
        private class ServerHandler extends SimpleChannelInboundHandler<Message> {
    
            @Override
            protected void channelRead0(ChannelHandlerContext ctx, Message msg) throws Exception {
    
                logger.info("server read msg:{}", msg);
    
                Message resp = new Message(msg.getMagicType(), msg.getType(), msg.getRequestId(), "Hello world from server");
                ctx.writeAndFlush(resp);
            }
        }
    
        public static void main(String[] args) throws Exception {
    
            new NettyServer().bind(Constants.PORT);
        }
    }
    
    

    NettyClient

    public class NettyClient {
    
        public void connect(String host, int port) throws InterruptedException {
            EventLoopGroup group = new NioEventLoopGroup();
            try {
                Bootstrap b = new Bootstrap();
                b.group(group)
                        .channel(NioSocketChannel.class)
                        .option(ChannelOption.TCP_NODELAY, true)
                        .handler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            protected void initChannel(SocketChannel ch) throws Exception {
    
                                ChannelPipeline p = ch.pipeline();
                                p.addLast(new MessageDecoder(1<<20, 10, 4));
                                p.addLast(new MessageEncoder());
    
                                p.addLast(new ClientHandler());
                            }
                        });
    
                ChannelFuture future = b.connect(host, port).sync();
    
                future.awaitUninterruptibly(2000, TimeUnit.MILLISECONDS);
                if(future.channel().isActive()){
    
                    for(int i=0; i<100; i++) {
    
                        String body = "Hello world from client:"+ i;
                        Message msg = new Message((byte) 0XAF, (byte) 0XBF, i, body);
    
                        future.channel().writeAndFlush(msg);
                    }
                }
    
                future.channel().closeFuture().sync();
            } finally {
                group.shutdownGracefully();
            }
        }
    
        private class ClientHandler extends ChannelInboundHandlerAdapter {
            private Logger logger = LoggerFactory.getLogger(getClass());
    
            @Override
            public void channelRead(ChannelHandlerContext ctx, Object msg)
                    throws Exception {
    
                logger.info("client read msg:{}, ", msg);
    
            }
    
            @Override
            public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
                    throws Exception {
                logger.error("client caught exception", cause);
                ctx.close();
            }
        }
    
        public static void main(String[] args) throws Exception {
    
            new NettyClient().connect(Constants.HOST, Constants.PORT);
        }
    }
    
    

    相关文章

      网友评论

        本文标题:Netty的拆包和粘包解决方案

        本文链接:https://www.haomeiwen.com/subject/brdonqtx.html