美文网首页
【网络编程】Netty中的编解码之FixedLengthFram

【网络编程】Netty中的编解码之FixedLengthFram

作者: 程就人生 | 来源:发表于2023-03-03 13:46 被阅读0次

    在进行 TCP 编程时,无论是客户端还是服务器端,我们都需要考虑 TCP 的粘包/拆包问题。

    TCP 是流协议,所谓流就是没有界限的一串数据。TCP 底层是不知道上层业务逻辑含义的,只会根据 TCP 缓冲区的实际情况进行包的划分。一个完善的数据包可能会被 TCP 拆分成多个包进行发送,也可能会被 TCP 封装成一个大的数据包进行发送,这就是粘包/拆包的问题。

    粘包/拆包发生的原因,大致有以下几个:

    • 应用程序 write 写入的字节大小大于套接口发送缓冲区的大小;

    • 进行 MSS 大小的 TCP 分段;

    • 以太网帧的 payload 大于 MTU 进行 IP 分片;

    MSS(Maximum Segment Size,最大报文长度),是TCP协议定义的一个选项,MSS 选项用于在 TCP 连接建立时,收发双方协商通信时每一个报文段所能承载的最大数据长度。

    MTU(Maximum Transmission Unit,最大传输单元)用来通知对方所能接受数据服务单元的最大尺寸,说明发送方能够接受的有效载荷大小。

    由于底层的 TCP 是无法理解上层的业务数据,也无法保证数据包不被拆分和重组,因此粘包/拆包的问题,只能在上层的应用协议栈进行解决。主流协议的解决方案主要有:

    1. 消息定长,每个报文的大小固定;

    2. 在句尾增加换行符或其他特殊符号进行分割;

    3. 将消息分为消息头和消息体,消息头中有消息总长度;

    4. 设计更复杂的应用层协议。

    先来看一个粘包/拆包的小例子,客户端依次给服务器发送100条数据,服务器端应该接收到100条数据,服务器端接收到100条数据后要回应客户端100条数据,客户端也应该接收到100条数据,而实际上呢,却没有。

    服务器端代码:

    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.buffer.PooledByteBufAllocator;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import io.netty.handler.codec.string.StringDecoder;
    import io.netty.handler.codec.string.StringEncoder;
    import io.netty.util.CharsetUtil;
    import io.netty.util.concurrent.Future;
    import io.netty.util.concurrent.GenericFutureListener;
    
    /**
     * Netty服务器端
     * @author 程就人生
     * @date 2023年01月03日
     * @Description 
     *
     */
    public class TestServer {
    
        public void bind(final int port){
            EventLoopGroup boosGroup = new NioEventLoopGroup();
            EventLoopGroup workerGroup = new NioEventLoopGroup();
            try{            
                ServerBootstrap serverBootstrap = new ServerBootstrap();
                serverBootstrap.group(boosGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
              .option(ChannelOption.SO_BACKLOG, 1024)
              .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
                .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
                .childOption(ChannelOption.SO_KEEPALIVE, true)
                .childHandler(new ChannelInitializer<SocketChannel>(){
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                      ch.pipeline().addLast(new StringDecoder(CharsetUtil.UTF_8));                  
                      ch.pipeline().addLast(new StringEncoder(CharsetUtil.UTF_8));                  
                        ch.pipeline().addLast(new ServerHandler());
                    }               
                });
                ChannelFuture channelFuture = serverBootstrap.bind(port).sync();
            channelFuture.addListener(new GenericFutureListener<Future<? super Void>>() {          
                    public void operationComplete(Future<? super Void> future) throws Exception {
                        if (future.isSuccess()) {
                           System.out.println("服务器启动成功,Started Successed:" + port);
                        } else {
                          System.out.println("服务器启动失败,Started Failed:" + port);
                        }
                    }
                });
            channelFuture.channel().closeFuture().sync();
            }catch(Exception e){
                e.printStackTrace();
            }finally{
                boosGroup.shutdownGracefully();
                workerGroup.shutdownGracefully();
            }
    
        }
    
        public static void main(String[] argo){
            new TestServer().bind(8080);
        }
    }
    
    /**
     * 服务器端handler
     * @author 程就人生
     * @date 2023年01月03日
     * @Description 
     *
     */
    class ServerHandler extends ChannelInboundHandlerAdapter{
    
        private static int counter;
    
        @Override
        public void channelRead(ChannelHandlerContext ctx,Object msg){
            try{
                System.out.println("这里是服务器端控制台:" + msg + "计数:" + ++counter);
                String resp = "Server msg~!";            
                ctx.writeAndFlush(resp);            
            }catch(Exception e){
                e.printStackTrace();
            }
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
           ctx.close();
        }
    }
    

    第28-29行代码,创建服务器端的 NIO 线程组,其中 boosGroup 线程组专门负责接收客户端的连接,workerGroup 线程组处理 I/O 消息的读写。ServerBootstrap 为辅助启动类。在第 33 行设置为非阻塞模式,第34 行设置 backlog 为 1KM,第35行-36行设置通道 Channel 的分配器,第37 行设置为长连接,第 38 行采用匿名内部类的方式声明 handler。

    在41行设置字符串解码器 StringDecoder,将接收到的 Bytebuf 解码为 String字符串。在42 行为字符串编码器 StringEncoder,将要发送的 String 字符串编码为 Bytebuf ,以便于在通道Channel中传输。StringDecoder 和 StringEncoder 经常成对出现,有了这两个类,我们就可以把 String 字符串交给它们,由它们来完成和 Bytebuf 之间的转换。

    在46 行绑定端口,在 47 行启动监听事件绑定。在 56 行同步服务器通道关闭。在60-61行服务器优雅关闭。

    在80行,定义静态变量,对接收数据进行计数。在85-87行,打印客户端发来的消息,并给客户端进行响应。在95行,遇到异常时关闭连接。

    客户端代码:

    import io.netty.bootstrap.Bootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;
    import io.netty.handler.codec.string.StringDecoder;
    import io.netty.handler.codec.string.StringEncoder;
    import io.netty.util.CharsetUtil;
    
    /**
     * netty客户端
     * @author 程就人生
     * @date 2023年01月03日
     * @Description 
     *
     */
    public class TestClient {
    
        public void connect(int port, String host){
            EventLoopGroup group = new NioEventLoopGroup();
            try{   
                Bootstrap bootstrap = new Bootstrap();
                bootstrap.group(group)
                .channel(NioSocketChannel.class)
              .option(ChannelOption.SO_KEEPALIVE, true)
                .handler(new ChannelInitializer<SocketChannel>(){
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception { 
                      ch.pipeline().addLast(new StringDecoder(CharsetUtil.UTF_8));                  
                      ch.pipeline().addLast(new StringEncoder(CharsetUtil.UTF_8));                  
                        ch.pipeline().addLast(new ClientHandler());
                    }
                });            
                ChannelFuture channelFuture = bootstrap.connect(host, port);
                channelFuture.channel().closeFuture().sync();
            }catch(Exception e){
                e.printStackTrace();
            }finally{
                group.shutdownGracefully();
            }
        }
    
        public static void main(String[] argo){
            new TestClient().connect(8080, "localhost");
        }
    }
    
    /**
     * 客户端处理handler
     * @author 程就人生
     * @date 2023年01月03日
     * @Description 
     *
     */
    class ClientHandler extends ChannelInboundHandlerAdapter{
    
        private static int counter;
    
        @Override
        public void channelActive(ChannelHandlerContext ctx) {
            String req = "Client msg~!";
            for(int i = 0;i<100;i++){
                ctx.writeAndFlush(req);
            }
        }
    
        @Override
        public void channelRead(ChannelHandlerContext ctx,Object msg){
            try{
                System.out.println("这里是客户端控制台:" + msg + ";计数:" + ++counter);
            }catch(Exception e){
                e.printStackTrace();
            }        
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            ctx.close();
        }
    }
    

    在45行,通过 main 方法传入端口号和 host 。第 25 行创建客户端 NIO 线程组,在29行将线程组设置为非阻塞模式。第 30 行,保持长链接为 true。第 31 行采用匿名内部类的方式声明handler,34 行设置字符串解码器,35 行设置字符串编码器,36 行设置 I/O 消息处理 handler。第 39 行连接服务器端端口号和 host,第 40 行同步关闭事件。

    第62行,定义静态变量,对服务器发过来的数据进行计数。在62-68行,连接成功后,连续给服务器端发送100条消息。第75行,对服务器端发过来的信息继续计数。

    接下来分别运行服务器端和客户端,服务器端控制台输出:

    客户端输出:

    通过控制台,只看到两个数据包,服务器端粘包/拆包了,因为服务器端的缓冲区大小为1KB,而发送的数据很小,TCP 底层把他们封装成了一个包进行发送,因此出现了粘包。在第一个包和第二个包之间,还出现了拆包情况,一个数据包被放到了两个缓冲区。毫无疑问,这不是我们想要的结果。

    使用第 1 种解决方案,通过定长来控制粘包/拆包的问题。在服务器端、客户端采用匿名内部类的方式声明 handler 的地方,分别加入 FixedLengthFrameDecoder 类,服务器端和客户端发出去的消息长度刚好12个字符,这里就把FixedLengthFrameDecoder 中的固定长度设置为12。服务器端代码调整:

    .childHandler(new ChannelInitializer<SocketChannel>(){
        @Override
        protected void initChannel(SocketChannel ch) throws Exception {
          ch.pipeline().addLast(new FixedLengthFrameDecoder(12));
          ch.pipeline().addLast(new StringDecoder(CharsetUtil.UTF_8));                  
          ch.pipeline().addLast(new StringEncoder(CharsetUtil.UTF_8));                  
          ch.pipeline().addLast(new ServerHandler());
        }               
    });```
    
    客户端代码调整:
    
    

    .handler(new ChannelInitializer<SocketChannel>(){
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
    ch.pipeline().addLast(new FixedLengthFrameDecoder(12));
    ch.pipeline().addLast(new StringDecoder(CharsetUtil.UTF_8));
    ch.pipeline().addLast(new StringEncoder(CharsetUtil.UTF_8));
    ch.pipeline().addLast(new ClientHandler());
    }
    });

    
    分别运行服务器端代码和客户端代码,服务器端控制台运行结果如下图所示:
    ![](https://img.haomeiwen.com/i3816895/78711df13ae98ea0.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
    
    客户端运行结果如下图所示:![](https://img.haomeiwen.com/i3816895/b4ce45b32037735b.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
    
    服务器端控制台、客户端控制台都收到了100条数据,并且没有再发生粘包/拆包的情况。
    
    查看 FixedLengthFrameDecoder 的源码,这个解码类就是把接收到的 Bytebuf 按照固定的长度进行分割,因此接收到的都是等长的数据包。 FixedLengthFrameDecoder 单独使用,无需对应的编码类。
    
    ![](https://img.haomeiwen.com/i3816895/414d990317e26d86.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
    
    当然这个解码类也有它的局限,在实际的业务场景中,不可能都是等长的数据包,数据包的长度很可能经常变化,这样这个解码器就不能满足我们的需求了。
    

    相关文章

      网友评论

          本文标题:【网络编程】Netty中的编解码之FixedLengthFram

          本文链接:https://www.haomeiwen.com/subject/rnosldtx.html