美文网首页
从逻辑上扩展Epoll的事件列表

从逻辑上扩展Epoll的事件列表

作者: 萧然AND沐橦 | 来源:发表于2018-11-26 00:37 被阅读24次

    环境:X86_64 CentOS 7.0
    对于Epoll自身支持的事件类型有如下几种,详细可以man 2 epoll_ctl

    • EPOLLIN
    • EPOLLOUT
    • EPOLLERR
    • EPOLLET
    • EPOLLONESHUT
    • EPOLLHUP 没用过
    • EPOLLRDHUP 没用过
    • EPOLLPRI 没用过
    • EPOLLWAKEUP 没用过
    • EPOLLEXCLUSIVE 没用过

    以上是Epoll支持的事件类型,鶸只用过前几个,没有用过的也不敢多言。这些事件对于一般的场景使用应当已足以。

    但是对于需要扩充事件类型时,比如:

    1. 需要优雅退出的时候,如何让epoll_wait知道现在程序需要退出了。
    2. 有数据包需要发送的时候,如何通知epoll_wait 让其尽快唤醒网络线程,去处理网络包。
    3. ... ... 等等

    上述两条是鶸用到的场景,是学习了Tars的网络层处理之后,在自己的项目中使用到的。(当然实现方式很多,这里只是其中一种粗浅的方式)

    众所周知,epoll维护了一个红黑树,来快速查找到有数据到来的fd对应的epoll_event结构体,从而copy给用户态,供用户详细地处理。epoll_event的结构如下:

    struct epoll_event {
      uint32_t   events;         /* Epoll events */
      epoll_data_t data ;      /* User data variable */
    }
    
    typedef union epoll_data {
      void *     ptr ;
      int        fd ;
      uint32_t   u32;
      uint64_t   u64;
    } epoll_data_t ;
    

    从这里可以看出来,epoll自己的Event类型,最终会赋值给events变量。给用户留的只有epoll_data。但epoll_data是一个union,只能使用其中一个字段。我们大多数时间使用fd,用来知道具体是哪一个fd产生了事件。但这不能够用,比如我们还要记录一些这个fd上的上下文信息, 举个例子,假设我们使用ET模式,上一次收包可能只收到一部分,这时候就需要暂时将这部分数据暂存起来,并且记录一下已经收到的长度等等信息。

    这时候就可以有多种做法了,可以使用map来将fd作为key,通过每次索引,找到fd对应的上下文结构体,也是没有问题的。

    也可以使用epoll_data的ptr字段,将详细的fd信息、上下文信息保存在一个动态申请的结构体中,将指针赋值给ptr就可以。只需要维护好这个指针的生存期就可以了。

    鶸就使用了类似于类似于第二种的做法。只不过是自己生成了一个连接ID,保存在了epoll_data_t 的低32位中。

    关键就在这里,epoll_data_t是一个union,union的特点是,其大小是最长的一个元素的大小。也就是说,不管你给epoll_data_t里面存了什么,哪怕是只写了一个bit,那它也占8个字节。

    我们就借助这个特点,将epoll_data_t的高32位也利用起来。

      enum EVENT{
          EV_NET = 0,
          EV_LISTEN,
          EV_CLOSE,
          EV_SEND,
          EV_TERMINATED
      };
    
    
      template<typename T, typename DataQueue>
      void NetThread<T,DataQueue>::start_run() noexcept {
          while ( !terminated.load(std::memory_order_acquire) ) {
              int evs = epoll_instance.start_epoll_wait(-1) ;
              if ( evs <= 0 ) {
                  continue ;
              }
              for( size_t i = 0 ; i < evs ; i++ ) {
                  const epoll_ev& ev = epoll_instance.get_event(i) ;
                  uint32_t ev_type = ev.data.u64 >> 32 ;
                  switch (ev_type) {
                      case EV_LISTEN:
                          if ( ev.events & EPOLLIN ) {
                              LOG_INFO("recv EV_LISTEN event\n");
                              processListenfd() ;
                          }
                          break ;
                      case EV_NET:
                          processNet(ev) ;
                          break ;
                          // process close 
                      case EV_CLOSE:
                          // maybe will not occur
                          processClose(ev) ;
                          break ;
                      case EV_TERMINATED:
                          // terminated all net thread
                          break ;
                      case EV_SEND:
                          // send packages ~
                          processSend() ;
                          break ;
                  }
              }
          }
      }
    

    这样,我们就借助了epoll_data_t的高32位,从逻辑上扩展了Epoll的事件列表。

    乍一看有些二,写的是啥玩意儿。稍作解释一下:

    前面也说了,鶸用epoll_data_t的低32位记录了一个客户端的连接ID。

    默认情况下,EV_NET,EV_CLOSE,EV_TERMINATED,EV_SEND是都不会触发的。这些只有在fd需要这些事件的时候,通过epoll_ctl,注册EPOLLOUT事件的同时,将EV_NET,EV_CLOSE,EV_TERMINATED,EV_SEND写入到epoll_data_t的高32位。

    举个例子,当需要通知网络线程发送数据的时候,就可以如此处理(这里的sendfd是一个单独的fd,只用socket创建出来,让其可以加入到epoll中即可。)

      #define H64(x)              ((long)(x)<<32)
    
              // for handler use set response data
              virtual bool add_send_data(DataPkg& datapkg) override {
                  if ( curr_send_queue_size.load(std::memory_order_acquire) > max_send_size ) {
                      LOG_ERROR("send queue is full ...") ;
                      return false ;
                  }
                  // add to send queue
                  {
                      std::lock_guard<std::mutex> l{lock} ;
                      if ( send_queue.size() > max_send_size) {
                          LOG_ERROR("send queue is full ...") ;
                          return false ;
                      }
                      send_queue.emplace_back(datapkg) ;
                      curr_send_queue_size++;
                  }
                  // add EPOLLOUT event of this fd
                  epoll_instance.mod(sendfd , H64(EV_SEND), EPOLLOUT|EPOLLET) ;
                  return true ;
              }
    

    给sendfd注册EPOLLOUT事件:

              void ctrl(int fd , uint64_t data , uint32_t events , int op) noexcept {
                  epoll_ev ev ;
                  ev.data.u64 = data ;
                  ev.events = events ;
                  ::epoll_ctl(epollfd , op , fd , &ev) ;
              }
              void add_fd(int fd ,uint64_t data,uint32_t ev , bool nonblock) noexcept {
                  if ( nonblock) {
                      set_nonblocking(fd) ;
                  }
                  ctrl(fd , data , ev, EPOLL_CTL_ADD) ;
              }
              void mod(int fd , uint64_t data , uint32_t ev) noexcept {
                  ctrl(fd , data ,ev, EPOLL_CTL_MOD) ;
              }
    

    这样,epoll_wait 醒过来之后,就可以通过EV_SEND事件来处理发送队列的数据。而sendfd在这里只是扮演了一个给epoll来监听的媒介。真正用到的是epoll_data_t的高32位。

    萧然
    2018-11-26 00:35

    相关文章

      网友评论

          本文标题:从逻辑上扩展Epoll的事件列表

          本文链接:https://www.haomeiwen.com/subject/actvqqtx.html