How the socket module in skynet uses the epoll interface
We'll start with how it is used, then trace the call flow inside the kernel.
-
The first step is to create an epoll instance.
#eventpoll.c
// Kernel definition: in current Linux versions the size argument is no longer
// used for anything except the positivity check.
SYSCALL_DEFINE1(epoll_create, int, size)
{
    if (size <= 0)
        return -EINVAL;

    return do_epoll_create(0);
}

/*
 * Key fields of the eventpoll structure.
 * This structure is stored inside the "private_data" member of the file
 * structure and represents the main data structure for the eventpoll
 * interface.
 */
struct eventpoll {
    /* Wait queue used by sys_epoll_wait() */
    wait_queue_head_t wq;

    /* Wait queue used by file->poll() */
    wait_queue_head_t poll_wait;

    /* List of ready file descriptors */
    struct list_head rdllist;

    /* RB tree root used to store monitored fd structs */
    struct rb_root_cached rbr;

    /*
     * This is a single linked list that chains all the "struct epitem" that
     * happened while transferring ready events to userspace w/out
     * holding ->lock.
     */
    struct epitem *ovflist;

    /* wakeup_source used when ep_scan_ready_list is running */
    struct wakeup_source *ws;
};
The two most important fields here are rdllist, the doubly linked list that every epoll article mentions, which records the fds that have pending events, and rbr, the red-black tree used to add, delete, modify, and look up those fd nodes.
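User space only ever sees the fd that do_epoll_create hands back. skynet wraps the syscall in a tiny helper; below is a minimal sketch modelled on skynet's socket_epoll.h (slightly simplified, not a verbatim copy):

#include <sys/epoll.h>
#include <unistd.h>

// Create the epoll instance. Since Linux 2.6.8 the size argument only has to
// be positive; it no longer limits how many fds can be watched.
static int
sp_create() {
    return epoll_create(1024);
}

// The epoll instance itself is just an fd, so releasing it is a plain close().
static void
sp_release(int efd) {
    close(efd);
}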
-
A socket gets an fd both when it listens and when it opens a connection. Below is part of the code skynet uses to open a connection. For most connections ai_family is AF_INET, i.e. IPv4; for now we only follow SOCK_STREAM, the TCP case.
static int
open_socket(struct socket_server *ss, struct request_open *request, struct socket_message *result) {
    ai_hints.ai_family = AF_UNSPEC;
    ai_hints.ai_socktype = SOCK_STREAM;
    ai_hints.ai_protocol = IPPROTO_TCP;

    status = getaddrinfo(request->host, port, &ai_hints, &ai_list);

    sock = socket(ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
}
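The excerpt omits what happens around socket(): skynet also switches the fd to non-blocking before calling connect, so an in-progress connect is finished later via an epoll write event. A simplified, self-contained sketch of that pattern (not skynet's exact code):

#include <errno.h>
#include <fcntl.h>
#include <netdb.h>
#include <string.h>
#include <sys/socket.h>
#include <unistd.h>

// Resolve the address, create a TCP socket, make it non-blocking, then start
// the connect. On a non-blocking socket connect() usually returns -1 with
// errno == EINPROGRESS, and completion shows up later as EPOLLOUT.
static int
open_tcp_nonblocking(const char *host, const char *port) {
    struct addrinfo hints, *list = NULL, *p;
    memset(&hints, 0, sizeof(hints));
    hints.ai_family = AF_UNSPEC;
    hints.ai_socktype = SOCK_STREAM;
    hints.ai_protocol = IPPROTO_TCP;
    if (getaddrinfo(host, port, &hints, &list) != 0)
        return -1;
    int fd = -1;
    for (p = list; p; p = p->ai_next) {
        fd = socket(p->ai_family, p->ai_socktype, p->ai_protocol);
        if (fd < 0)
            continue;
        int flag = fcntl(fd, F_GETFL, 0);
        fcntl(fd, F_SETFL, flag | O_NONBLOCK);
        if (connect(fd, p->ai_addr, p->ai_addrlen) == 0 || errno == EINPROGRESS)
            break;          // connected, or still connecting in the background
        close(fd);
        fd = -1;
    }
    freeaddrinfo(list);
    return fd;
}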
In the kernel, creating a socket means building a socket structure and, via sock_map_fd, allocating a file for it whose f_op is the socket_file_ops structure. The three operations in that structure that matter most to us are poll, read, and write.
#socket.c
int __sys_socket(int family, int type, int protocol)
{
    struct socket *sock;

    sock = __sys_socket_create(family, type, protocol);

    return sock_map_fd(sock, flags & (O_CLOEXEC | O_NONBLOCK));
}

static int sock_map_fd(struct socket *sock, int flags)
{
    struct file *newfile;

    newfile = sock_alloc_file(sock, flags, NULL);
}

struct file *sock_alloc_file(struct socket *sock, int flags, const char *dname)
{
    struct file *file;

    file = alloc_file_pseudo(SOCK_INODE(sock), sock_mnt, dname,
                O_RDWR | (flags & O_NONBLOCK),
                &socket_file_ops);
}

static const struct file_operations socket_file_ops = {
    .read_iter  = sock_read_iter,
    .write_iter = sock_write_iter,
    .poll       = sock_poll,
};

static __poll_t sock_poll(struct file *file, poll_table *wait)
{
    struct socket *sock = file->private_data;

    return sock->ops->poll(file, sock, wait) | flag;
}
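A user-visible consequence of wiring the fd to socket_file_ops is that a connected socket fd can be driven with the generic file syscalls, each of which lands in one of the operations above. A tiny sketch (it assumes fd is an already connected TCP socket):

#include <poll.h>
#include <unistd.h>

// Because the fd's file_operations is socket_file_ops, the plain file
// syscalls work on it: write() ends up in sock_write_iter, poll() in
// sock_poll (the same entry point epoll uses), read() in sock_read_iter.
static void demo(int fd) {
    struct pollfd pfd = { .fd = fd, .events = POLLIN };
    char buf[128];

    write(fd, "ping\n", 5);          // -> sock_write_iter
    poll(&pfd, 1, 1000);             // -> sock_poll
    read(fd, buf, sizeof(buf));      // -> sock_read_iter
}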
Next, let's see what the ops field of that socket gets set to when it is created.
#socket.c
static struct socket *__sys_socket_create(int family, int type, int protocol)
{
    struct socket *sock;

    retval = sock_create(family, type, protocol, &sock);

    return sock;
}

// sock_create is just a thin wrapper around __sock_create
int __sock_create(struct net *net, int family, int type, int protocol,
          struct socket **res, int kern)
{
    struct socket *sock;

    sock = sock_alloc();

    pf = rcu_dereference(net_families[family]);
    err = pf->create(net, sock, protocol, kern);
}
The two key lines are the last ones: the pf structure fetched from net_families has its create method invoked to initialize the socket structure. So what does the net_families array hold? Remember that our family is AF_INET, the IP family. The code below defines a structure and an array; at kernel start-up the init function inet_init runs and registers the relevant network protocols. Note that PF_INET is defined to the same value as the AF_INET used above.
#af_inet.c
static const struct net_proto_family inet_family_ops = {
    .family = PF_INET,
    .create = inet_create,
    .owner  = THIS_MODULE,
};

static struct inet_protosw inetsw_array[] = {
    {
        .type =     SOCK_STREAM,
        .protocol = IPPROTO_TCP,
        .prot =     &tcp_prot,
        .ops =      &inet_stream_ops,
        .flags =    INET_PROTOSW_PERMANENT | INET_PROTOSW_ICSK,
    },
};

static int __init inet_init(void)
{
    (void)sock_register(&inet_family_ops);

    for (q = inetsw_array; q < &inetsw_array[INETSW_ARRAY_LEN]; ++q)
        inet_register_protosw(q);
}

int sock_register(const struct net_proto_family *ops)
{
    if (rcu_dereference_protected(net_families[ops->family],
                      lockdep_is_held(&net_family_lock)))
        err = -EEXIST;
    else {
        rcu_assign_pointer(net_families[ops->family], ops);
        err = 0;
    }
}

// Register each member of the inetsw_array above, keyed by type, into the
// static inetsw array below.
static struct list_head inetsw[SOCK_MAX];

void inet_register_protosw(struct inet_protosw *p)
{
    /* If we are trying to override a permanent protocol, bail. */
    last_perm = &inetsw[p->type];
    list_for_each(lh, &inetsw[p->type]) {
        answer = list_entry(lh, struct inet_protosw, list);

        /* Check only the non-wild match. */
        if ((INET_PROTOSW_PERMANENT & answer->flags) == 0)
            break;
        if (protocol == answer->protocol)
            goto out_permanent;
        last_perm = lh;
    }
}
inet_init calls sock_register, which places inet_family_ops into net_families. So for the IP family, the pf we fetched above is exactly inet_family_ops. This is really a plugin-style design: every structure registered in net_families exposes a create interface, and at creation time the create function of the requested family is called. For inet_family_ops that create function is inet_create; let's look at its implementation.
static int inet_create(struct net *net, struct socket *sock, int protocol, int kern)
{
    list_for_each_entry_rcu(answer, &inetsw[sock->type], list) {
        err = 0;
        /* Check the non-wild match. */
        if (protocol == answer->protocol) {
            if (protocol != IPPROTO_IP)
                break;
        } else {
            /* Check for the two wild cases. */
            if (IPPROTO_IP == protocol) {
                protocol = answer->protocol;
                break;
            }
            if (IPPROTO_IP == answer->protocol)
                break;
        }
        err = -EPROTONOSUPPORT;
    }
    ...
    sock->ops = answer->ops;
}
What the create step really does is look up, in the statically initialized inetsw lists above, the answer entry matching the type and protocol, and assign answer->ops to sock->ops. For our socket, with type SOCK_STREAM and protocol IPPROTO_TCP, that ops is the inet_stream_ops structure. Here is its definition:
const struct proto_ops inet_stream_ops = {
    .family     = PF_INET,
    .owner      = THIS_MODULE,
    .release    = inet_release,
    .bind       = inet_bind,
    .connect    = inet_stream_connect,
    .socketpair = sock_no_socketpair,
    .accept     = inet_accept,
    .getname    = inet_getname,
    .poll       = tcp_poll,
};
Remember which function the socket file's poll ends up in? Right: sock_poll calls sock->ops->poll, i.e. the poll member of inet_stream_ops, which is tcp_poll.
static __poll_t sock_poll(struct file *file, poll_table *wait)
{
    struct socket *sock = file->private_data;

    return sock->ops->poll(file, sock, wait) | flag;
}

#tcp.c
__poll_t tcp_poll(struct file *file, struct socket *sock, poll_table *wait)
{
    struct sock *sk = sock->sk;

    sock_poll_wait(file, sock, wait);
    ...
    if (sk->sk_shutdown == SHUTDOWN_MASK || state == TCP_CLOSE)
        mask |= EPOLLHUP;
    if (sk->sk_shutdown & RCV_SHUTDOWN)
        mask |= EPOLLIN | EPOLLRDNORM | EPOLLRDHUP;

    if (state != TCP_SYN_SENT &&
        (state != TCP_SYN_RECV || rcu_access_pointer(tp->fastopen_rsk))) {
        if (tcp_stream_is_readable(sk, target))
            mask |= EPOLLIN | EPOLLRDNORM;

        if (!(sk->sk_shutdown & SEND_SHUTDOWN)) {
            if (__sk_stream_is_writeable(sk, 1)) {
                mask |= EPOLLOUT | EPOLLWRNORM;
            }
            ...
        }
    }

    return mask;
}

static inline void sock_poll_wait(struct file *filp, struct socket *sock, poll_table *p)
{
    if (!poll_does_not_wait(p)) {
        poll_wait(filp, &sock->wq.wait, p);
        /* We need to be sure we are in sync with the
         * socket flags modification.
         *
         * This memory barrier is paired in the wq_has_sleeper.
         */
        smp_mb();
    }
}
This excerpt shows that the poll path runs sock_poll_wait, whose most important job is to call poll_wait, and then returns the events currently pending on the socket: mask is OR'ed with EPOLLIN when the socket is readable or has been shut down, and with EPOLLOUT when it is writable. poll_wait matters a great deal, because it is how the monitored socket gets its callback registered.
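tcp_poll follows the contract that every kernel poll method obeys: hand your wait queue to poll_wait so the caller (epoll, poll, select) can register itself on it, then return the readiness mask of the moment, without ever blocking. A generic sketch of that shape, using a hypothetical device rather than real kernel code:

// Hypothetical example showing the general shape of a poll method:
// 1. poll_wait() gives the poll_table a chance to hook a callback onto our
//    wait queue -- for epoll that callback ends up being ep_poll_callback.
// 2. The function then reports readiness as of right now.
static __poll_t mydev_poll(struct file *file, poll_table *wait)
{
    struct mydev *dev = file->private_data;   // hypothetical driver state
    __poll_t mask = 0;

    poll_wait(file, &dev->read_wq, wait);     // register, never block here

    if (dev->has_data)
        mask |= EPOLLIN | EPOLLRDNORM;        // readable right now
    if (dev->can_write)
        mask |= EPOLLOUT | EPOLLWRNORM;       // writable right now

    return mask;
}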
-
Back to epoll: let's look at how a socket fd is added to it as a node through the epoll_ctl interface. We only consider the case of adding a new fd.
#eventpoll.c
/* Valid opcodes to issue to sys_epoll_ctl() */
#define EPOLL_CTL_ADD 1
#define EPOLL_CTL_DEL 2
#define EPOLL_CTL_MOD 3

// The fd here is the one returned by the socket() syscall.
int do_epoll_ctl(int epfd, int op, int fd, struct epoll_event *epds, bool nonblock)
{
    // "epi" is short for eventpoll item
    epi = ep_find(ep, tf.file, fd);

    error = -EINVAL;
    switch (op) {
    case EPOLL_CTL_ADD:
        if (!epi) {
            epds->events |= EPOLLERR | EPOLLHUP;
            error = ep_insert(ep, epds, tf.file, fd, full_check);
        } else
            error = -EEXIST;
        break;
    ...
    }
}

static int ep_insert(struct eventpoll *ep, const struct epoll_event *event,
             struct file *tfile, int fd, int full_check)
{
    struct epitem *epi;

    // kmem_cache_zalloc allocates zeroed memory from a slab cache,
    // which is one reason epoll is efficient.
    if (!(epi = kmem_cache_zalloc(epi_cache, GFP_KERNEL))) {
        percpu_counter_dec(&ep->user->epoll_watches);
        return -ENOMEM;
    }

    ep_rbtree_insert(ep, epi);

    epq.epi = epi;
    // ep_ptable_queue_proc is the function that will register the callback.
    init_poll_funcptr(&epq.pt, ep_ptable_queue_proc);
    // The ep_ptable_queue_proc registered above is actually triggered here.
    revents = ep_item_poll(epi, &epq.pt, 1);
}

static inline void init_poll_funcptr(poll_table *pt, poll_queue_proc qproc)
{
    pt->_qproc = qproc;
    pt->_key = ~(__poll_t)0; /* all events enabled */
}

static __poll_t ep_item_poll(const struct epitem *epi, poll_table *pt, int depth)
{
    struct file *file = epi->ffd.file;

    // Unless an epoll fd is itself being monitored by another epoll
    // instance, this always takes the vfs_poll path.
    if (!is_file_epoll(file))
        res = vfs_poll(file, pt);
    else
        res = __ep_eventpoll_poll(file, pt, depth);
    return res & epi->event.events;
}

static inline __poll_t vfs_poll(struct file *file, struct poll_table_struct *pt)
{
    if (unlikely(!file->f_op->poll))
        return DEFAULT_POLLMASK;
    return file->f_op->poll(file, pt);
}

static void ep_ptable_queue_proc(struct file *file, wait_queue_head_t *whead, poll_table *pt)
{
    // ep_poll_callback is the real callback invoked when an epoll event
    // wakes up this wait queue entry.
    init_waitqueue_func_entry(&pwq->wait, ep_poll_callback);
}

static inline void init_waitqueue_func_entry(struct wait_queue_entry *wq_entry,
                         wait_queue_func_t func)
{
    wq_entry->flags = 0;
    wq_entry->private = NULL;
    wq_entry->func = func;
}
Let's walk through the calls above. For the add case ep_insert is called. It allocates an epitem named epi and inserts it into the epoll instance's red-black tree, which is what later add/delete/modify/lookup operations work on. It then calls init_poll_funcptr to store ep_ptable_queue_proc in the poll_table's _qproc member, and finally calls ep_item_poll. For a socket fd this goes through vfs_poll, which runs on the file stored in the socket, i.e. the file allocated by sock_alloc_file above; that file's f_op is the socket_file_ops passed in at allocation time, so the poll it invokes is sock_poll. And as the socket code in part two showed, sock_poll ends up calling poll_wait.
static inline void poll_wait(struct file *filp, wait_queue_head_t *wait_address, poll_table *p)
{
    if (p && p->_qproc && wait_address)
        p->_qproc(filp, wait_address, p);
}
poll_wait is the place where the _qproc assigned by init_poll_funcptr, namely ep_ptable_queue_proc, is actually invoked. That function is what finally records ep_poll_callback into the func member of the wait_queue_entry on the socket's wait queue.
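All of this registration machinery is triggered by a single user-space call. Roughly what skynet's wrapper does when it hands a new socket to the epoll instance (again a sketch modelled on socket_epoll.h, not a verbatim copy):

#include <string.h>
#include <sys/epoll.h>

// Register a socket fd with the epoll instance. ud is the per-socket pointer
// skynet stores in the event, so that epoll_wait can hand back the owning
// socket structure directly instead of a bare fd.
static int
sp_add(int efd, int sock, void *ud) {
    struct epoll_event ev;
    memset(&ev, 0, sizeof(ev));
    ev.events = EPOLLIN;                 // level-triggered read events by default
    ev.data.ptr = ud;
    if (epoll_ctl(efd, EPOLL_CTL_ADD, sock, &ev) == -1) {
        return 1;                        // caller treats non-zero as failure
    }
    return 0;
}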
-
Now let's see what happens when that callback runs.
static int ep_poll_callback(wait_queue_entry_t *wait, unsigned mode, int sync, void *key)
{
    struct epitem *epi = ep_item_from_wait(wait);
    struct eventpoll *ep = epi->ep;

    /*
     * If we are transferring events to userspace, we can hold no locks
     * (because we're accessing user memory, and because of linux f_op->poll()
     * semantics). All the events that happen during that period of time are
     * chained in ep->ovflist and requeued later on.
     */
    if (READ_ONCE(ep->ovflist) != EP_UNACTIVE_PTR) {
        if (chain_epi_lockless(epi))
            ep_pm_stay_awake_rcu(epi);
    } else if (!ep_is_linked(epi)) {
        /* In the usual case, add event to ready list. */
        if (list_add_tail_lockless(&epi->rdllink, &ep->rdllist))
            ep_pm_stay_awake_rcu(epi);
    }
}
As the comment explains, when ovflist != EP_UNACTIVE_PTR an epoll_wait is currently copying the ready list out to user space, so the node is chained onto ovflist instead; in the usual case it is appended directly to rdllist, the ready doubly linked list.
-
Now for taking the events off that list. Once a pile of checks is stripped away, the real work happens in ep_poll: when events are available it calls ep_send_events, which copies them into the events array supplied by the user.
static int do_epoll_wait(int epfd, struct epoll_event __user *events,
             int maxevents, struct timespec64 *to)
{
    /* Time to fish for events ... */
    error = ep_poll(ep, events, maxevents, to);
}

static int ep_poll(struct eventpoll *ep, struct epoll_event __user *events,
           int maxevents, struct timespec64 *timeout)
{
    eavail = ep_events_available(ep);

    while (1) {
        if (eavail) {
            /*
             * Try to transfer events to user space. In case we get
             * 0 events and there's still timeout left over, we go
             * trying again in search of more luck.
             */
            res = ep_send_events(ep, events, maxevents);
            if (res)
                return res;
        }
        if (timed_out)
            return 0;
    }
}

static int ep_send_events(struct eventpoll *ep, struct epoll_event __user *events, int maxevents)
{
    struct epitem *epi, *tmp;
    LIST_HEAD(txlist);

    // Splice the ready list off onto the local txlist.
    ep_start_scan(ep, &txlist);

    list_for_each_entry_safe(epi, tmp, &txlist, rdllink) {
        struct wakeup_source *ws;
        __poll_t revents;

        if (res >= maxevents)
            break;

        list_del_init(&epi->rdllink);

        /*
         * If the event mask intersect the caller-requested one,
         * deliver the event to userspace. Again, we are holding ep->mtx,
         * so no operations coming from userspace can change the item.
         */
        revents = ep_item_poll(epi, &pt, 1);
        if (!revents)
            continue;

        events = epoll_put_uevent(revents, epi->event.data, events);
        if (!events) {
            list_add(&epi->rdllink, &txlist);
            ep_pm_stay_awake(epi);
            if (!res)
                res = -EFAULT;
            break;
        }

        res++;
        if (epi->event.events & EPOLLONESHOT)
            epi->event.events &= EP_PRIVATE_BITS;
        else if (!(epi->event.events & EPOLLET)) {
            /*
             * If this file has been added with Level
             * Trigger mode, we need to insert back inside
             * the ready list, so that the next call to
             * epoll_wait() will check again the events
             * availability. At this point, no one can insert
             * into ep->rdllist besides us. The epoll_ctl()
             * callers are locked out by
             * ep_scan_ready_list() holding "mtx" and the
             * poll callback will queue them in ep->ovflist.
             */
            list_add_tail(&epi->rdllink, &ep->rdllist);
            ep_pm_stay_awake(epi);
        }
    }
    ep_done_scan(ep, &txlist);
}
ep_send_events walks the list that was spliced off and calls ep_item_poll on each node to fetch its current events. As described in part three, ep_item_poll ends up in the sock's poll, i.e. sock_poll, and as part two showed, sock_poll returns the node's current events and calls poll_wait. Note the difference from the insert path: at insert time the poll_table carried the queue callback, whereas here the callback is set to NULL, so nothing gets registered again. The events obtained for each node are written into the user-supplied events array; a node with no events is simply skipped. Finally, if the item is not in ET mode, i.e. it is level-triggered, the node is put back onto the epoll instance's ready list, so the next epoll_wait will check its events again. This is what most articles mean when they say that under LT, data that has not been fully read or written keeps triggering read/write events on later epoll_wait calls.

So the essential difference between LT and ET is this: after a node is taken off the ready list and found to have events, LT puts it back, so the next epoll_wait traverses it again and returns it to the user only if it still has events; a socket with a read/write event is therefore traversed by epoll_wait at least twice. Under ET the node is not re-inserted after its events are delivered; it only returns to the ready list when new read/write activity on the socket fires ep_poll_callback again.
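For application code the practical difference is: under LT it is fine to read only part of the buffered data and pick up the rest on the next epoll_wait, while under ET the socket must be drained until read returns EAGAIN. A minimal LT-style loop sketch (simplified error handling; the fd is stored in data.fd here for brevity, whereas skynet stores a pointer in data.ptr):

#include <sys/epoll.h>
#include <unistd.h>

#define MAX_EVENT 64

// Level-triggered loop: if a socket still has unread data after this pass,
// ep_send_events puts its epitem back on rdllist, so the next epoll_wait
// reports it again without a new ep_poll_callback having to fire.
static void
event_loop(int efd) {
    struct epoll_event ev[MAX_EVENT];
    char buf[4096];
    for (;;) {
        int n = epoll_wait(efd, ev, MAX_EVENT, -1);
        for (int i = 0; i < n; i++) {
            int fd = ev[i].data.fd;
            if (ev[i].events & EPOLLIN) {
                // One read is enough under LT; leftover bytes will be
                // reported again. Under ET this would have to loop until
                // read() returns -1 with errno == EAGAIN.
                ssize_t r = read(fd, buf, sizeof(buf));
                if (r == 0)
                    close(fd);          // peer closed the connection
            }
        }
    }
}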
To sum up the whole flow: create an epoll instance; when a socket is created and added, allocate an epitem node, insert it into the epoll instance's red-black tree, and register the ep_poll_callback callback; when a read/write event occurs, that callback puts the node onto the epoll instance's ready list; epoll_wait walks the ready list and delivers the read/write events to the user, and in LT mode puts the node back onto the list afterwards.