IO 演进

作者: 鱼蛮子9527 | 来源:发表于2022-05-17 09:49 被阅读0次

    写程序难免与 IO 打交道,无论是文件 IO、网络 IO,涉及到数据的存储、交换,大多数情况都会见到IO 的身影。这次一起看下 IO 相关的一些知识,主要以网络 IO 为例进行展开。

    基础概念

    首先讲下几个概念,这里看不懂也没关系,结合下文就会明白了,这些概念都是基于 Linux 系统。

    FD

    在 Linux 下,一切皆文件。无论是普通文件,目录文件,字符设备文件(如键盘,鼠标...),块设备文件(如硬盘,光驱...),套接字(网络通信)等等,所有一切均抽象成文件,提供了统一的接口,方便应用程序调用。

    FD 是 File descriptor 的缩写,也就是文件描述符。当应用程序请求内核打开、新建一个文件时,内核会返回一个非负整数(每个进程都从 0 开始分配)用于标记对应这个打开、新建的文件,这个数字就是文件描述符,文件描述符可以看成是文件的 id。后续应用程序都是通过这个文件描述符来操作相应的文件。

    系统为了维护文件描述符建立了 3 个表:

    • 进程级的文件描述符表
    • 系统级的文件描述符表
    • 文件系统的i-node表
    查找文件过程

    通过上图可以看出,每个进程有自己的文件描述符表,通过文件描述符表可以关联到系统的文件描述符表,而系统描述符表存储了 inode 指针,通过inode指针就可以找到真实的文件进行操作了。而什么是 inode 表可以看下阮一峰大佬的博客,写的非常清楚。

    文件描述符限制

    文件描述符是一个重要的系统资源,理论上系统内存多大就应该可以打开多少个文件描述符,但是实际情况是,内核会有系统级限制,以及用户级限制(不让某一个应用程序进程消耗掉所有的文件资源),可以通过下面命令进行查看、修改。

    用户级限制 系统级限制
    查看 ulimit -n sysctl -a | grep -i file-max --color;
    cat /proc/sys/fs/file-max
    修改(重启失效) ulimit -SHn 65535 sysctl -w fs.file-max=65535;
    echo 65535 > /proc/sys/fs/file-max
    修改(永久生效) 修改配置文件:
    /etc/security/limits.conf
    在最后一行加入:
    * - nofile 65535
    修改配置文件:
    /etc/sysctl.conf
    在任意位置加入:
    fs.file-max=65535
    执行(不执行则不生效):
    sysctl -p

    内核态、用户态

    从上图可以看出来 Linux 系统大致分为这三层。

    内核态从本质上说就是我们所说的内核,它是一种特殊的软件程序,用于控制计算机的硬件资源,例如协调 CPU 资源,分配内存资源,并且提供稳定的环境供应用程序运行。

    而用户态就是提供应用程序运行的空间,为了使应用程序访问到内核管理的资源例如 CPU,内存,I/O。内核必须提供一组通用的访问接口,这些接口就叫系统调用就是图中 system-call interface to the kernel。

    系统调用

    系统调用时操作系统的最小功能单位。根据不同的应用场景,不同的 Linux 发行版本提供的系统调用数量也不尽相同,大致在 240-350 之间。这些系统调用组成了用户态跟内核态交互的基本接口,例如:用户态想要申请一块 20K 大小的动态内存,就需要 brk 系统调用。

    在执行系统调用的时候就会从用户态切换到内核态,而调用结束后又会从内核态切换到用户态,整个过程对资源的消耗较大。因为过程中会涉及到 CPU 的中断(软中断)、堆栈的切换、寄存器的保存等各种现场保护工作,以便待调用结束之后程序能继续执行下去。

    总体来说系统调用是昂贵的。

    IO演进

    BIO

    BIO(Blocking IO)即阻塞 I/O,数据的读取写入必须阻塞在一个线程内,直至 IO 完成。

    
    /**
     * BIO实现的Server
     * @author 鱼蛮 on 2021/3/8
     **/
    @Slf4j
    public class ServerBio {
        public static void main(String[] args) throws IOException {
            ServerSocket serverSocket = new ServerSocket(8090);
            log.info("start server at {}", serverSocket.getLocalPort());
            while (true) {
                Socket socket = serverSocket.accept();
                log.info("accept socket,ip:{},port:{}", socket.getInetAddress().getHostAddress(), socket.getPort());
                new Thread(() -> {
                    String line;
                    try (BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
                         BufferedWriter bufferedWriter = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream()))) {
                        while (true) {
                            // 读取
                            line = bufferedReader.readLine();
                            log.info("socket:{},input:{}", socket.getPort(), line);
    
                            // 应答
                            bufferedWriter.write("receive:" + line);
                            bufferedWriter.newLine();
                            bufferedWriter.flush();
                        }
                    } catch (Exception e) {
                        // do nothing
                    }
                }).start();
            }
        }
    }
    

    上面这段程序就是基于 BIO 模型的一个简单服务端实现,我们可以看到由于是阻塞 IO,当主线程接收到一个连接之后如果不想被 Block 住,那么就必须新建一个线程来处理这个连接,这样才能接续监听新的请求。当然如果分成两个线程一个负责接收,一个负责处理请求,接收线程的确不会被 Block 住,但是负责处理请求的线程会一直被某个 Socket Block 住,直至处理完成才会处理下一个。

    也就是说基于 BIO 模型,每来一个新连接都必须建立一个新的线程与之对应进行处理。而线程其实是系统宝贵的资源,本身系统线程数量就存在限制,而且太多的线程将导致过多的 CPU 资源浪费在线程调度上面。所以基于 BIO 的模型无法支撑单机过多的连接访问,大家可以搜索看下 C10K 问题。

    但是 BIO 模型简单、易懂,在少量连接情况下会有很好的性能表现。

    NIO

    NIO(Non-blocking IO)即非阻塞 I/O,Socket 的数据读取、写入将不再是阻塞的,当没有数据输入的时候将返回特殊的值。

    
    /**
     * NIO实现的Server
     * @author 鱼蛮 on 2021/3/8
     **/
    @Slf4j
    public class ServerNio {
        public static void main(String[] args) throws IOException {
            LinkedList<SocketChannel> sockets = new LinkedList<>();
            ServerSocketChannel serverSocket = ServerSocketChannel.open();
            serverSocket.bind(new InetSocketAddress(8090));
            serverSocket.configureBlocking(false);
            log.info("start server at {}", serverSocket.socket().getLocalPort());
            while (true) {
                SocketChannel socketChannel = serverSocket.accept();
                if (Objects.isNull(socketChannel)) {
                    // do nothing
                } else {
                    log.info("accept socket,ip:{},port:{}", socketChannel.socket().getInetAddress().getHostAddress(), socketChannel.socket().getPort());
                    socketChannel.configureBlocking(false);
                    sockets.add(socketChannel);
                }
    
                ByteBuffer buffer = ByteBuffer.allocate(2048);
                for (SocketChannel item : sockets) {
                    // 读取
                    if (item.read(buffer) <= 0) {
                        continue;
                    }
                    buffer.flip();
                    byte[] bytes = new byte[buffer.limit()];
                    buffer.get(bytes);
                    String recStr = new String(bytes);
                    buffer.clear();
                    log.info("socket:{},input:{}", item.socket().getPort(), recStr);
    
                    // 应答
                    buffer.put(("receive:" + recStr).getBytes(StandardCharsets.UTF_8));
                    buffer.flip();
                    item.write(buffer);
                    buffer.clear();
                }
                try {
                    TimeUnit.MILLISECONDS.sleep(10L);
                } catch (InterruptedException e) {
                    // do nothing
                }
            }
        }
    }
    

    可以看出来使用 NIO 模型,只用一个线程就可以建立服务器端了,而且可以同时处理多个 Socket 的请求。但其实当调用 SocketChannel.read() 的时候实际执行的是“系统调用”,因为应用程序是不知道网络数据是否已经就绪,我们是需要询问内核来读取数据的。

    我们上面说了“系统调用”是很昂贵的,当存在很多 Socket,而这些 Socket 又没多活跃的时候,这样频繁的系统调用就造成了很大的资源浪费。

    IO 多路复用

    由于频繁的执行“系统调用”造成了过多的资源浪费,那我们能不能一次传递多个 Socket 给内核,内核再一次性告诉我们那些已经完成了呢?这样就减少了“系统调用”的次数。因此产生了多路复用技术,多路复用基本思想就是一次传递多个需要询问的对象,内核告诉我哪些已经准备就绪。

    select

    最早出现的是 select 函数,我们先看下函数方法,可以通过:mau 2 select 查看这个方法的介绍。

    select(int nfds, fd_set *restrict readfds, fd_set *restrict writefds, fd_set *restrict errorfds, struct timeval *restrict timeout);
    

    在 select 方法中,支持了传入 readfds,即多个“文件描述符”也就是多个 Socket。但是一次传入的数量受操作系统的限制。

    poll

    poll(struct pollfd fds[], nfds_t nfds, int timeout);
    

    之后出现的 poll 函数整体功能与 select 一致,可以看成是 select 的增强版本,去掉了传入文件描述符数量的限制。

    epoll

    select/poll 虽然解决了频繁进行“系统调用”的问题,但是传递 fd 的时候依然需要从用户态 copy 到内核态,内核态也需要频繁的遍历这些 fd,而且这个过程是循环进行,还是造成了不少的资源浪费。而 epoll 的出现就是为了解决这个问题,epoll 核心命令为:

    • epoll_create:在内核申请一块内存区域用于存储需要监控的 fd,采用红黑树结构存储。
    • epoll_ctl:将需要监控的 fd 添加到上面创建的集合里面。
    • epoll_wait:获取就绪的 fd 集合,操作系统会将执行就绪的 fd 放入就绪集合中(采用链表存储),epoll_wait 检查这个集合即可。

    使用 epoll 很好的解决了上面的问题,总体来说 epoll 在内核态开辟了两个集合,采用了空间换时间的方式来提升执行效率。

    
    /**
     * NIO实现的Server,利用了多路复用器,NIO最终实现方式是由系统决定,默认为epoll,也可能是select或者poll,可以通过参数指定
     * socket() = 4
     * fcntl(4, NONBLOCK)
     * bind(4, 8090)
     * listen(4)
     *
     * epoll_create(256) = 7
     *
     * epoll_ctl(7, EPOLL_CTL_ADD, 4)
     *
     * while(true) {
     *     epoll_wait(7, 2000)
     *
     *     // 新连接
     *     accept(4) = 8
     *     epoll_ctl(7, EPOLL_CTL_ADD, 8)
     * }
     *
     * @author 鱼蛮 on 2021/3/8
     **/
    @Slf4j
    public class ServerNioMultiplex {
    
        private Selector selector;
    
        private ByteBuffer buffer;
    
        private void initServer(int port) throws IOException {
            ServerSocketChannel serverSocket = ServerSocketChannel.open();
            serverSocket.configureBlocking(false);
            serverSocket.bind(new InetSocketAddress(port));
            // 对epoll相当于epoll_create,而对select,poll相当于建了个数组而已
            this.selector = Selector.open();
            // 对epoll相当于epoll_ctl,而对select,poll相当于网数组里面添加一个值
            serverSocket.register(this.selector, SelectionKey.OP_ACCEPT);
    
            buffer = ByteBuffer.allocate(2048);
            log.info("start server at {}", serverSocket.socket().getLocalPort());
    
        }
    
        public void start(int port) throws IOException {
            // 初始化服务
            initServer(port);
    
            // 监听服务
            while (true) {
                // 对epoll相当于epoll_wait,而对select,poll相当select(num,fds)
                if (this.selector.select(2000) <= 0) {
                    continue;
                }
                Set<SelectionKey> keys = this.selector.selectedKeys();
                Iterator<SelectionKey> it = keys.iterator();
                while (it.hasNext()) {
                    SelectionKey key = it.next();
                    it.remove();
    
                    // 客户端已经发送数据
                    if (key.isReadable()) {
                        SocketChannel socketChannel = (SocketChannel) key.channel();
                        int readCount = socketChannel.read(buffer);
                        // 客户端断开连接
                        if (readCount == -1) {
                            key.cancel();
                            socketChannel.close();
                            log.info("socket:{},exit", socketChannel.socket().getPort());
                            continue;
                        }
                        buffer.flip();
                        byte[] bytes = new byte[buffer.limit()];
                        buffer.get(bytes);
                        String recStr = new String(bytes);
                        log.info("socket:{},readCount:{},input:{}", socketChannel.socket().getPort(), readCount, recStr);
                        buffer.clear();
    
                        // 应答
                        buffer.put(("receive:" + recStr ).getBytes(StandardCharsets.UTF_8));
                        buffer.flip();
                        socketChannel.write(buffer);
                        buffer.clear();
                    }  // 新客户端连接
                    else if (key.isAcceptable()) {
                        ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();
                        SocketChannel socketChannel = serverSocketChannel.accept();
                        socketChannel.configureBlocking(false);
                        socketChannel.register(this.selector, SelectionKey.OP_READ);
                        log.info("accept socket,ip:{},port:{}", socketChannel.socket().getInetAddress().getHostAddress(), socketChannel.socket().getPort());
                    }
                }
            }
        }
    
        public static void main(String[] args) throws IOException {
            ServerNioMultiplex serverNioMultiplex = new ServerNioMultiplex();
            serverNioMultiplex.start(8090);
        }
    }
    

    上面这段程序就是使用多路复用的一个小例子,Java NIO API 会根据操作系统来选择使用 select、poll、epoll,默认使用的是 epoll。

    在一个线程中如果既处理新 Socket 的连接,又处理已连接 Socket 的读写,可能会因为某个连接的处理较慢而导致整个过程阻塞。所以一般的服务框架会采用多线程进行处理,例如一个线程负责监听新连接的进入,再开几个线程来分别负责多个 Socket 的读写。下面这段程序就是多路复用的多线程版本,Tomcat 的 NIO 线程模型也是类似的处理过程。

    
    /**
     * NIO实现的Server,利用了多路复用器,多线程版本,跟tomcat的nio模型类似
    
     * @author 鱼蛮 on 2021/3/8
     **/
    @Slf4j
    public class ServerNioMultiplexMT {
    
        public void start(int port, int threadNum) throws IOException {
            ServerSocketChannel serverSocket = ServerSocketChannel.open();
            serverSocket.configureBlocking(true);
            serverSocket.bind(new InetSocketAddress(port));
    
            AtomicInteger counter = new AtomicInteger();
            Poller[] pollers = new Poller[threadNum];
    
            for (int i = 0; i < threadNum; i++) {
                pollers[i] = new Poller();
                new Thread(pollers[i]).start();
            }
            log.info("start server at {}", serverSocket.socket().getLocalPort());
    
            // 主线程监听连接
            while (true) {
                SocketChannel socketChannel = serverSocket.accept();
                log.info("accept socket,ip:{},port:{}", socketChannel.socket().getInetAddress().getHostAddress(), socketChannel.socket().getPort());
                pollers[counter.getAndIncrement() % pollers.length].register(socketChannel);
            }
        }
    
        public static class Poller implements Runnable {
    
            private final Selector selector;
    
            private final ByteBuffer buffer;
    
            private final BlockingQueue<SocketChannel> queue;
    
            public Poller() throws IOException {
                this.selector = Selector.open();
                this.buffer = ByteBuffer.allocate(2048);
                queue = new LinkedBlockingDeque<>(1024);
            }
    
            @SneakyThrows
            public void register(SocketChannel socketChannel) {
                this.queue.put(socketChannel);
            }
    
            @SneakyThrows
            @Override
            public void run() {
                while (true) {
                    SocketChannel socketChannel = queue.poll();
                    if (Objects.nonNull(socketChannel)) {
                        socketChannel.configureBlocking(false);
                        socketChannel.register(selector, SelectionKey.OP_READ);
                    }
                    // 没有事件
                    if (this.selector.select(1000) <= 0) {
                        continue;
                    }
                    Set<SelectionKey> keys = this.selector.selectedKeys();
                    Iterator<SelectionKey> it = keys.iterator();
                    while (it.hasNext()) {
                        SelectionKey key = it.next();
                        it.remove();
                        // 客户端是否已经发送数据
                        if (!key.isReadable()) {
                            continue;
                        }
                        SocketChannel channel = (SocketChannel) key.channel();
                        // 读取数据
                        int readCount = channel.read(buffer);
                        // 客户端断开连接
                        if (readCount == -1) {
                            key.cancel();
                            channel.close();
                            log.info("socket:{},exit", channel.socket().getPort());
                            continue;
                        }
                        buffer.flip();
                        byte[] bytes = new byte[readCount];
                        buffer.get(bytes);
                        String recStr = new String(bytes);
                        log.info("socket:{},readCount:{},input:{}", channel.socket().getPort(), readCount, recStr);
                        buffer.clear();
    
                        // 应答
                        buffer.put(("receive:" + recStr ).getBytes(StandardCharsets.UTF_8));
                        buffer.flip();
                        channel.write(buffer);
                        buffer.clear();
                    }
                }
            }
        }
    
        public static void main(String[] args) throws IOException {
            ServerNioMultiplexMT serverNioMultiplexMT = new ServerNioMultiplexMT();
            serverNioMultiplexMT.start(8090, 2);
        }
    }
    

    到这里我们可以看出 IO 技术的发展像是一个逐步解决问题的过程,其实整体技术的发展不也是这个过程吗?拿数据库举例,当单表瓶颈的时候我们需要分表,当分表也存储不下或者单机瓶颈的时候就需要分库。

    同步 IO、异步 IO

    说下我的理解,内核态读取完毕数据后是否主动写入到用户空间,如果是则为异步,否则都为同步。像 BIO、NIO、多路复用的 select、poll、epoll 都属于同步 IO。

    在 Windows 系统中提供的IOCP机制实现了真正的异步 IO,Linux 系统中虽然也有异步 IO 的一些实现但是都不算成熟,存在一定性能问题。Java的 AIO Api 在 Windows 系统是使用的操作系统机制实现,而在 Linux 系统中是使用自建线程池来模拟实现的。这也是造成异步 IO 在 Linux 环境中没有大规模应用的原因吧,像很多知名的中间件如 Redis、Nignx 都是使用 epoll 机制实现。

    其他

    开启服务三部曲

    任何网络服务开启都需要如下三次系统调用来完成服务的启动、绑定端口、监听。

    • socket
    • bind
    • listen

    epoll 三部曲

    使用 epoll 机制的都需要如下三次系统调用来使用 epoll

    • epoll_create
    • epoll_ctl
    • epoll_wait

    系统调用查看

    可以使用 strace 命令进行进程的系统调用追踪,同样可以用 man strace 进行详细的命令介绍查看。

    示例里面使用的是 slf4j 进行的日志输出,最好修改为 System.out.println 就不需要再去加载依赖包了。例如我们使用 strace 命令监控一段我们上面的程序:strace -ff -o out java com/blackbread/io/ServerNioMultiplex

    我们就可以观察到下面的输出信息,下图是 socket 的创建、监听过程。

    socket创建

    下面是 epoll 的创建、使用过程。

    epoll创建 添加监听

    下面是一个新客户端连入的过程,以及对新客户端的读写过程

    新客户端接入 分配描述符 描述符监控 读取输入 输出信息

    简单客户端连接工具

    nc命令,例如启动上面任意一段代码的程序后,执行 nc 127.0.0.1 8090,在控制台将输出如下信息:

    开启服务

    在命令行窗口输入如下信息之后

    nc 连接

    在控制台也将受到输入,并反馈信息给 nc 的客户端。

    服务端交互

    文件描述符(File Descriptor)简介

    相关文章

      网友评论

          本文标题:IO 演进

          本文链接:https://www.haomeiwen.com/subject/ebggurtx.html