美文网首页
【网络编程】Netty中的编解码之FixedLengthFram

【网络编程】Netty中的编解码之FixedLengthFram

作者: 程就人生 | 来源:发表于2023-03-03 13:46 被阅读0次

在进行 TCP 编程时,无论是客户端还是服务器端,我们都需要考虑 TCP 的粘包/拆包问题。

TCP 是流协议,所谓流就是没有界限的一串数据。TCP 底层是不知道上层业务逻辑含义的,只会根据 TCP 缓冲区的实际情况进行包的划分。一个完善的数据包可能会被 TCP 拆分成多个包进行发送,也可能会被 TCP 封装成一个大的数据包进行发送,这就是粘包/拆包的问题。

粘包/拆包发生的原因,大致有以下几个:

  • 应用程序 write 写入的字节大小大于套接口发送缓冲区的大小;

  • 进行 MSS 大小的 TCP 分段;

  • 以太网帧的 payload 大于 MTU 进行 IP 分片;

MSS(Maximum Segment Size,最大报文长度),是TCP协议定义的一个选项,MSS 选项用于在 TCP 连接建立时,收发双方协商通信时每一个报文段所能承载的最大数据长度。

MTU(Maximum Transmission Unit,最大传输单元)用来通知对方所能接受数据服务单元的最大尺寸,说明发送方能够接受的有效载荷大小。

由于底层的 TCP 是无法理解上层的业务数据,也无法保证数据包不被拆分和重组,因此粘包/拆包的问题,只能在上层的应用协议栈进行解决。主流协议的解决方案主要有:

  1. 消息定长,每个报文的大小固定;

  2. 在句尾增加换行符或其他特殊符号进行分割;

  3. 将消息分为消息头和消息体,消息头中有消息总长度;

  4. 设计更复杂的应用层协议。

先来看一个粘包/拆包的小例子,客户端依次给服务器发送100条数据,服务器端应该接收到100条数据,服务器端接收到100条数据后要回应客户端100条数据,客户端也应该接收到100条数据,而实际上呢,却没有。

服务器端代码:

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.CharsetUtil;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;

/**
 * Netty服务器端
 * @author 程就人生
 * @date 2023年01月03日
 * @Description 
 *
 */
public class TestServer {

    public void bind(final int port){
        EventLoopGroup boosGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try{            
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(boosGroup, workerGroup)
            .channel(NioServerSocketChannel.class)
          .option(ChannelOption.SO_BACKLOG, 1024)
          .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
            .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
            .childOption(ChannelOption.SO_KEEPALIVE, true)
            .childHandler(new ChannelInitializer<SocketChannel>(){
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                  ch.pipeline().addLast(new StringDecoder(CharsetUtil.UTF_8));                  
                  ch.pipeline().addLast(new StringEncoder(CharsetUtil.UTF_8));                  
                    ch.pipeline().addLast(new ServerHandler());
                }               
            });
            ChannelFuture channelFuture = serverBootstrap.bind(port).sync();
        channelFuture.addListener(new GenericFutureListener<Future<? super Void>>() {          
                public void operationComplete(Future<? super Void> future) throws Exception {
                    if (future.isSuccess()) {
                       System.out.println("服务器启动成功,Started Successed:" + port);
                    } else {
                      System.out.println("服务器启动失败,Started Failed:" + port);
                    }
                }
            });
        channelFuture.channel().closeFuture().sync();
        }catch(Exception e){
            e.printStackTrace();
        }finally{
            boosGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }

    }

    public static void main(String[] argo){
        new TestServer().bind(8080);
    }
}

/**
 * 服务器端handler
 * @author 程就人生
 * @date 2023年01月03日
 * @Description 
 *
 */
class ServerHandler extends ChannelInboundHandlerAdapter{

    private static int counter;

    @Override
    public void channelRead(ChannelHandlerContext ctx,Object msg){
        try{
            System.out.println("这里是服务器端控制台:" + msg + "计数:" + ++counter);
            String resp = "Server msg~!";            
            ctx.writeAndFlush(resp);            
        }catch(Exception e){
            e.printStackTrace();
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
       ctx.close();
    }
}

第28-29行代码,创建服务器端的 NIO 线程组,其中 boosGroup 线程组专门负责接收客户端的连接,workerGroup 线程组处理 I/O 消息的读写。ServerBootstrap 为辅助启动类。在第 33 行设置为非阻塞模式,第34 行设置 backlog 为 1KM,第35行-36行设置通道 Channel 的分配器,第37 行设置为长连接,第 38 行采用匿名内部类的方式声明 handler。

在41行设置字符串解码器 StringDecoder,将接收到的 Bytebuf 解码为 String字符串。在42 行为字符串编码器 StringEncoder,将要发送的 String 字符串编码为 Bytebuf ,以便于在通道Channel中传输。StringDecoder 和 StringEncoder 经常成对出现,有了这两个类,我们就可以把 String 字符串交给它们,由它们来完成和 Bytebuf 之间的转换。

在46 行绑定端口,在 47 行启动监听事件绑定。在 56 行同步服务器通道关闭。在60-61行服务器优雅关闭。

在80行,定义静态变量,对接收数据进行计数。在85-87行,打印客户端发来的消息,并给客户端进行响应。在95行,遇到异常时关闭连接。

客户端代码:

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.CharsetUtil;

/**
 * netty客户端
 * @author 程就人生
 * @date 2023年01月03日
 * @Description 
 *
 */
public class TestClient {

