美文网首页Android 面试面试精选
从内核接收数据到EPOLL原理

从内核接收数据到EPOLL原理

作者: 进击的蚂蚁zzzliu | 来源:发表于2021-03-27 16:56 被阅读0次

    一、内核接收数据流程

    内核接收数据.png
    1.网卡发现 MAC 地址符合,就将包收进来;发现 IP 地址符合,根据 IP 头中协议项,知道上一层是 TCP 协议;
    2.DMA把TCP数据包copy到内核缓冲区;
    3.触发CPU中断,中断程序摘除TCP头通过socket五要素(源IP/PORT、目的IP/PORT、协议)找到对应的socket文件,并把原始二进制数据报copy到socket接收缓冲区;
    4.中断程序唤醒被阻塞的内核线程;
    5.内核线程切换到用户线程把数据从socket接口缓冲区copy到应用内存;

    二、中断处理流程

    中断处理.png

    I/O发出的信号的异常代码,拿到异常代码之后,CPU就会触发异常处理的流程。计算机在内存里会保存中断向量表,用来存放不同的异常代码对应的异常处理程序所在的地址。
    CPU在拿到了异常码之后,会先把当前的程序执行的现场,保存到程序栈里面,然后根据异常码查询,找到对应的异常处理程序,最后把后续指令执行的指挥权,交给这个异常处理程序。
    异常处理程序结束之后返回到原来指令执行的位置继续执行;

    三、阻塞不占用 cpu

    网卡何时接收到数据是依赖发送方和传输路径的,这个延迟通常都很高,是毫秒(ms)级别的。而应用程序处理数据是纳秒(ns)级别的。也就是说整个过程中,内核态等待数据,处理协议栈是个相对很慢的过程。这么长的时间里,用户态的进程是无事可做的,因此用到了“阻塞(挂起)”。

    阻塞是进程调度的关键一环,指的是进程在等待某事件发生之前的等待状态。请看下表,在 Linux 中,进程状态大致有 7 种(在 include/linux/sched.h 中有更多状态):



    从说明中可以发现,“可运行状态”会占用 CPU 资源,另外创建和销毁进程也需要占用 CPU 资源(内核)。重点是,当进程被"阻塞/挂起"时,是不会占用 CPU 资源的。

    为了支持多任务,Linux 实现了进程调度的功能(CPU 时间片的调度)。而这个时间片的切换,只会在“可运行状态”的进程间进行。因此“阻塞/挂起”的进程是不占用 CPU 资源的。

    四、工作队列和等待队列

    工作队列和等待队列.png
    工作队列:为了方便时间片的调度,所有“可运行状态”状态的进程组成的队列;
    fd文件列表:内核打开的文件句柄,Linux一切皆文件,用户线程执行创建Socket时内核就会创建一个由文件系统管理的sock对象;
    sock:socket内核中的数据结构,主要包含发送缓冲区、接收缓冲区、等待队列;
    struct sock {
        __u32   daddr;  /* 外部IP地址   */
        __u32   rcv_saddr; /* 绑定的本地IP地址  */
        __u16   dport;  /* 目标端口   */
        __u16   sport;  /* 源端口    */
        unsigned short  family;  /* 地址簇   */
        int   rcvbuf;  /* 接受缓冲区长度(单位:字节) */
        struct sk_buff_head receive_queue; /* 接收包队列   */
        int   sndbuf;  /* 发送缓冲区长度(单位:字节)  */
        struct sk_buff_head write_queue; /* 包发送队列   */
        wait_queue_head_t *sleep;  /* 等待队列,通常指向socket的wait域 */
        ......
    }
    

    等待队列:等待当前socket的线程;

    工作队列中线程执行到阻塞操作等待socket时,会从工作队列中移除,移动到该socket的等待队列中;当socket接收到数据后,操作系统将该socket等待队列上的进程重新放回到工作队列,该进程变成运行状态,继续执行代码。

    五、BIO

    public static void main(String[] args) throws IOException {
        ServerSocket serverSocket = new ServerSocket(9000);
        while (true) {
            // 没有连接-阻塞
            Socket socket = serverSocket.accept();
    
            byte[] bytes = new byte[1024];
            InputStream inputStream = socket.getInputStream();
            while (true) {
                // 没有数据-阻塞
                int read = inputStream.read(bytes);
                if (read != -1) {
                    System.out.println(new String(bytes, 0, read));
                } else {
                    break;
                }
            }
            
            socket.close();
        }
    }
    

    BIO模式存在两个阻塞点,一个时accept阻塞等待客户端连接,一个是阻塞等待socket请求数据;简单跟以下源码就会发现
    new ServerSocket(9000)最终通过

    • int newfd = socket0(stream, false /*v6 Only*/);调用Linux int socket(int domain, int type, int protocol);创建服务端socket;
    • bind0(nativefd, address, port, exclusiveBind);调用Linux int bind(int sockfd, const struct sockaddr *addr, socklen_t addrlen);绑定端口;
    • listen0(nativefd, backlog);调用Linux int listen(int sockfd, int backlog);设置监听;

    接下来serverSocket.accept()最终通过

    • newfd = accept0(nativefd, isaa);调用Linux int accept(int sockfd, struct sockaddr *addr, socklen_t *addrlen);阻塞等待客户端连接,会创建一个socket用于跟该客户端进行通信,返回socket文件描述符fd;

    最后inputStream.read(bytes);调用Linux ssize_t recv(int sockfd, void *buf, size_t len, int flags);阻塞从socket读缓冲区读取客户端请求数据;

    可以通过 man 命令查看Linux 系统调用方法具体描述;

    通过传统BIO的操作方式可以看出一个请求必须要创建一个内核线程进行处理,recv只能监视单个socket,当并发比较高时就会消耗大量系统资源,也就是所谓的C10K问题;那么如何解决这个问题呢?后面出现的多路复用 select / poll / epoll思路都是使用一个线程来处理若干个连接(监视若干个socket),类似餐厅服务员的角色。

    六、select

    select 方案是用一个 fd_set 结构体来告诉内核同时监控多个socket,当其中有socket的状态发生变化或超时,则调用返回。之后应用使用 FD_ISSET 来逐个查看是哪个socket的状态发生了变化。

    6.1 select原理

    以下面select伪代码为例:

    int s = socket(AF_INET, SOCK_STREAM, 0);  
    bind(s, ...)
    listen(s, ...)
    //接受客户端连接
    int c = accept(s, ...)
    int readSet[] =  存放需要监听的socket文件描述符
    while(1){
        int n = select(..., readSet, ...)
        for(int i=0; i < readSet.count; i++){
            if(FD_ISSET(readSet[i], ...)){
                //fds[i]的数据处理
            }
        }
    }
    
    select 系统调用函数
    # maxfd:表示的是待测试的描述符基数,它的值是待测试的最大描述符加 1
    # readset:读描述符集合
    # writeset:写描述符集合
    # exceptset:错误描述符集合
    # timeout:超时时间
    int select(int maxfd, fd_set *readset, fd_set *writeset, fd_set *exceptset, const struct timeval *timeout);
    

    上面示例代码中,先准备一个数组 readSet 存放着所有需要监视读事件的socket。然后调用select,如果 readSet 中的所有socket都没有数据,select会阻塞,直到有一个socket接收到数据,select返回,唤醒线程。用户可以遍历 readSet,通过FD_ISSET (对这个数组进行检测,判断出对应socket的元素 readSet[fd]是 0 还是 1)判断具体哪个 socket 收到数据,然后做出处理。

    6.2 select流程
    select-1.png

    线程 A 调用 select readSet 数组元素为sock1、sock2、sock3 ,此时3个socket 都没有数据可读,就把线程A 从工作队列中移除,并分别添加到3个sock 的等待队列中;


    select-2.png

    当3个sock中任意一个有数据可读时,中断程序都会把线程A 从所有sock等待队列中移除并重新加入工作队列,等待cpu时间片继续执行(即从select中返回)。但是此时线程A 并不知道哪个socket有数据,于是还要遍历readSet使用 FD_ISSET 来逐个查看是哪个socket的有数据可读。

    6.3 select的缺点

    1、每次调用select都需要将线程加入到所有监视socket的等待队列,每次唤醒都需要从每个队列中移除。这里涉及了两次遍历,而且每次都要将整个fd_set列表传递给内核,有一定的开销。正是因为遍历操作开销大,出于效率的考量,才会规定select的最大监视数量,默认只能监视1024个socket。
    2、由于只有一个字段记录关注和发生事件,每次调用之前要重新初始化 fd_set 结构体。
    3、线程被唤醒后,程序并不知道哪些socket收到数据,还需要遍历一次。

    七、poll

    总结以下 select 的缺点就是句柄上限+重复初始化+逐个排查所有文件句柄状态效率不高。poll 主要解决 select 的前两个问题:通过改变跟内核交互的数据结构突破了文件描述符的限制,同时使用不同字段分别标注关注事件和发生事件,来避免重复初始化。

    int poll(struct pollfd *fds, unsigned long nfds, int timeout); 
    
    # fd:描述符 fd
    # events:描述符上待检测的事件类型events,注意这里的 events 可以表示多个不同的事件,
    #         具体的实现可以通过使用二进制掩码位操作来完成,例如,POLLIN 和 POLLOUT 可以表示读和写事件
    # revents:
    struct pollfd {
        int    fd;       /* file descriptor */    
        short  events;   /* events to look for */    
        short  revents;  /* events returned */ 
    };
    

    和 select 非常不同的地方在于:

    • poll 每次检测之后的结果不会修改原来的传入值,而是将结果保留在 revents 字段中,这样就不需要每次检测完都得重置待检测的描述字和感兴趣的事件。
    • 在 select 里面,文件描述符的个数已经随着 fd_set 的实现而固定,没有办法对此进行配置;而在 poll 函数里,可以控制 pollfd 结构的数组大小,这意味着可以突破原来 select 函数最大描述符的限制,在这种情况下,应用程序调用者需要分配 pollfd 数组并通知 poll 函数该数组的大小。

    八、epoll

    epoll 使用 eventpoll 作为中间层,线程不用加入在被监视的每一个 socket 阻塞队列中,也不用再遍历 socket 列表查看哪些有事件发生。
    epoll提供了三个函数:

    # 创建了一个 epoll 实例 epollevent
    int epoll_create(int size);
    
    # 往这个 epoll 实例增加或删除监控的事件
    # epfd:epoll_create 创建的 epoll 实例句柄
    # op:增加还是删除一个监控事件
    # fd:注册的事件的文件描述符
    # event:注册的事件类型,并且可以在这个结构体里设置用户需要的数据
    int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
    
    # 类似之前的 poll 和 select 函数,调用者进程被挂起,在等待内核 I/O 事件的分发
    # epfd:实例描述字,也就是 epoll 句柄
    # events:用户空间需要处理的 I/O 事件,这是一个数组,数组的大小由 epoll_wait 的返回值决定,这个数组的每个元素都是一个需要待处理的 I/O 事件,其中 events 表示具体的事件类型,事件类型取值和 epoll_ctl 可设置的值一样,这个 epoll_event 结构体里的 data 值就是在 epoll_ctl 那里设置的 data,也就是用户空间和内核空间调用时需要的数据
    # maxevents:大于 0 的整数,表示 epoll_wait 可以返回的最大事件值
    # timeout:阻塞调用的超时值,如果这个值设置为 -1,表示不超时;如果设置为 0 则立即返回,即使没有任何 I/O 事件发生
    int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);
    
    8.1 代码示例
    public class ServerConnect {
        private static final int BUF_SIZE = 1024;
        private static final int PORT = 8080;
        private static final int TIMEOUT = 3000;
    
        public static void main(String[] args) {
            selector();
        }
        public static void selector() {
            Selector selector = null;
            ServerSocketChannel ssc = null;
            try {
                // 打开一个Channel
                ssc = ServerSocketChannel.open();
                // 将Channel绑定端口
                ssc.socket().bind(new InetSocketAddress(PORT));
                // 设置Channel为非阻塞,如果设置为阻塞,其实和BIO差不多了。
                ssc.configureBlocking(false);
                // 打开一个Slectore
                selector = Selector.open();
                // 向selector中注册Channel和感兴趣的事件
                ssc.register(selector, SelectionKey.OP_ACCEPT);
                while (true) {
                    // selector监听事件,select会被阻塞,直到selector监听的channel中有事件发生或者超时,会返回一个事件数量
                    //TIMEOUT就是超时时间,selector初始化的时候会添加一个用于主动唤醒的pipe,待会源码分析会说
                    if (selector.select(TIMEOUT) == 0) {
                        System.out.println("==");
                        continue;
                    }
                    /**
                     * SelectionKey的组成是selector和Channel
                     * 有事件发生的channel会被包装成selectionKey添加到selector的publicSelectedKeys属性中
                     * publicSelectedKeys是SelectionKey的Set集合
                     *下面这一部分遍历,就是遍历有事件的channel
                     */
                    Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
                    while (iter.hasNext()) {
                        SelectionKey key = iter.next();
                        if (key.isAcceptable()) {
                            handleAccept(key);
                        }
                        if (key.isReadable()) {
                            handleRead(key);
                        }
                        if (key.isWritable() && key.isValid()) {
                            handleWrite(key);
                        }
                        if (key.isConnectable()) {
                            System.out.println("isConnectable = true");
                        }
                        //每次使用完,必须将该SelectionKey移除,否则会一直存储在publicSelectedKeys中
                        //下一次遍历又会重复处理
                        iter.remove();
                    }
                }
            } catch (IOException e) {
                e.printStackTrace();
            } finally {
                try {
                    if (selector != null) {
                        selector.close();
                    }
                    if (ssc != null) {
                        ssc.close();
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
        public static void handleAccept(SelectionKey key) throws IOException {
            ServerSocketChannel ssChannel = (ServerSocketChannel) key.channel();
            SocketChannel sc = ssChannel.accept();
            sc.configureBlocking(false);
            sc.register(key.selector(), SelectionKey.OP_READ, ByteBuffer.allocateDirect(BUF_SIZE));
        }
        public static void handleRead(SelectionKey key) throws IOException {
            SocketChannel sc = (SocketChannel) key.channel();
            ByteBuffer buf = (ByteBuffer) key.attachment();
            long bytesRead = sc.read(buf);
            while (bytesRead > 0) {
                buf.flip();
                while (buf.hasRemaining()) {
                    System.out.print((char) buf.get());
                }
                System.out.println();
                buf.clear();
                bytesRead = sc.read(buf);
            }
            if (bytesRead == -1) {
                sc.close();
            }
        }
        public static void handleWrite(SelectionKey key) throws IOException {
            ByteBuffer buf = (ByteBuffer) key.attachment();
            buf.flip();
            SocketChannel sc = (SocketChannel) key.channel();
            while (buf.hasRemaining()) {
                sc.write(buf);
            }
            buf.compact();
        }
    }
    

    调用到内核的流程如下:
    1、ssc = ServerSocketChannel.open() -> EPollSelectorProvider -> int socket(int domain, int type, int protocol);
    2、ssc.socket().bind(newInetSocketAddress(PORT)); -> int bind(int sockfd, const struct sockaddr addr, socklen_t addrlen);
    3、getImpl().listen(backlog); -> int listen(int sockfd, int backlog);
    4、ssc.configureBlocking(false); -> int fcntl(int fd, int cmd, ... /
    arg */ );
    5、selector=Selector.open(); -> EPollSelectorImpl epollCreate() -> int epoll_create(int size);
    6、ssc.register(selector,SelectionKey.OP_ACCEPT); -> int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
    while
    7、selector.select(TIMEOUT) -> int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);

    8.2 epoll流程
    epoll.png

    如图,eventpoll 主要包含3个结构

    • 监视队列:epoll_ctl添加或删除所要监听的socket。例如:图中通过epoll_ctl添加sock1、sock2的监视,内核会将eventpoll添加到这socket的等待队列中(图中虚线)。为了方便的添加和移除,还要便于搜索,以避免重复添加,使用红黑树结构进行存储(搜索、插入和删除时间复杂度都是O(log(N)),效率较好。
    • 等待队列:eventpoll也是一个文件句柄,因此也有等待队列,记录阻塞的线程,例如:图中线程A 执行了 epoll_wait 操作,被从工作队列中移除,然后被eventpoll 的等待队列引用;
    • 就绪队列:当某个socket有事件发生时,中断处理程序就会把该socket加入到就绪队列,同时唤醒eventpoll 阻塞队列中的线程,此时线程只需要遍历就绪队列就可以知道哪个socket有事件发生,例如:图中sock2有数据到来时,中断处理程序先把sock2 放入就绪队列中,然后唤醒等待队列中的线程A,这时线程A 被重新加入工作队列中,等到CPU时间片轮询到线程A时,遍历就绪队列中的socket进行处理。

    九、总结

    select,poll,epoll都是IO多路复用机制,即可以同时监视多个文件描述符,一旦某个描述符就绪(读或写就绪),能够通知程序进行相应读写操作

    • select,poll需要多次遍历文件描述符集合 fd_set,把阻塞线程放到每一个socket中,当某个描述符就绪时再把这些线程从socket阻塞队列中移除;而epoll 通过eventpoll作为中介,只需要把eventpoll放到socket 的阻塞队列中,当有描述符就绪时只需要遍历就绪队列,而不需要遍历所有fd;
    • select,poll每次调用都要把整个fd_set集合从用户态往内核态拷贝一次,而epoll只要一次拷贝,这也能节省不少的开销。

    相关文章

      网友评论

        本文标题:从内核接收数据到EPOLL原理

        本文链接:https://www.haomeiwen.com/subject/tkhrhltx.html