美文网首页
Netty接收数据时一次读取多少字节以及读多少次

Netty接收数据时一次读取多少字节以及读多少次

作者: 书唐瑞 | 来源:发表于2021-08-26 04:19 被阅读0次

    本篇文章介绍一下,Netty在接收到数据时,一次性读取多少字节.

    本篇使用Netty构建一个简单的服务端,使用Python构建一个简单的客户端,然后客户端向服务端发送数据,然后观察Netty每次读取的字节数.

    客户端代码如下

    import socket
    
    if __name__ == '__main__':
    
        client = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
        client.connect(('127.0.0.1',8080))
        client.send('''11111111111111111111111111111111111111111111\
                2222222222222222222222222222222222222222222222222\
                3333333333333333333333333333333333333333333333333\
                4444444444444444444444444444444444444444444444444\
                5555555555555555555555555555555555555555555555555\
                6666666666666666666666666666666666666666666666666\
                7777777777777777777777777777777777777777777777777\
                8888888888888888888888888888888888888888888888888\
                9999999999999999999999999999999999999999999999999\
                0000000000000000000000000000000000000000000000000\
                1111111111111111111111111111111111111111111111111\
                2222222222222222222222222222222222222222222222222\
                3333333333333333333333333333333333333333333333333\
                4444444444444444444444444444444444444444444444444\
                5555555555555555555555555555555555555555555555555\
                6666666666666666666666666666666666666666666666666\
                7777777777777777777777777777777777777777777777777\
                8888888888888888888888888888888888888888888888888\
                9999999999999999999999999999999999999999999999999'''.encode('utf-8'))
    

    刻意让客户端一次性发送多一些数据 .

    服务端代码如下

    
    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelPipeline;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;
    import io.netty.handler.codec.string.StringDecoder;
    import io.netty.handler.codec.string.StringEncoder;
    import io.netty.handler.logging.LogLevel;
    import io.netty.handler.logging.LoggingHandler;
    
    public class Server {
    
    
        public static void main(String[] args) throws Exception {
    
            EventLoopGroup bossGroup = new NioEventLoopGroup(1);
            EventLoopGroup workerGroup = new NioEventLoopGroup(8);
            EventLoopGroup businessGroup = new NioEventLoopGroup(8);
    
            ServerBootstrap serverBootstrap = new ServerBootstrap();
    
            try {
    
                serverBootstrap.group(bossGroup, workerGroup)
                        .channel(NioServerSocketChannel.class)
                        .handler(new LoggingHandler(LogLevel.INFO))
                        .childHandler(new ChannelInitializer<NioSocketChannel>() {
                            @Override
                            protected void initChannel(NioSocketChannel ch) {
                                ChannelPipeline channelPipeline = ch.pipeline();
                                channelPipeline.addLast(new StringEncoder());
                                channelPipeline.addLast(new StringDecoder());
                                channelPipeline.addLast(businessGroup, new ServerHandler());
                            }
                        });
    
                ChannelFuture channelFuture = serverBootstrap.bind("127.0.0.1", 8080).sync();
                channelFuture.channel().closeFuture().sync();
            } finally {
                bossGroup.shutdownGracefully();
                workerGroup.shutdownGracefully();
            }
        }
    }
    
    
    
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.SimpleChannelInboundHandler;
    
    public class ServerHandler extends SimpleChannelInboundHandler<String> {
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, String msg) {
            System.out.println("接收到客户端信息:" + msg);
        }
    }
    

    启动服务端,然后执行客户端发送数据.

    这里我们借助wireshark工具查看数据包情况


    在这里插入图片描述

    如上图可知, 客户端(端口43262)向服务端(端口8080)首先进行了三次握手,握手之后,客户端一次性向服务端发送了1142个字节内容,之后进行了四次挥手.

    接下来看一下服务端的打印日志情况.

    在这里插入图片描述

    从上图可以发现,共打印了两次. 客户端发送了一次数据,就把所有的数据发送完了,而服务端却打印了两次,难道是Netty读取了两次TCP中的数据?

    接下来通过debug方式,观察下数据读取情况.

    Netty读取数据的逻辑在以下类方法中

    // 源码位置
    io.netty.channel.nio.AbstractNioByteChannel.NioByteUnsafe#read
    
    @Override
    public final void read() {
        ...
        byteBuf = allocHandle.allocate(allocator);
        // 读取数据
        allocHandle.lastBytesRead(doReadBytes(byteBuf));
        ...    
    }
    

    继续跟踪doReadBytes方法,最后会调用到

    @Override
    public int writeBytes(ScatteringByteChannel in, int length) throws IOException {
        ensureWritable(length);
        int writtenBytes = setBytes(writerIndex, in, length);
        if (writtenBytes > 0) {
            writerIndex += writtenBytes;
        }
        // 就在此处打断点
        return writtenBytes;
    }
    

    客户端重新发送数据,再进行测试


    在这里插入图片描述

    会发现,读取了1024个字节,放行

    在这里插入图片描述

    如上图,第二次读取了118字节. 两次加起来1024+118=1142个字节,和客户端发送的数据一致.
    当然以上是我们通过debug方式查看的数据读取情况,我们也可以通过ss命令查看数据的读取情况,先让客户端发送数据,然后服务端读取一次数据,再通过debug让服务器暂时停下来,通过ss命令查看TCP接收缓冲区中还剩多少字节.


    在这里插入图片描述

    还剩119个字节,其实就是118个有效数据再加一个结束字节. 其实与我们上面分析的是一致的.

    根据以上分析,客户端一次性把1142个字节发送给了服务端,但是服务端分两次才把数据读取完成,而且第一次只读取1024个字节.

    如果这个时候你认为文章标题的答案是1024个字节,那其实也是不对的.
    我们假设一种场景,客户端在一直急速地给服务端发送数据. 第一次Netty会使用1024字节大小的Buffer去读取TCP接收缓冲区中的数据,当读取完成之后,Netty发现分配的1024字节大小的Buffer都用来装数据了,那么Netty猜测后面应该还会有更多的数据,那么Netty下次就会分配16384字节大小的Buffer用来读取TCP接收缓冲区中的数据,如果16384字节大小的Buffer也被装满了数据,说明后面可能还会有更多数据,因此还会分配比16384更大的Buffer用来装数据. 假如分配的16384字节大小的Buffer在读取数据之后没有被装满,说明TCP接收缓冲区中的数据可能不是很多,那么Netty就会分配比16384字节小的Buffer用来装下一次要读取的数据.

    总之,Netty会根据分配Buffer的大小和实际读取到的数据大小之间的关系,来决定下一步是增大Buffer还是减小Buffer的大小.

    核心代码如下

    // 源码位置
    io.netty.channel.AdaptiveRecvByteBufAllocator.HandleImpl#record
    
    private void record(int actualReadBytes) {
        if (actualReadBytes <= SIZE_TABLE[max(0, index - INDEX_DECREMENT - 1)]) {
            if (decreaseNow) {
                index = max(index - INDEX_DECREMENT, minIndex);
                // 减小下次读取字节的大小
                nextReceiveBufferSize = SIZE_TABLE[index];
                decreaseNow = false;
            } else {
                decreaseNow = true;
            }
        } else if (actualReadBytes >= nextReceiveBufferSize) {
            index = min(index + INDEX_INCREMENT, maxIndex);
            // 增大下次读取字节的大小
            nextReceiveBufferSize = SIZE_TABLE[index];
            decreaseNow = false;
        }
    }
    
    

    还有一点需要说明的是,假如客户端发送了非常多的数据过来,难道服务端必须一直读取这个Channel里的数据吗?
    当然不是, 默认Netty只会读取Channel里面的数据16次,如果在16次的机会里,还是没有读取完这个Channel里面的数据,那么暂时就不会读取这个Channel里面的数据了,Netty需要去处理其他事情(比如轮询IO事件,处理IO事件,执行task任务等),只有当下次轮循到IO事件的时候,才会继续读取之前没有读完的数据.

    image.png

    Netty使用的是水平触发,因此即便客户端不发送数据了,Netty依然可以把之前没有读取完的数据,继续读取.

    相关文章

      网友评论

          本文标题:Netty接收数据时一次读取多少字节以及读多少次

          本文链接:https://www.haomeiwen.com/subject/pfjyiltx.html