美文网首页
Netty粘包拆包解决之道

Netty粘包拆包解决之道

作者: 小鱼嘻嘻 | 来源:发表于2019-10-02 13:22 被阅读0次
    何为粘包拆包

    tcp是个“流”协议,所谓流,就是没有界限的一串数据。tcp底层并不了解上层业务数据的含义,它会根据tcp缓冲区的视觉情况进行包的划分,所以在业务上认为,一个完整的包可能会被tcp拆分成多个包进行发送,也有可能把多个小包封装成一个大的数据包发送,这就是所谓的tcp粘包和拆包。

    粘包问题的解决策略
    • 消息定长,例如每个报文的大小为固定长度200个字节,如果不够,空位补空格。
    • 在包尾增加回车换行符进行分割
    • 将消息分为消息头和消息体,消息头中包含表示消息长度的字段,通常设计思路为消息头的第一个字段使用int32来表示消息的总长度。
    • 更复杂的应用层协议
    Netty 如何解决粘包和拆包
    • 利用LineBaseFrameDecoder和StringDecoder解决tcp粘包和拆包引起的半包读写问题。
      LineBaseFrameDecoder的工作原理是:
      它依次遍历ByteBuf中的可读字节,判断是否有“\n”或者“\r\n”,如果有就以此位置结束。
      StringDecoder的功能比较简单,就是将接收到的对象转化成字符串,然后继续调用后面的handler。
      server端:
    package com.netty.study;
    
    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import io.netty.handler.codec.LineBasedFrameDecoder;
    import io.netty.handler.codec.string.StringDecoder;
    
    /**
     * netty server
     * @author yuxi
     */
    public class NettyServer {
        public static void main(String[] args) {
            bind(8089);
        }
    
        /**
         * bind port
         * @param port
         */
        private static void bind(int port) {
    
            EventLoopGroup boss = new NioEventLoopGroup();
            EventLoopGroup worker = new NioEventLoopGroup();
    
            ServerBootstrap serverBootstrap = new ServerBootstrap();
    
            try {
                serverBootstrap.group(boss,worker)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG,1024)
                    .childHandler(new ChildChannelHandler());
    
                ChannelFuture future = serverBootstrap.bind(port).sync();
                future.channel().closeFuture().sync();
    
    
            }catch (Exception e){
    
            }finally {
                boss.shutdownGracefully();
                worker.shutdownGracefully();
            }
    
        }
    
        private static class ChildChannelHandler extends ChannelInitializer<SocketChannel> {
            @Override
            protected void initChannel(SocketChannel socketChannel) {
                socketChannel.pipeline()
                    //解决tcp拆包粘包问题
                    .addLast(new LineBasedFrameDecoder(1024));
                socketChannel.pipeline()  .addLast(new StringDecoder());
                socketChannel.pipeline()  .addLast(new TimeServerHandler());
            }
        }
    }
    package com.netty.study;
    
    import java.util.Date;
    
    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelHandlerAdapter;
    import io.netty.channel.ChannelHandlerContext;
    
    /**
     * @author yuxi
     */
    public class TimeServerHandler extends ChannelHandlerAdapter {
        private int counter;
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            ctx.close();
        }
    
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    
            //ByteBuf byteBuf = (ByteBuf)msg;
            //byte[] req = new byte[byteBuf.readableBytes()];
            //byteBuf.readBytes(req);
            //String body = new String(req,"utf-8").substring(0,req.length-System.getProperty("line.separator").length());
    
            String body = (String)msg;
    
            System.out.println("server:this is body:"+body+";"+"this is counter is:"+ ++counter);
    
            String currentTime = " ok, this is ok ".equals(body)?new Date(System.currentTimeMillis()).toString():
                "not ok";
    
    
            ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes());
            ctx.writeAndFlush(resp);
    
        }
    
    
    }
    
    

    client端:

    package com.netty.study;
    
    import io.netty.bootstrap.Bootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;
    import io.netty.handler.codec.LineBasedFrameDecoder;
    import io.netty.handler.codec.string.StringDecoder;
    
    /**
     * netty client
     * @author yuxi
     */
    public class NettyClient {
        public static void main(String[] args) {
            connect("127.0.0.1",8089);
        }
    
        /**
         *
         * @param host
         * @param port
         */
        private static void connect(String host, int port) {
            EventLoopGroup group = new NioEventLoopGroup();
            try {
                Bootstrap bootstrap = new Bootstrap();
    
                bootstrap.group(group)
                    .channel(NioSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY,true)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                        socketChannel.pipeline()
                            // 解决半包读写问题
                            .addLast(new LineBasedFrameDecoder(1024));
                            socketChannel.pipeline()  .addLast(new StringDecoder());
                            socketChannel.pipeline()  .addLast(new TimeClientHandler());
                        }
                    });
    
                ChannelFuture channelFuture = bootstrap.connect(host, port);
                channelFuture.sync();
    
                channelFuture.channel().closeFuture().sync();
    
            }catch (Exception e){
            }finally {
                group.shutdownGracefully();
    
            }
        }
    }
    package com.netty.study;
    
    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelHandlerAdapter;
    import io.netty.channel.ChannelHandlerContext;
    
    /**
     * @author yuxi
     */
    public class TimeClientHandler extends ChannelHandlerAdapter {
    
    
        private int counter;
    
        private byte[] req;
    
        public TimeClientHandler() {
            req = (" ok, this is ok "+System.getProperty("line.separator")).getBytes();
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            super.exceptionCaught(ctx, cause);
        }
    
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
          //ByteBuf byteBuf = (ByteBuf)msg;
          //byte[] req = new byte[byteBuf.readableBytes()];
          //byteBuf.readBytes(req);
          //String body = new String(req,"utf-8");
    
            String body = (String)msg;
            System.out.println("client: now is:"+body+"this is counter:"+ ++counter);
        }
    
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
           ByteBuf message = null;
            for (int i = 0; i <100; i++) {
                message = Unpooled.buffer(req.length);
                message.writeBytes(req);
                ctx.writeAndFlush(message);
            }
        }
    }
    
    
    • 利用DelimiterBaseFrameDecoder解决粘包和拆包
      DelimiterBaseFrameDecoder可以使用分隔符作为码流结束标识的消息的解码

    • 利用FixLengthFrameDecoder解决粘包和拆包
      FixLengthFrameDecoder是固定长度解码器,它能够按照指定的长度对消息进行自动解码,开发者不需要考虑拆包和粘包问题,非常方便。

    • 通过在消息头定义长度字段来标识消息的总长度。

    相关文章

      网友评论

          本文标题:Netty粘包拆包解决之道

          本文链接:https://www.haomeiwen.com/subject/qwnlpctx.html