    public void connect(int port, String host){
        EventLoopGroup group = new NioEventLoopGroup();
        try{   
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group)
            .channel(NioSocketChannel.class)
          .option(ChannelOption.SO_KEEPALIVE, true)
            .handler(new ChannelInitializer<SocketChannel>(){
                @Override
                protected void initChannel(SocketChannel ch) throws Exception { 
                  ch.pipeline().addLast(new StringDecoder(CharsetUtil.UTF_8));                  
                  ch.pipeline().addLast(new StringEncoder(CharsetUtil.UTF_8));                  
                    ch.pipeline().addLast(new ClientHandler());
                }
            });            
            ChannelFuture channelFuture = bootstrap.connect(host, port);
            channelFuture.channel().closeFuture().sync();
        }catch(Exception e){
            e.printStackTrace();
        }finally{
            group.shutdownGracefully();
        }
    }

    public static void main(String[] argo){
        new TestClient().connect(8080, "localhost");
    }
}

/**
 * 客户端处理handler
 * @author 程就人生
 * @date 2023年01月03日
 * @Description 
 *
 */
class ClientHandler extends ChannelInboundHandlerAdapter{

    private static int counter;

    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        String req = "Client msg~!";
        for(int i = 0;i<100;i++){
            ctx.writeAndFlush(req);
        }
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx,Object msg){
        try{
            System.out.println("这里是客户端控制台:" + msg + ";计数:" + ++counter);
        }catch(Exception e){
            e.printStackTrace();
        }        
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }
}

在45行,通过 main 方法传入端口号和 host 。第 25 行创建客户端 NIO 线程组,在29行将线程组设置为非阻塞模式。第 30 行,保持长链接为 true。第 31 行采用匿名内部类的方式声明handler,34 行设置字符串解码器,35 行设置字符串编码器,36 行设置 I/O 消息处理 handler。第 39 行连接服务器端端口号和 host,第 40 行同步关闭事件。

第62行,定义静态变量,对服务器发过来的数据进行计数。在62-68行,连接成功后,连续给服务器端发送100条消息。第75行,对服务器端发过来的信息继续计数。

接下来分别运行服务器端和客户端,服务器端控制台输出:

客户端输出:

通过控制台,只看到两个数据包,服务器端粘包/拆包了,因为服务器端的缓冲区大小为1KB,而发送的数据很小,TCP 底层把他们封装成了一个包进行发送,因此出现了粘包。在第一个包和第二个包之间,还出现了拆包情况,一个数据包被放到了两个缓冲区。毫无疑问,这不是我们想要的结果。

使用第 1 种解决方案,通过定长来控制粘包/拆包的问题。在服务器端、客户端采用匿名内部类的方式声明 handler 的地方,分别加入 FixedLengthFrameDecoder 类,服务器端和客户端发出去的消息长度刚好12个字符,这里就把FixedLengthFrameDecoder 中的固定长度设置为12。服务器端代码调整:

.childHandler(new ChannelInitializer<SocketChannel>(){
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
      ch.pipeline().addLast(new FixedLengthFrameDecoder(12));
      ch.pipeline().addLast(new StringDecoder(CharsetUtil.UTF_8));                  
      ch.pipeline().addLast(new StringEncoder(CharsetUtil.UTF_8));                  
      ch.pipeline().addLast(new ServerHandler());
    }               
});```

客户端代码调整:

.handler(new ChannelInitializer<SocketChannel>(){
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new FixedLengthFrameDecoder(12));
ch.pipeline().addLast(new StringDecoder(CharsetUtil.UTF_8));
ch.pipeline().addLast(new StringEncoder(CharsetUtil.UTF_8));
ch.pipeline().addLast(new ClientHandler());
}
});


分别运行服务器端代码和客户端代码,服务器端控制台运行结果如下图所示:
![](https://img.haomeiwen.com/i3816895/78711df13ae98ea0.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)

客户端运行结果如下图所示:![](https://img.haomeiwen.com/i3816895/b4ce45b32037735b.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)

服务器端控制台、客户端控制台都收到了100条数据,并且没有再发生粘包/拆包的情况。

查看 FixedLengthFrameDecoder 的源码,这个解码类就是把接收到的 Bytebuf 按照固定的长度进行分割,因此接收到的都是等长的数据包。 FixedLengthFrameDecoder 单独使用,无需对应的编码类。

![](https://img.haomeiwen.com/i3816895/414d990317e26d86.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)

当然这个解码类也有它的局限,在实际的业务场景中,不可能都是等长的数据包,数据包的长度很可能经常变化,这样这个解码器就不能满足我们的需求了。

相关文章

网友评论

      本文标题:【网络编程】Netty中的编解码之FixedLengthFram

      本文链接:https://www.haomeiwen.com/subject/rnosldtx.html