Understanding epoll through skynet's socket module

Author: 水_97d9 | Published 2022-06-02 11:03

    How skynet's socket module uses the epoll interface

    Let's look at the usage first, then walk through the call flow inside the kernel.

    1. First, create an epoll instance

      #eventpoll.c
      // Kernel definition below: in current Linux versions the size argument no longer has any effect beyond the > 0 check
      SYSCALL_DEFINE1(epoll_create, int, size)
      {
          if (size <= 0)
              return -EINVAL;
      
          return do_epoll_create(0);
      }
      /*
      * eventpoll结构体的关键定义
      * This structure is stored inside the "private_data" member of the file
      * structure and represents the main data structure for the eventpoll
      * interface.
      */
      struct eventpoll {
          /* Wait queue used by sys_epoll_wait() */
          wait_queue_head_t wq;
      
          /* Wait queue used by file->poll() */
          wait_queue_head_t poll_wait;
      
          /* List of ready file descriptors */
          struct list_head rdllist;
      
          /* RB tree root used to store monitored fd structs */
          struct rb_root_cached rbr;
          /*
          * This is a single linked list that chains all the "struct epitem" that
          * happened while transferring ready events to userspace w/out
          * holding ->lock.
          */
          struct epitem *ovflist;
      
          /* wakeup_source used when ep_scan_ready_list is running */
          struct wakeup_source *ws;
      };
      

      The two most important fields here are rdllist, the doubly linked ready list that epoll articles usually mention, which holds the fds that have pending events, and rbr, a red-black tree used to insert, delete, modify and look up the monitored fd nodes.
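
      On the user-space side, skynet hides this syscall behind a small helper. The sketch below is paraphrased from skynet's socket_epoll.h (not the verbatim source): the only thing worth noting is that the size passed to epoll_create is a legacy hint, which, as the SYSCALL_DEFINE1 above shows, the kernel no longer uses.

      #include <sys/epoll.h>
      #include <unistd.h>
      #include <stdbool.h>

      typedef int poll_fd;

      // Paraphrased sketch of skynet's epoll wrapper (socket_epoll.h).
      static bool
      sp_invalid(poll_fd efd) {
          return efd == -1;
      }

      static poll_fd
      sp_create() {
          // 1024 is only a historical hint; current kernels just check size > 0
          return epoll_create(1024);
      }

      static void
      sp_release(poll_fd efd) {
          close(efd);
      }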

    2. A socket produces an fd both when it listens and when it creates a connection. Below is part of skynet's connection-creation code. The hint is AF_UNSPEC, but for most connections the resolved ai_family is AF_INET, i.e. IPv4. For now we only look at SOCK_STREAM, the TCP case.

      static int
      open_socket(struct socket_server *ss, struct request_open * request, struct socket_message *result) {
          // abridged: local declarations, error handling and the connect loop are omitted
          ai_hints.ai_family = AF_UNSPEC;
          ai_hints.ai_socktype = SOCK_STREAM;
          ai_hints.ai_protocol = IPPROTO_TCP;
      
          status = getaddrinfo( request->host, port, &ai_hints, &ai_list );
          sock = socket( ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol );
      }
      

      Inside the kernel, creating a socket returns a socket struct, and sock_map_fd allocates a file for that socket whose file_operations is socket_file_ops. The three entries in that structure we care about most are poll, read and write.

      #socket.c
      int __sys_socket(int family, int type, int protocol)
      {
          struct socket *sock;
          sock = __sys_socket_create(family, type, protocol);
      
          return sock_map_fd(sock, flags & (O_CLOEXEC | O_NONBLOCK));
      }
      
      static int sock_map_fd(struct socket *sock, int flags)
      {
          struct file *newfile;
          newfile = sock_alloc_file(sock, flags, NULL);
      }
      
      struct file *sock_alloc_file(struct socket *sock, int flags, const char *dname)
      {
          struct file *file;
          file = alloc_file_pseudo(SOCK_INODE(sock), sock_mnt, dname,
                      O_RDWR | (flags & O_NONBLOCK),
                      &socket_file_ops);
      }
      
      static const struct file_operations socket_file_ops = {
          .read_iter  = sock_read_iter,
          .write_iter = sock_write_iter,
          .poll       = sock_poll,
      };
      
      static __poll_t sock_poll(struct file *file, poll_table *wait)
      {
          struct socket *sock = file->private_data;
          return sock->ops->poll(file, sock, wait) | flag;
      }
      

      Now let's see what the ops field of the socket struct really is at allocation time.

      #socket.c
      static struct socket *__sys_socket_create(int family, int type, int protocol)
      {
          struct socket *sock;
          retval = sock_create(family, type, protocol, &sock);
          return sock;
      }
      // sock_create ends up calling __sock_create
      int __sock_create(struct net *net, int family, int type, int protocol,
                  struct socket **res, int kern)
      {
          struct socket *sock;
          sock = sock_alloc();
          pf = rcu_dereference(net_families[family]);
          err = pf->create(net, sock, protocol, kern);
      }
      

      The key is these two lines: the pf structure fetched from net_families has its create method called to initialize the sock structure. So what does net_families hold? Remember that our family is AF_INET, the IP protocol family. The code below defines a structure and an array; during kernel start-up the init function inet_init is called to register the relevant network protocols. Note that PF_INET is the same macro value as the AF_INET used above.

      #af_inet.c
      static const struct net_proto_family inet_family_ops = {
          .family = PF_INET,
          .create = inet_create,
          .owner  = THIS_MODULE,
      };
      static struct inet_protosw inetsw_array[] =
      {
          {
              .type =       SOCK_STREAM,
              .protocol =   IPPROTO_TCP,
              .prot =       &tcp_prot,
              .ops =        &inet_stream_ops,
              .flags =      INET_PROTOSW_PERMANENT |
                      INET_PROTOSW_ICSK,
          },
      };
      
      static int __init inet_init(void)
      {
          (void)sock_register(&inet_family_ops);
          for (q = inetsw_array; q < &inetsw_array[INETSW_ARRAY_LEN]; ++q)
              inet_register_protosw(q);
      }
      
      int sock_register(const struct net_proto_family *ops)
      {
          if (rcu_dereference_protected(net_families[ops->family],
                          lockdep_is_held(&net_family_lock)))
              err = -EEXIST;
          else {
              rcu_assign_pointer(net_families[ops->family], ops);
              err = 0;
          }
      }
      
      // register the members of the inetsw_array above into the static inetsw array, indexed by type
      static struct list_head inetsw[SOCK_MAX];
      void inet_register_protosw(struct inet_protosw *p)
      {
          /* If we are trying to override a permanent protocol, bail. */
          last_perm = &inetsw[p->type];
          list_for_each(lh, &inetsw[p->type]) {
              answer = list_entry(lh, struct inet_protosw, list);
              /* Check only the non-wild match. */
              if ((INET_PROTOSW_PERMANENT & answer->flags) == 0)
                  break;
              if (protocol == answer->protocol)
                  goto out_permanent;
              last_perm = lh;
          }
      }
      

      inet_init calls sock_register to put the inet_family_ops structure into net_families. So for the IP family, the pf we saw earlier is inet_family_ops. This is really a plug-in style design: every structure registered in net_families exposes a create interface, and at socket-creation time a different create function is invoked depending on the family. For inet_family_ops the create function is inet_create, so let's look at its implementation.

      static int inet_create(struct net *net, struct socket *sock, int protocol,
                 int kern)
      {
          list_for_each_entry_rcu(answer, &inetsw[sock->type], list) {
      
              err = 0;
              /* Check the non-wild match. */
              if (protocol == answer->protocol) {
                  if (protocol != IPPROTO_IP)
                      break;
              } else {
                  /* Check for the two wild cases. */
                  if (IPPROTO_IP == protocol) {
                      protocol = answer->protocol;
                      break;
                  }
                  if (IPPROTO_IP == answer->protocol)
                      break;
              }
              err = -EPROTONOSUPPORT;
          }
          ...
          sock->ops = answer->ops;
      }
      

      Creation, then, is just a lookup in the statically initialized inetsw list by type and protocol to get answer, whose ops is assigned to sock->ops. For the socket we use, with type SOCK_STREAM and protocol IPPROTO_TCP, the matching ops is the inet_stream_ops structure. Here is its definition.

      const struct proto_ops inet_stream_ops = {
          .family        = PF_INET,
          .owner         = THIS_MODULE,
          .release       = inet_release,
          .bind          = inet_bind,
          .connect       = inet_stream_connect,
          .socketpair    = sock_no_socketpair,
          .accept        = inet_accept,
          .getname       = inet_getname,
          .poll          = tcp_poll,
      };
      

      Remember which function the socket file's poll ends up in? Right: sock_poll calls sock->ops->poll, which for us is inet_stream_ops's poll, i.e. tcp_poll.

      static __poll_t sock_poll(struct file *file, poll_table *wait)
      {
          struct socket *sock = file->private_data;
          return sock->ops->poll(file, sock, wait) | flag;
      }
      
      #tcp.c
      __poll_t tcp_poll(struct file *file, struct socket *sock, poll_table *wait)
      {
          struct sock *sk = sock->sk;
          sock_poll_wait(file, sock, wait);
          ...
          if (sk->sk_shutdown == SHUTDOWN_MASK || state == TCP_CLOSE)
                  mask |= EPOLLHUP;
          if (sk->sk_shutdown & RCV_SHUTDOWN)
              mask |= EPOLLIN | EPOLLRDNORM | EPOLLRDHUP;
          if (state != TCP_SYN_SENT &&
              (state != TCP_SYN_RECV || rcu_access_pointer(tp->fastopen_rsk))) {
      
              if (tcp_stream_is_readable(sk, target))
                  mask |= EPOLLIN | EPOLLRDNORM;
      
              if (!(sk->sk_shutdown & SEND_SHUTDOWN)) {
                  if (__sk_stream_is_writeable(sk, 1)) {
                      mask |= EPOLLOUT | EPOLLWRNORM;
                  }
                  ...
              }
          }
          return mask;
      }
      
      static inline void sock_poll_wait(struct file *filp, struct socket *sock,
                    poll_table *p)
      {
          if (!poll_does_not_wait(p)) {
              poll_wait(filp, &sock->wq.wait, p);
              /* We need to be sure we are in sync with the
              * socket flags modification.
              *
              * This memory barrier is paired in the wq_has_sleeper.
              */
              smp_mb();
          }
      }
      

      From this small piece of code we can see that the poll path runs sock_poll_wait, whose most important job is to call poll_wait, and then returns the events currently pending on the socket: when it is readable or closed the mask is OR'ed with EPOLLIN, when it is writable with EPOLLOUT, and so on. poll_wait matters a great deal, because it is how the callback gets registered on the monitored socket node.
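
      To make that contract concrete, here is a hypothetical minimal poll implementation (illustrative only, not taken from the kernel source): every f_op->poll, sock_poll/tcp_poll included, does exactly these two things: register the caller on a wait queue through poll_wait, then return the mask of events that are ready right now.

      #include <linux/fs.h>
      #include <linux/poll.h>
      #include <linux/types.h>
      #include <linux/wait.h>

      // Hypothetical device, for illustrating the f_op->poll contract only.
      struct demo_dev {
          wait_queue_head_t wq;   /* woken by the producer when data arrives */
          bool has_data;
      };

      static __poll_t demo_poll(struct file *file, poll_table *pt)
      {
          struct demo_dev *dev = file->private_data;
          __poll_t mask = 0;

          /* Does not sleep: it only invokes pt->_qproc (ep_ptable_queue_proc
           * when the caller is epoll) so a callback gets hooked onto our wait
           * queue for later wake-ups. */
          poll_wait(file, &dev->wq, pt);

          if (dev->has_data)
              mask |= EPOLLIN | EPOLLRDNORM;  /* readable right now */

          return mask;
      }

      /* The producer side later does:
       *     dev->has_data = true;
       *     wake_up_interruptible(&dev->wq);   // this is what ends up firing ep_poll_callback
       */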

    3. Back to epoll. Let's see how a socket fd is added as an epoll node, which is done through the epoll_ctl interface. We only look at the case of adding a new fd.

      #eventpoll.c
      /* Valid opcodes to issue to sys_epoll_ctl() */
      #define EPOLL_CTL_ADD 1
      #define EPOLL_CTL_DEL 2
      #define EPOLL_CTL_MOD 3
      
      // this fd is the one returned by the socket() call
      int do_epoll_ctl(int epfd, int op, int fd, struct epoll_event *epds,
              bool nonblock)
      {
          // epi is short for eventpoll item
          epi = ep_find(ep, tf.file, fd);
      
          error = -EINVAL;
          switch (op) {
          case EPOLL_CTL_ADD:
              if (!epi) {
                  epds->events |= EPOLLERR | EPOLLHUP;
                  error = ep_insert(ep, epds, tf.file, fd, full_check);
              } else
                  error = -EEXIST;
              break;
              ...
          }
      }
      
      static int ep_insert(struct eventpoll *ep, const struct epoll_event *event,
                  struct file *tfile, int fd, int full_check)
      {
          struct epitem *epi;
          // kmem_cache_zalloc allocates zeroed memory from a slab cache; one reason epoll is efficient
          if (!(epi = kmem_cache_zalloc(epi_cache, GFP_KERNEL))) {
              percpu_counter_dec(&ep->user->epoll_watches);
              return -ENOMEM;
          }
      
          ep_rbtree_insert(ep, epi);
          epq.epi = epi;
          // ep_ptable_queue_proc is where a callback function gets registered
          init_poll_funcptr(&epq.pt, ep_ptable_queue_proc);
      
          // this is where the ep_ptable_queue_proc registered above is actually executed
          revents = ep_item_poll(epi, &epq.pt, 1);
      }
      
      static inline void init_poll_funcptr(poll_table *pt, poll_queue_proc qproc)
      {
          pt->_qproc = qproc;
          pt->_key   = ~(__poll_t)0; /* all events enabled */
      }
      
      static __poll_t ep_item_poll(const struct epitem *epi, poll_table *pt,
                      int depth)
      {
          struct file *file = epi->ffd.file;
          // unless an epoll fd is itself being monitored by epoll, this always takes the vfs_poll path
          if (!is_file_epoll(file))
              res = vfs_poll(file, pt);
          else
              res = __ep_eventpoll_poll(file, pt, depth);
          return res & epi->event.events;
      }
      
      static inline __poll_t vfs_poll(struct file *file, struct poll_table_struct *pt)
      {
          if (unlikely(!file->f_op->poll))
              return DEFAULT_POLLMASK;
          return file->f_op->poll(file, pt);
      }
      
      static void ep_ptable_queue_proc(struct file *file, wait_queue_head_t *whead,
                      poll_table *pt)
      {
          // ep_poll_callback is the real callback that runs when an event wakes this entry
          init_waitqueue_func_entry(&pwq->wait, ep_poll_callback);
      
      }
      
      static inline void
      init_waitqueue_func_entry(struct wait_queue_entry *wq_entry, wait_queue_func_t func)
      {
          wq_entry->flags     = 0;
          wq_entry->private   = NULL;
          wq_entry->func      = func;
      }
      

      Let's walk through the call sequence above. When adding an fd, ep_insert is called. It allocates an epitem named epi and inserts it into the epoll instance's red-black tree, which is used for later insert/delete/modify/lookup operations. It calls init_poll_funcptr to store ep_ptable_queue_proc into the _qproc member of the poll_table, then calls ep_item_poll. For a socket fd this goes through vfs_poll, which operates on the file saved in the socket, i.e. the file allocated by sock_alloc_file above, so its f_op is the socket_file_ops passed at that point and its poll function is sock_poll. As the socket code in point 2 showed, sock_poll ends up calling poll_wait.

      static inline void poll_wait(struct file * filp, wait_queue_head_t * wait_address, poll_table *p)
      {
          if (p && p->_qproc && wait_address)
              p->_qproc(filp, wait_address, p);
      }
      

      poll_wait is what actually invokes the _qproc function assigned by init_poll_funcptr, namely ep_ptable_queue_proc. That function is what finally records ep_poll_callback into the func member of the wait_queue_entry.
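
      Seen from user space, this whole registration chain is driven by a single epoll_ctl(EPOLL_CTL_ADD) call. skynet's sp_add helper does roughly the following (a paraphrased sketch of socket_epoll.h, not the verbatim source); the ud pointer saved in ev.data.ptr is what skynet gets back from epoll_wait later to identify the connection.

      #include <sys/epoll.h>

      // Paraphrased sketch of skynet's sp_add (socket_epoll.h): this single
      // syscall is what triggers the ep_insert -> ep_item_poll -> sock_poll
      // -> poll_wait -> ep_ptable_queue_proc chain described above.
      static int
      sp_add(int efd, int sock, void *ud) {
          struct epoll_event ev;
          ev.events = EPOLLIN;       // level-triggered read interest by default
          ev.data.ptr = ud;          // skynet stores its own socket struct pointer here
          if (epoll_ctl(efd, EPOLL_CTL_ADD, sock, &ev) == -1) {
              return 1;              // caller treats non-zero as failure
          }
          return 0;
      }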

    4. Now let's look at what happens when the callback runs

      static int ep_poll_callback(wait_queue_entry_t *wait, unsigned mode, int sync, void *key)
       {
           struct epitem *epi = ep_item_from_wait(wait);
           struct eventpoll *ep = epi->ep;
           /*
           * If we are transferring events to userspace, we can hold no locks
           * (because we're accessing user memory, and because of linux f_op->poll()
           * semantics). All the events that happen during that period of time are
           * chained in ep->ovflist and requeued later on.
           */
           if (READ_ONCE(ep->ovflist) != EP_UNACTIVE_PTR) {
               if (chain_epi_lockless(epi))
                   ep_pm_stay_awake_rcu(epi);
           } else if (!ep_is_linked(epi)) {
               /* In the usual case, add event to ready list. */
               if (list_add_tail_lockless(&epi->rdllink, &ep->rdllist))
                   ep_pm_stay_awake_rcu(epi);
           }
       }
      

      As the comment explains, when ovflist != EP_UNACTIVE_PTR an epoll_wait is currently copying the ready list to user space, so the node is chained onto ovflist instead. In the normal case the node is simply appended to rdllist, the doubly linked ready list.

    5. Time to take the events off the ready list. Stripping away a pile of checks, the real work is done by ep_poll: the key step is that once events are available it runs ep_send_events, which copies the events into the events array passed in by the user.

      static int do_epoll_wait(int epfd, struct epoll_event __user *events,
                int maxevents, struct timespec64 *to)
       {
           /* Time to fish for events ... */
           error = ep_poll(ep, events, maxevents, to);
       }
       static int ep_poll(struct eventpoll *ep, struct epoll_event __user *events,
              int maxevents, struct timespec64 *timeout)
       {
           eavail = ep_events_available(ep);
      
           while (1) {
               if (eavail) {
                   /*
                   * Try to transfer events to user space. In case we get
                   * 0 events and there's still timeout left over, we go
                   * trying again in search of more luck.
                   */
                   res = ep_send_events(ep, events, maxevents);
                   if (res)
                       return res;
               }
      
               if (timed_out)
                   return 0;
           }       
       }
       static int ep_send_events(struct eventpoll *ep,
                 struct epoll_event __user *events, int maxevents)
       {
           struct epitem *epi, *tmp;
           LIST_HEAD(txlist);
      
            // splice the ready list onto the local txlist here
           ep_start_scan(ep, &txlist);
      
           list_for_each_entry_safe(epi, tmp, &txlist, rdllink) {
               struct wakeup_source *ws;
               __poll_t revents;
      
               if (res >= maxevents)
                   break;
      
               list_del_init(&epi->rdllink);
      
               /*
               * If the event mask intersect the caller-requested one,
               * deliver the event to userspace. Again, we are holding ep->mtx,
               * so no operations coming from userspace can change the item.
               */
               revents = ep_item_poll(epi, &pt, 1);
               if (!revents)
                   continue;
      
               events = epoll_put_uevent(revents, epi->event.data, events);
               if (!events) {
                   list_add(&epi->rdllink, &txlist);
                   ep_pm_stay_awake(epi);
                   if (!res)
                       res = -EFAULT;
                   break;
               }
               res++;
               if (epi->event.events & EPOLLONESHOT)
                   epi->event.events &= EP_PRIVATE_BITS;
               else if (!(epi->event.events & EPOLLET)) {
                   /*
                   * If this file has been added with Level
                   * Trigger mode, we need to insert back inside
                   * the ready list, so that the next call to
                   * epoll_wait() will check again the events
                   * availability. At this point, no one can insert
                   * into ep->rdllist besides us. The epoll_ctl()
                   * callers are locked out by
                   * ep_scan_ready_list() holding "mtx" and the
                   * poll callback will queue them in ep->ovflist.
                   */
                   list_add_tail(&epi->rdllink, &ep->rdllist);
                   ep_pm_stay_awake(epi);
               }
           }
           ep_done_scan(ep, &txlist);
       }
      

      ep_send_events walks the spliced-out list and calls ep_item_poll on each node to fetch its events. As point 3 explained, ep_item_poll ends up in the socket's poll, i.e. sock_poll, and as point 2 showed, sock_poll returns the node's current event mask and calls poll_wait. Note the difference from the insert path: at insert time the poll_table carried the queue callback, whereas here _qproc is set to NULL, so nothing new gets registered. The events obtained for a node are written into the user-supplied events array. Finally, if the fd is not in ET mode, i.e. it is in LT mode, the node is put back onto the epoll instance's ready list, so the next call to epoll_wait checks its events again; if it has no events by then, it is simply skipped. This is what most articles mean when they say that under LT, data that has not been fully read keeps triggering read/write events on later epoll_wait calls.

      So the essential difference between LT and ET is this: after a node is taken off the ready list, LT puts it back if it still has events, which means the next epoll_wait traverses it again and only reports it to the user if events remain, so a socket read/write event is traversed by epoll_wait at least twice. ET does not put the node back after delivering the event; it only re-enters the ready list when a new read/write event on the socket fires ep_poll_callback again.
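
      What this means for user code: under LT one read per wakeup is enough, because a node that still has data gets re-queued and reported again, while under ET you must drain the fd until EAGAIN, because the node is only re-queued when a fresh event fires ep_poll_callback. A rough illustration (not skynet code; skynet's wrapper adds fds in the default level-triggered mode):

      #include <sys/epoll.h>
      #include <errno.h>
      #include <stddef.h>
      #include <unistd.h>

      // Illustrative only: how the reading side differs between LT and ET.
      static void
      handle_readable_lt(int fd, char *buf, size_t sz) {
          // LT: a single read is fine; if data remains, ep_send_events puts the
          // epitem back on rdllist and the next epoll_wait reports it again.
          read(fd, buf, sz);
      }

      static void
      handle_readable_et(int fd, char *buf, size_t sz) {
          // ET: the epitem is not re-queued, so drain until EAGAIN; otherwise
          // no further event arrives until new data triggers ep_poll_callback.
          for (;;) {
              ssize_t n = read(fd, buf, sz);
              if (n > 0)
                  continue;
              if (n < 0 && (errno == EAGAIN || errno == EWOULDBLOCK))
                  break;          // fully drained
              break;              // EOF or a real error: handled elsewhere
          }
      }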

    To sum up the whole flow: create an epoll instance; when a new socket is created and added, allocate an epitem node, insert it into the epoll instance's red-black tree and register the ep_poll_callback callback; when a read/write event occurs, the callback inserts the node into the epoll instance's ready list; epoll_wait walks the ready list and hands the read/write events to the user, and in LT mode puts the node back onto the ready list.
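
    And the user-space half of that last step, again as a paraphrased sketch of skynet's wrapper (not the verbatim source): epoll_wait fills an epoll_event array with whatever ep_send_events copied out, and skynet translates each entry back into its own event structure through the data.ptr stored at sp_add time.

      #include <sys/epoll.h>
      #include <stdbool.h>

      // Paraphrased sketch of skynet's event struct and sp_wait wrapper.
      struct event {
          void * s;       // the pointer stored in ev.data.ptr by sp_add
          bool read;
          bool write;
          bool error;
      };

      static int
      sp_wait(int efd, struct event *e, int max) {
          struct epoll_event ev[max];
          int n = epoll_wait(efd, ev, max, -1);   // blocks in ep_poll until the ready list is non-empty
          int i;
          for (i = 0; i < n; i++) {
              e[i].s = ev[i].data.ptr;
              unsigned flag = ev[i].events;
              e[i].read = (flag & EPOLLIN) != 0;
              e[i].write = (flag & EPOLLOUT) != 0;
              e[i].error = (flag & EPOLLERR) != 0;
          }
          return n;
      }

    That closes the loop: data arrives, ep_poll_callback queues the epitem, ep_send_events copies it out, and sp_wait hands it back to skynet's socket thread.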
