美文网首页
[epoll 源码走读] epoll 实现原理

[epoll 源码走读] epoll 实现原理

作者: wenfh2020 | 来源:发表于2020-08-05 07:21 被阅读0次

    文章主要对 tcp 通信进行 epoll 源码走读。

    Linux 源码:Linux 5.7 版本。epoll 核心源码:eventpoll.h / eventpoll.c


    1. 应用场景

    epoll 应用,适合海量用户,一个时间段内部分活跃的用户群体。

    例如 app,正常用户并不是 24 小时都拿起手机玩个不停,可能玩一下,又去干别的事,回头又玩一下,断断续续地操作。即便正在使用 app 也不是连续产生读写通信事件,可能手指点击几下页面,页面产生需要的内容,用户就去浏览内容,不再操作了。换句话说,在海量用户里,同一个时间段内,很可能只有一小部分用户正在活跃,而在这一小部分活跃用户里,又只有一小撮人同时点击页面上的操作。那 epoll 管理海量用户,只需要将这一小撮人产生的事件,及时通知 appserver 处理逻辑即可。

    问题:同样场景,如果用户是机器人,24 小时持续工作,这种场景下使用 epoll 还合适吗?


    2. 预备知识


    3. 使用

    • 接口。
    接口 描述
    epoll_create 创建 epoll。
    epoll_ctl fd 事件注册函数,用户通过这个函数关注 fd 读写事件。
    epoll_wait 阻塞等待 fd 事件发生。
    • 使用流程。
    epoll 使用流程

    🔥文章来源:wenfh2020.com


    4. 事件

    常用事件注释可以请参考 epoll_ctl 文档

    // eventpoll.h
    #define EPOLLIN     (__force __poll_t)0x00000001
    #define EPOLLOUT    (__force __poll_t)0x00000004
    #define EPOLLERR    (__force __poll_t)0x00000008
    #define EPOLLHUP    (__force __poll_t)0x00000010
    #define EPOLLRDHUP  (__force __poll_t)0x00002000
    #define EPOLLEXCLUSIVE  ((__force __poll_t)(1U << 28))
    #define EPOLLET     ((__force __poll_t)(1U << 31))
    
    事件 描述
    EPOLLIN 有可读数据到来。
    EPOLLOUT 有数据要写。
    EPOLLERR 该文件描述符发生错误。
    EPOLLHUP 该文件描述符被挂断。常见 socket 被关闭(read == 0)。
    EPOLLRDHUP 对端已关闭链接,或者用 shutdown 关闭了写链接。
    EPOLLEXCLUSIVE 唯一唤醒事件,主要为了解决 epoll_wait 惊群问题。多线程下多个 epoll_wait 同时等待,只唤醒一个 epoll_wait 执行。 该事件只支持 epoll_ctl 添加操作 EPOLL_CTL_ADD。
    EPOLLET 边缘触发模式。

    通过 tcp_poll 函数,可以看到 socket 事件对应的相关事件逻辑。

    // tcp.c
    /*
     *    Wait for a TCP event.
     *
     *    Note that we don't need to lock the socket, as the upper poll layers
     *    take care of normal races (between the test and the event) and we don't
     *    go look at any of the socket buffers directly.
     */
    __poll_t tcp_poll(struct file *file, struct socket *sock, poll_table *wait) {
        __poll_t mask;
        struct sock *sk = sock->sk;
        const struct tcp_sock *tp = tcp_sk(sk);
        int state;
    
        // fd 添加等待事件,关联事件回调。
        sock_poll_wait(file, sock, wait);
    
        // socket 对应事件逻辑。
        state = inet_sk_state_load(sk);
        if (state == TCP_LISTEN)
            return inet_csk_listen_poll(sk);
    
        /* Socket is not locked. We are protected from async events
         * by poll logic and correct handling of state changes
         * made by other threads is impossible in any case.
         */
    
        mask = 0;
    
        /*
         * EPOLLHUP is certainly not done right. But poll() doesn't
         * have a notion of HUP in just one direction, and for a
         * socket the read side is more interesting.
         *
         * Some poll() documentation says that EPOLLHUP is incompatible
         * with the EPOLLOUT/POLLWR flags, so somebody should check this
         * all. But careful, it tends to be safer to return too many
         * bits than too few, and you can easily break real applications
         * if you don't tell them that something has hung up!
         *
         * Check-me.
         *
         * Check number 1. EPOLLHUP is _UNMASKABLE_ event (see UNIX98 and
         * our fs/select.c). It means that after we received EOF,
         * poll always returns immediately, making impossible poll() on write()
         * in state CLOSE_WAIT. One solution is evident --- to set EPOLLHUP
         * if and only if shutdown has been made in both directions.
         * Actually, it is interesting to look how Solaris and DUX
         * solve this dilemma. I would prefer, if EPOLLHUP were maskable,
         * then we could set it on SND_SHUTDOWN. BTW examples given
         * in Stevens' books assume exactly this behaviour, it explains
         * why EPOLLHUP is incompatible with EPOLLOUT.    --ANK
         *
         * NOTE. Check for TCP_CLOSE is added. The goal is to prevent
         * blocking on fresh not-connected or disconnected socket. --ANK
         */
        if (sk->sk_shutdown == SHUTDOWN_MASK || state == TCP_CLOSE)
            mask |= EPOLLHUP;
        if (sk->sk_shutdown & RCV_SHUTDOWN)
            mask |= EPOLLIN | EPOLLRDNORM | EPOLLRDHUP;
    
        /* Connected or passive Fast Open socket? */
        if (state != TCP_SYN_SENT &&
            (state != TCP_SYN_RECV || rcu_access_pointer(tp->fastopen_rsk))) {
            int target = sock_rcvlowat(sk, 0, INT_MAX);
    
            if (READ_ONCE(tp->urg_seq) == READ_ONCE(tp->copied_seq) &&
                !sock_flag(sk, SOCK_URGINLINE) &&
                tp->urg_data)
                target++;
    
            if (tcp_stream_is_readable(tp, target, sk))
                mask |= EPOLLIN | EPOLLRDNORM;
    
            if (!(sk->sk_shutdown & SEND_SHUTDOWN)) {
                if (sk_stream_is_writeable(sk)) {
                    mask |= EPOLLOUT | EPOLLWRNORM;
                } else {  /* send SIGIO later */
                    sk_set_bit(SOCKWQ_ASYNC_NOSPACE, sk);
                    set_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
    
                    /* Race breaker. If space is freed after
                     * wspace test but before the flags are set,
                     * IO signal will be lost. Memory barrier
                     * pairs with the input side.
                     */
                    smp_mb__after_atomic();
                    if (sk_stream_is_writeable(sk))
                        mask |= EPOLLOUT | EPOLLWRNORM;
                }
            } else
                mask |= EPOLLOUT | EPOLLWRNORM;
    
            if (tp->urg_data & TCP_URG_VALID)
                mask |= EPOLLPRI;
        } else if (state == TCP_SYN_SENT && inet_sk(sk)->defer_connect) {
            /* Active TCP fastopen socket with defer_connect
             * Return EPOLLOUT so application can call write()
             * in order for kernel to generate SYN+data
             */
            mask |= EPOLLOUT | EPOLLWRNORM;
        }
        /* This barrier is coupled with smp_wmb() in tcp_reset() */
        smp_rmb();
        if (sk->sk_err || !skb_queue_empty_lockless(&sk->sk_error_queue))
            mask |= EPOLLERR;
    
        return mask;
    }
    EXPORT_SYMBOL(tcp_poll);
    

    5. 源码工作流程

    epoll 源码工作流程

    6. 数据结构

    6.1. eventpoll

    /*
     * This structure is stored inside the "private_data" member of the file
     * structure and represents the main data structure for the eventpoll
     * interface.
     */
    struct eventpoll {
        /*
         * This mutex is used to ensure that files are not removed
         * while epoll is using them. This is held during the event
         * collection loop, the file cleanup path, the epoll file exit
         * code and the ctl operations.
         */
        struct mutex mtx;
    
        /* Wait queue used by sys_epoll_wait() */
        wait_queue_head_t wq;
    
        /* Wait queue used by file->poll() */
        wait_queue_head_t poll_wait;
    
        /* List of ready file descriptors */
        struct list_head rdllist;
    
        /* Lock which protects rdllist and ovflist */
        rwlock_t lock;
    
        /* RB tree root used to store monitored fd structs */
        struct rb_root_cached rbr;
    
        /*
         * This is a single linked list that chains all the "struct epitem" that
         * happened while transferring ready events to userspace w/out
         * holding ->lock.
         */
        struct epitem *ovflist;
    
        /* wakeup_source used when ep_scan_ready_list is running */
        struct wakeup_source *ws;
    
        /* The user that created the eventpoll descriptor */
        struct user_struct *user;
    
        struct file *file;
    
        /* used to optimize loop detection check */
        int visited;
        struct list_head visited_list_link;
    
    #ifdef CONFIG_NET_RX_BUSY_POLL
        /* used to track busy poll napi_id */
        unsigned int napi_id;
    #endif
    };
    
    成员 描述
    mtx 互斥变量,避免在遍历 epi 节点时(例如 ep_send_events),epi 被删除。
    wq 等待队列,当 epoll_wait 没发现就绪事件需要处理,添加等待事件,需要睡眠阻塞等待唤醒进程。
    poll_wait 等待队列,当epoll_ctl 监听的是另外一个 epoll fd 时使用。
    rdllist 就绪列表,产生了用户注册的 fd读写事件的 epi 链表。
    ovflist 单链表,当 rdllist 被锁定遍历,向用户空间发送数据时,rdllist 不允许被修改,新触发的就绪 epitem 被 ovflist 串联起来,等待 rdllist 被处理完了,重新将 ovflist 数据写入 rdllist。 详看 ep_scan_ready_list 逻辑。
    user 创建 eventpoll 的用户结构信息。
    lock 锁,保护 rdllist 和 ovflist 。
    rbr 红黑树根结点,管理 fd 结点。
    file eventpoll 对应的文件结构,Linux 一切皆文件,用 vfs 管理数据。
    napi_id 应用于中断缓解技术。

    6.2. epitem

    fd 事件管理节点。可以添加到红黑树,也可以串联成就绪列表或其它列表。

    /*
     * Each file descriptor added to the eventpoll interface will
     * have an entry of this type linked to the "rbr" RB tree.
     * Avoid increasing the size of this struct, there can be many thousands
     * of these on a server and we do not want this to take another cache line.
     */
    struct epitem {
        union {
            /* RB tree node links this structure to the eventpoll RB tree */
            struct rb_node rbn;
            /* Used to free the struct epitem */
            struct rcu_head rcu;
        };
    
        /* List header used to link this structure to the eventpoll ready list */
        struct list_head rdllink;
    
        /*
         * Works together "struct eventpoll"->ovflist in keeping the
         * single linked chain of items.
         */
        struct epitem *next;
    
        /* The file descriptor information this item refers to */
        struct epoll_filefd ffd;
    
        /* Number of active wait queue attached to poll operations */
        int nwait;
    
        /* List containing poll wait queues */
        struct list_head pwqlist;
    
        /* The "container" of this item */
        struct eventpoll *ep;
    
        /* List header used to link this item to the "struct file" items list */
        struct list_head fllink;
    
        /* wakeup_source used when EPOLLWAKEUP is set */
        struct wakeup_source __rcu *ws;
    
        /* The structure that describe the interested events and the source fd */
        struct epoll_event event;
    };
    
    成员 描述
    rbn 连接红黑树结构节点。
    rdllink 就绪队列节点,用于将 epitem 串联成就绪队列列表。
    next 指向下一个单链表节点的指针。配合 eventpoll 的 ovflist 使用。
    ffd 记录节点对应的 fd 和 file 文件信息。
    nwait 等待队列个数。
    pwqlist 等待事件回调队列。当数据进入网卡,底层中断执行 ep_poll_callback。
    ep eventpoll 指针,epitem 关联 eventpoll。
    fllink epoll 文件链表结点,与 epoll 文件链表进行关联 file.f_ep_links。参考 fs.h, struct file 结构。
    ws EPOLLWAKEUP 模式下使用。
    event 用户关注的事件。

    6.3. epoll_filefd

    fd 对应 file 文件结构,Linux 一切皆文件,采用了 vfs (虚拟文件系统)管理文件或设备。

    struct epoll_filefd {
        struct file *file;
        int fd;
    } __packed;
    

    6.4. epoll_event

    用户关注的 epoll 事件结构。

    struct epoll_event {
        __poll_t events;
        __u64 data;
    } EPOLL_PACKED;
    
    成员 描述
    events 事件集合
    data fd

    6.5. poll_table_struct

    就绪事件处理结构。

    /* poll.h
     * Do not touch the structure directly, use the access functions
     * poll_does_not_wait() and poll_requested_events() instead.
     */
    typedef struct poll_table_struct {
        poll_queue_proc _qproc;
        __poll_t _key;
    } poll_table;
    
    /*
     * structures and helpers for f_op->poll implementations
     */
    typedef void (*poll_queue_proc)(struct file *, wait_queue_head_t *, struct poll_table_struct *);
    
    成员 描述
    _qproc 处理函数,可以指向 ep_ptable_queue_proc 函数,或者空。
    _key 事件组合。

    6.6. ep_pqueue

    包装就绪事件处理结构,关联 epitem。

    /* Wrapper struct used by poll queueing */
    struct ep_pqueue {
        poll_table pt;
        struct epitem *epi;
    };
    
    成员 描述
    pt 就绪事件处理结构。
    epi epitem 对应节点。

    7. 关键函数

    函数 描述
    eventpoll_init 初始化 epoll 模块。eventpoll 作为 Linux 内核的一部分,模块化管理。
    do_epoll_create 为 eventpoll 结构分配资源。
    do_epoll_ctl epoll 管理 fd 事件接口。
    do_epoll_wait 有条件阻塞等待 fd 事件发生,返回对fd 和对应事件数据。
    ep_item_poll 获取 fd 就绪事件,并关联 fd 和事件触发回调函数 ep_poll_callback。
    ep_poll_callback fd 事件回调函数。当底层收到数据,中断调用 fd 关联的 ep_poll_callback 回调函数,如果事件是用户关注的事件,会将 fd 对应的 epi 结点添加进就绪队列,然后唤醒阻塞等待的 epoll_wait 处理。
    ep_send_events 遍历就绪列表,拷贝内核空间就绪数据到用户空间。结合 ep_scan_ready_list 和 ep_send_events_proc 使用。
    ep_scan_ready_list 遍历就绪列表。当 fd 收到数据,回调 ep_poll_callback,如果事件是用户关注的,那么将 fd 对应的 epi 结点添加到就绪队列,ep_scan_ready_list 会遍历这个就绪列表,将数据从内核空间拷贝到用户空间,或者其它操作。
    ep_send_events_proc 内核将就绪列表数据,发送到用户空间。结合 ep_scan_ready_list 使用。LT/ET 模式在这个函数里实现。
    ep_ptable_queue_proc 添加 fd 的等待事件到等待队列,关联 fd 与回调函数 ep_poll_callback。

    8. 核心源码

    8.1. 初始化

    添加 epoll 模块到内核,slab 算法为 epoll 分配资源。

    static int __init eventpoll_init(void) {
        struct sysinfo si;
        ...
        /* Allocates slab cache used to allocate "struct epitem" items */
        epi_cache = kmem_cache_create("eventpoll_epi", sizeof(struct epitem),
                0, SLAB_HWCACHE_ALIGN|SLAB_PANIC|SLAB_ACCOUNT, NULL);
    
        /* Allocates slab cache used to allocate "struct eppoll_entry" */
        pwq_cache = kmem_cache_create("eventpoll_pwq",
            sizeof(struct eppoll_entry), 0, SLAB_PANIC|SLAB_ACCOUNT, NULL);
    
        return 0;
    }
    fs_initcall(eventpoll_init);
    

    8.2. epoll_create

    创建 eventpoll 对象,关联文件资源。

    static int do_epoll_create(int flags) {
        int error, fd;
        struct eventpoll *ep = NULL;
        struct file *file;
        ...
        // slab 算法为 eventpoll 结构分配内存,并初始化 eventpoll 成员数据。
        error = ep_alloc(&ep);
        if (error < 0)
            return error;
    
        // 分配一个空闲的文件描述符。
        fd = get_unused_fd_flags(O_RDWR | (flags & O_CLOEXEC));
        if (fd < 0) {
            error = fd;
            goto out_free_ep;
        }
    
        // slab 分配一个新的文件结构对象(struct file *)
        file = anon_inode_getfile("[eventpoll]", &eventpoll_fops, ep,
                     O_RDWR | (flags & O_CLOEXEC));
        if (IS_ERR(file)) {
            error = PTR_ERR(file);
            goto out_free_fd;
        }
        ep->file = file;
    
        // fd 与 file* 结构进行绑定。
        fd_install(fd, file);
        return fd;
        ...
    }
    

    8.3. epoll_ctl

    fd 对应的事件管理(增删改)。

    • 添加 fd 事件管理流程:fd 关联回调 ep_poll_callback。
    fd -> socket -> poll -> ep_ptable_queue_proc -> wait_queue -> ep_poll_callback
    
    • 触发了 fd 关注的事件回调处理。
    driver -> ep_poll_callback -> waitup -> epoll_wait(wake up)
    

    SYSCALL_DEFINE4(epoll_ctl, int, epfd, int, op, int, fd,
            struct epoll_event __user *, event) {
        struct epoll_event epds;
    
        // 为了 event 数据的安全性,将数据进行拷贝,再进行逻辑处理。
        if (ep_op_has_event(op) &&
            copy_from_user(&epds, event, sizeof(struct epoll_event)))
            return -EFAULT;
    
        return do_epoll_ctl(epfd, op, fd, &epds, false);
    }
    
    int do_epoll_ctl(int epfd, int op, int fd, struct epoll_event *epds, bool nonblock) {
        int error;
        int full_check = 0;
        struct fd f, tf;
        struct eventpoll *ep;
        struct epitem *epi;
        struct eventpoll *tep = NULL;
        ...
        // 检查参数合法性。
        ...
        // 在 do_epoll_create 实现里 anon_inode_getfile 将 private_data 与 eventpoll 关联。
        ep = f.file->private_data;
        ...
        // 红黑树检查 fd 是否已经被添加。
        epi = ep_find(ep, tf.file, fd);
    
        error = -EINVAL;
        switch (op) {
        case EPOLL_CTL_ADD:
            if (!epi) {
                /* epoll 如果没有添加过该 fd,就添加到红黑树进行管理。
                 * 事件默认关注异常处理(EPOLLERR | EPOLLHUP)。*/
                epds->events |= EPOLLERR | EPOLLHUP;
                error = ep_insert(ep, epds, tf.file, fd, full_check);
            } else
                error = -EEXIST;
            if (full_check)
                clear_tfile_check_list();
            break;
        case EPOLL_CTL_DEL:
            if (epi)
                error = ep_remove(ep, epi);
            else
                error = -ENOENT;
            break;
        case EPOLL_CTL_MOD:
            if (epi) {
                if (!(epi->event.events & EPOLLEXCLUSIVE)) {
                    epds->events |= EPOLLERR | EPOLLHUP;
                    error = ep_modify(ep, epi, epds);
                }
            } else
                error = -ENOENT;
            break;
        }
        ...
        return error;
    }
    
    static int ep_insert(struct eventpoll *ep, const struct epoll_event *event,
                 struct file *tfile, int fd, int full_check) {
        // epoll 管理 fd 和对应事件节点 epitem 数据结构。
        struct epitem *epi;
        struct ep_pqueue epq;
        ...
        epq.epi = epi;
    
        // 初始化就绪事件处理函数调用。poll() 接口调用 ep_ptable_queue_proc。
        init_poll_funcptr(&epq.pt, ep_ptable_queue_proc);
    
        // 添加等待队列,如果 fd 有用户关注的事件发生,返回对应 fd 关注的事件 revents。
        revents = ep_item_poll(epi, &epq.pt, 1);
        ...
        // 将当前节点,添加到 epoll 文件钩子,将 epoll 文件与 fd 对应文件串联起来。
        list_add_tail_rcu(&epi->fllink, &tfile->f_ep_links);
    
        // 将节点添加进二叉树
        ep_rbtree_insert(ep, epi);
    
        // 如果有关注的事件发生,将节点关联到就绪事件列表。
        if (revents && !ep_is_linked(epi)) {
            list_add_tail(&epi->rdllink, &ep->rdllist);
            ep_pm_stay_awake(epi);
    
            /* 如果进程正在睡眠等待,唤醒它去处理就绪事件。睡眠事件 ep->wq 在 epoll_wait 中添加*/
            if (waitqueue_active(&ep->wq))
                // 唤醒进程
                wake_up(&ep->wq);
    
            // 如果监控的是另外一个 epoll_create 的 fd,有就绪事件,也唤醒进程。
            if (waitqueue_active(&ep->poll_wait))
                pwake++;
        }
        ...
        if (pwake)
            ep_poll_safewake(&ep->poll_wait);
    
        return 0;
    }
    

    8.4. ep_item_poll

    fd 节点就绪事件处理。

    static __poll_t ep_item_poll(const struct epitem *epi, poll_table *pt, int depth) {
        struct eventpoll *ep;
        bool locked;
    
        pt->_key = epi->event.events;
        if (!is_file_epoll(epi->ffd.file)) {
            // 非 epoll fd,tcp_poll 检查 socket 就绪事件,fd 关联回调函数 ep_poll_callback。
            return vfs_poll(epi->ffd.file, pt) & epi->event.events;
        } else {
            // epoll 嵌套。epoll_ctl 添加关注了另外一个 epoll 的 fd(epfd)。
            ep = epi->ffd.file->private_data;
            poll_wait(epi->ffd.file, &ep->poll_wait, pt);
            locked = pt && (pt->_qproc == ep_ptable_queue_proc);
    
            return ep_scan_ready_list(epi->ffd.file->private_data,
                        ep_read_events_proc, &depth, depth, locked) &
                epi->event.events;
        }
    }
    
    // vfs - Virtual Filesystem Switch(Linux 虚拟文件系统)
    // poll.h 就绪事件处理函数。
    static inline __poll_t vfs_poll(struct file *file, struct poll_table_struct *pt) {
        if (unlikely(!file->f_op->poll))
            return DEFAULT_POLLMASK;
        // 这里的 poll 函数指针指向 tcp_poll 函数。
        return file->f_op->poll(file, pt);
    }
    
    // tcp.c
    // tcp 就绪事件获取函数。
    __poll_t tcp_poll(struct file *file, struct socket *sock, poll_table *wait) {
        __poll_t mask;
        struct sock *sk = sock->sk;
        const struct tcp_sock *tp = tcp_sk(sk);
        int state;
    
        /* 添加等待队列和关联事件回调函数 ep_poll_callback
         *(只有 epoll_ctl EPOLL_CTL_ADD 的情况下,才会添加等待事件,否则 wait == NULL)*/
        sock_poll_wait(file, sock, wait);
    
        // 检查 fd 是否有事件发生。
        state = inet_sk_state_load(sk);
        if (state == TCP_LISTEN)
            return inet_csk_listen_poll(sk);
        ...
    }
    
    // socket.h
    static inline void sock_poll_wait(struct file *filp, struct socket *sock, poll_table *p) {
        // ep_insert 调用 ep_item_poll 才会插入等待事件。
        if (!poll_does_not_wait(p)) {
            poll_wait(filp, &sock->wq.wait, p);
            ...
        }
    }
    
    // poll.h
    static inline void poll_wait(struct file * filp, wait_queue_head_t * wait_address, poll_table *p) {
        if (p && p->_qproc && wait_address)
            // _qproc ---> ep_ptable_queue_proc
            p->_qproc(filp, wait_address, p);
    }
    

    8.5. ep_ptable_queue_proc

    socket 的等待队列关联回调函数 ep_poll_callback

    static void ep_ptable_queue_proc(struct file *file, wait_queue_head_t *whead, poll_table *pt) {
        struct epitem *epi = ep_item_from_epqueue(pt);
        struct eppoll_entry *pwq;
    
        if (epi->nwait >= 0 && (pwq = kmem_cache_alloc(pwq_cache, GFP_KERNEL))) {
            // 关联等待队列和ep_poll_callback。
            init_waitqueue_func_entry(&pwq->wait, ep_poll_callback);
    
            // whead ---> socket->wq.wait
            pwq->whead = whead;
            pwq->base = epi;
    
            /* 等待事件,添加到等待队列。EPOLLEXCLUSIVE 为了解决 epoll_wait 惊群问题。
             * 如果多线程同时调用 epoll_wait,那么 fd 应该设置 EPOLLEXCLUSIVE 事件。 */
            if (epi->event.events & EPOLLEXCLUSIVE) {
                add_wait_queue_exclusive(whead, &pwq->wait);
            } else {
                add_wait_queue(whead, &pwq->wait);
            }
    
            /* 等待事件,关联 epitem。epitem 为什么要有一个等待队列呢,
             * 因为有可能一个进程里存在多个 epoll 实例同时 epoll_ctl 关注一个 fd。*/
            list_add_tail(&pwq->llink, &epi->pwqlist);
            epi->nwait++;
        } else {
            /* We have to signal that an error occurred */
            epi->nwait = -1;
        }
    }
    

    8.6. epoll_wait

    SYSCALL_DEFINE4(epoll_wait, int, epfd, struct epoll_event __user *, events,
            int, maxevents, int, timeout) {
        return do_epoll_wait(epfd, events, maxevents, timeout);
    }
    
    static int do_epoll_wait(int epfd, struct epoll_event __user *events,
                 int maxevents, int timeout) {
        ...
        // timeout 阻塞等待处理并返回就绪事件。
        error = ep_poll(ep, events, maxevents, timeout);
        ...
    }
    
    static int ep_poll(struct eventpoll *ep, struct epoll_event __user *events,
               int maxevents, long timeout) {
        int res = 0, eavail, timed_out = 0;
        u64 slack = 0;
        bool waiter = false;
        wait_queue_entry_t wait;
        ktime_t expires, *to = NULL;
    
        // 计算 timeout 睡眠时间。如果有就绪事件,处理并发送到用户空间。
        ...
    
    fetch_events:
    
        if (!ep_events_available(ep))
            // napi 中断缓解技术,避免网卡频繁中断 cpu,提高数据获取的效率。这里为了积攒网络数据进行返回。
            ep_busy_loop(ep, timed_out);
    
        // 检查就绪队列是否有数据。
        eavail = ep_events_available(ep);
        if (eavail)
            // 如果有就绪事件了,就直接不用睡眠等待了,进入发送环节。
            goto send_events;
    
        ...
    
        // 没有就绪事件发生,需要睡眠等待。
        if (!waiter) {
            waiter = true;
            // 等待事件,关联当前进程。
            init_waitqueue_entry(&wait, current);
    
            spin_lock_irq(&ep->wq.lock);
            // 添加等待事件。(为了解决惊群效应,所以等待事件添加了 WQ_FLAG_EXCLUSIVE 标识。查看 __wake_up_common 实现。)
            __add_wait_queue_exclusive(&ep->wq, &wait);
            spin_unlock_irq(&ep->wq.lock);
        }
    
        for (;;) {
            /*
             * We don't want to sleep if the ep_poll_callback() sends us
             * a wakeup in between. That's why we set the task state
             * to TASK_INTERRUPTIBLE before doing the checks.
             */
    
            // 设置当前进程状态为等待状态,可以被信号解除等待。
            set_current_state(TASK_INTERRUPTIBLE);
            /*
             * Always short-circuit for fatal signals to allow
             * threads to make a timely exit without the chance of
             * finding more events available and fetching
             * repeatedly.
             */
    
            // 信号中断,不要执行睡眠了。
            if (fatal_signal_pending(current)) {
                res = -EINTR;
                break;
            }
    
            // 检查就绪队列。
            eavail = ep_events_available(ep);
            if (eavail)
                break;
    
            // 信号中断,不要执行睡眠了。
            if (signal_pending(current)) {
                res = -EINTR;
                break;
            }
    
            // 进程进入睡眠状态。
            if (!schedule_hrtimeout_range(to, slack, HRTIMER_MODE_ABS)) {
                timed_out = 1;
                break;
            }
        }
    
        // 进程等待超时,或者被唤醒,设置进程进入运行状态,等待内核调度运行。
        __set_current_state(TASK_RUNNING);
    
    send_events:
        /*
         * Try to transfer events to user space. In case we get 0 events and
         * there's still timeout left over, we go trying again in search of
         * more luck.
         */
    
        // 有就绪事件就发送到用户空间,否则继续获取数据直到超时。
        if (!res && eavail && !(res = ep_send_events(ep, events, maxevents)) &&
            !timed_out)
            goto fetch_events;
    
        // 从等待队列中,删除等待事件。
        if (waiter) {
            spin_lock_irq(&ep->wq.lock);
            __remove_wait_queue(&ep->wq, &wait);
            spin_unlock_irq(&ep->wq.lock);
        }
    
        return res;
    }
    
    /* Used by the ep_send_events() function as callback private data */
    struct ep_send_events_data {
        int maxevents;
        struct epoll_event __user *events;
        int res;
    };
    
    static int ep_send_events(struct eventpoll *ep,
                  struct epoll_event __user *events, int maxevents) {
        struct ep_send_events_data esed;
    
        esed.maxevents = maxevents;
        esed.events = events;
    
        // 遍历事件就绪列表,发送就绪事件到用户空间。
        ep_scan_ready_list(ep, ep_send_events_proc, &esed, 0, false);
        return esed.res;
    }
    

    8.7. ep_scan_ready_list

    遍历就绪列表,处理 sproc 函数。这里 sproc 函数指针的使用,是为了减少代码冗余,将 ep_scan_ready_list 做成一个通用的函数。

    // 
    static __poll_t ep_scan_ready_list(struct eventpoll *ep,
                      __poll_t (*sproc)(struct eventpoll *,
                           struct list_head *, void *),
                      void *priv, int depth, bool ep_locked) {
        __poll_t res;
        struct epitem *epi, *nepi;
        LIST_HEAD(txlist);
        ...
        // 将就绪队列分片链接到 txlist 链表中。
        list_splice_init(&ep->rdllist, &txlist);
        res = (*sproc)(ep, &txlist, priv);
        ...
        // 在处理 sproc 回调处理过程中,可能产生新的就绪事件被写入 ovflist,将 ovflist 回写 rdllist。
        for (nepi = READ_ONCE(ep->ovflist); (epi = nepi) != NULL;
             nepi = epi->next, epi->next = EP_UNACTIVE_PTR) {
            if (!ep_is_linked(epi)) {
                list_add(&epi->rdllink, &ep->rdllist);
                ep_pm_stay_awake(epi);
            }
        }
        ...
        // txlist 在 epitem 回调中,可能没有完全处理完,那么重新放回到 rdllist,下次处理。
        list_splice(&txlist, &ep->rdllist);
        ...
    }
    

    8.8. ep_send_events_proc

    处理就绪列表,将数据从内核空间拷贝到用户空间。

    static __poll_t ep_send_events_proc(struct eventpoll *ep, struct list_head *head, void *priv) {
        struct ep_send_events_data *esed = priv;
        __poll_t revents;
        struct epitem *epi, *tmp;
        struct epoll_event __user *uevent = esed->events;
        struct wakeup_source *ws;
        poll_table pt;
        init_poll_funcptr(&pt, NULL);
        ...
    
        // 遍历处理 txlist(原 ep->rdllist 数据)就绪队列结点,获取事件拷贝到用户空间。
        list_for_each_entry_safe (epi, tmp, head, rdllink) {
            if (esed->res >= esed->maxevents)
                break;
            ...
            // 先从就绪队列中删除 epi,如果是 LT 模式,就绪事件还没处理完,再把它添加回去。
            list_del_init(&epi->rdllink);
    
            // 获取 epi 对应 fd 的就绪事件。
            revents = ep_item_poll(epi, &pt, 1);
            if (!revents)
                continue;
    
            // 内核空间向用户空间传递数据。__put_user 成功拷贝返回 0。
            if (__put_user(revents, &uevent->events) ||
                __put_user(epi->event.data, &uevent->data)) {
                // 如果拷贝失败,继续保存在就绪列表里。
                list_add(&epi->rdllink, head);
                ep_pm_stay_awake(epi);
                if (!esed->res)
                    esed->res = -EFAULT;
                return 0;
            }
    
            // 成功处理就绪事件的 fd 个数。
            esed->res++;
            uevent++;
            if (epi->event.events & EPOLLONESHOT)
                // #define EP_PRIVATE_BITS (EPOLLWAKEUP | EPOLLONESHOT | EPOLLET | EPOLLEXCLUSIVE)
                epi->event.events &= EP_PRIVATE_BITS;
            else if (!(epi->event.events & EPOLLET)) {
                /*
                 * If this file has been added with Level
                 * Trigger mode, we need to insert back inside
                 * the ready list, so that the next call to
                 * epoll_wait() will check again the events
                 * availability. At this point, no one can insert
                 * into ep->rdllist besides us. The epoll_ctl()
                 * callers are locked out by
                 * ep_scan_ready_list() holding "mtx" and the
                 * poll callback will queue them in ep->ovflist.
                 */
                /* lt 模式下,当前事件被处理完后,不会从就绪列表中删除,留待下一次 epoll_wait
                 * 调用,再查看是否还有事件没处理,如果没有事件了就从就绪列表中删除。
                 * 在遍历事件的过程中,不能写 ep->rdllist,因为已经上锁,只能把新的就绪信息
                 * 添加到 ep->ovflist */
                list_add_tail(&epi->rdllink, &ep->rdllist);
                ep_pm_stay_awake(epi);
            }
        }
    
        return 0;
    }
    

    8.9. ep_poll_callback

    fd 事件回调。当 fd 有网络事件发生,就会通过等待队列,进行回调。参考 __wake_up_common,如果事件是用户关注的事件,回调会唤醒进程进行处理。

    static int ep_poll_callback(wait_queue_entry_t *wait, unsigned mode, int sync, void *key) {
        int pwake = 0;
        struct epitem *epi = ep_item_from_wait(wait);
        struct eventpoll *ep = epi->ep;
        __poll_t pollflags = key_to_poll(key);
        unsigned long flags;
        int ewake = 0;
    
        // 禁止本地中断并获得指定读锁。
        read_lock_irqsave(&ep->lock, flags);
    
        ep_set_busy_poll_napi_id(epi);
    
        // #define EP_PRIVATE_BITS (EPOLLWAKEUP | EPOLLONESHOT | EPOLLET | EPOLLEXCLUSIVE)
        // 如果 fd 没有关注除了 EP_PRIVATE_BITS 之外的事件,那么走解锁流程。
        if (!(epi->event.events & ~EP_PRIVATE_BITS))
            goto out_unlock;
    
        // 如果回调的事件,不是用户关注的 fd 事件,那么走解锁流程。
        if (pollflags && !(pollflags & epi->event.events))
            goto out_unlock;
    
        /*
         * If we are transferring events to userspace, we can hold no locks
         * (because we're accessing user memory, and because of linux f_op->poll()
         * semantics). All the events that happen during that period of time are
         * chained in ep->ovflist and requeued later on.
         */
        // 当内核空间向用户空间拷贝数据时,不添加 epi 到 rdllist,将它添加到 ovflist。
        if (READ_ONCE(ep->ovflist) != EP_UNACTIVE_PTR) {
            if (epi->next == EP_UNACTIVE_PTR && chain_epi_lockless(epi))
                ep_pm_stay_awake_rcu(epi);
            goto out_unlock;
        }
    
        // epi 已经加入就绪链表就不需要添加了。
        if (!ep_is_linked(epi) &&
            list_add_tail_lockless(&epi->rdllink, &ep->rdllist)) {
            ep_pm_stay_awake_rcu(epi);
        }
    
        // 当回调事件是用户关注的事件,那么需要唤醒进程处理。
    
        // ep->wq 在 epoll_wait 时添加,当没有就绪事件,epoll_wait 进行睡眠等待唤醒。
        if (waitqueue_active(&ep->wq)) {
            if ((epi->event.events & EPOLLEXCLUSIVE) &&
                !(pollflags & POLLFREE)) {
                // #define EPOLLINOUT_BITS (EPOLLIN | EPOLLOUT)
                switch (pollflags & EPOLLINOUT_BITS) {
                case EPOLLIN:
                    if (epi->event.events & EPOLLIN)
                        ewake = 1;
                    break;
                case EPOLLOUT:
                    if (epi->event.events & EPOLLOUT)
                        ewake = 1;
                    break;
                case 0:
                    ewake = 1;
                    break;
                }
            }
            wake_up(&ep->wq);
        }
    
        // ep->poll_wait 是 epoll 监控另外一个 epoll fd 的等待队列。如果触发事件,也需要唤醒进程处理。
        if (waitqueue_active(&ep->poll_wait))
            pwake++;
    
    out_unlock:
        read_unlock_irqrestore(&ep->lock, flags);
    
        /* We have to call this outside the lock */
        if (pwake)
            ep_poll_safewake(&ep->poll_wait);
    
        if (!(epi->event.events & EPOLLEXCLUSIVE))
            ewake = 1;
    
        if (pollflags & POLLFREE) {
            /*
             * If we race with ep_remove_wait_queue() it can miss
             * ->whead = NULL and do another remove_wait_queue() after
             * us, so we can't use __remove_wait_queue().
             */
            list_del_init(&wait->entry);
            /*
             * ->whead != NULL protects us from the race with ep_free()
             * or ep_remove(), ep_remove_wait_queue() takes whead->lock
             * held by the caller. Once we nullify it, nothing protects
             * ep/epi or even wait.
             */
            smp_store_release(&ep_pwq_from_wait(wait)->whead, NULL);
        }
    
        return ewake;
    }
    

    9. 参考

    相关文章

      网友评论

          本文标题:[epoll 源码走读] epoll 实现原理

          本文链接:https://www.haomeiwen.com/subject/dntprktx.html