美文网首页
Muduo库设计(2)——Reator的关键结构

Muduo库设计(2)——Reator的关键结构

作者: Magic11 | 来源:发表于2019-11-27 14:33 被阅读0次

    一、IO multiplexing简介

    首先介绍一下pollfd结构体及poll接口:

    struct pollfd {
     int fd;        //想查询的文件描述符
     short events;  //要求查询的事件掩码(我们感兴趣的事件)
     short revents; //返回的事件掩码(实际发生的事件)
    };
    
    //events_ |= POLLIN | POLLPRI;  表示监控是否可读。
    //events_ |= POLLOUT;           表示监控是否可写。
    //events_ |= POLLIN|POLLOUT;    表示监控是否可读或可写。
    
    //revents_ & (POLLERR | POLLNVAL)           为true  表示出错
    //revents_ & (POLLIN | POLLPRI | POLLRDHUP) 为true  表示可读
    //revents_ & POLLOUT                        为true  表示可写
    
    int poll(struct pollfd *ufds, unsigned int nfds, int timeout);
    
    //ufds 指向 struct pollfd 数组 
    //nfds 指定 pollfd 数组元素的个数,也就是要监测几个 pollfd
    //timeout是等待的毫秒数,timeout为负数表示无线等待,timeout为0表示调用后立即返回。
    
    //执行结果:为0表示超时前没有任何事件发生;
    //          -1表示失败;
    //          成功则返回结构体中revents不为0的文件描述符个数。
    

    二、Reator的关键结构

    Reator的核心是事件分发机制,即将IO多路复用(IO multiplexing)拿到的IO事件分发给各个文件描述符(fd)的事件处理函数。

    这里主要有三个关键的类Poller、EventLoop、Channel:
    Poller类负责IO多路复用
    EventLoop负责IO事件分发
    Channel类负责管理一个文件描述符(fd),并存储该fd所关注的每个事件对应的回调函数

    1、Poller类——只负责多路复用

    Poller包含一个vector和一个map
    其中vector用来存放pollfd, map用来存放 <fd, channel> 键值对

    其中vector用于每次调用poll接口的参数

    int numEvents = ::poll(pollfds_.data(), pollfds_.size(), timeoutMs);
    

    vector和map更新的逻辑如下:
    1) 当有一个新的channel进来时
    a) 创建新的pollfd,从channel中取出fd,要查询的事件events,赋值给pollfd
    b) 将新的pollfd插入vector的尾部,并将pollfd在vector中的位置赋值给 channel的index成员
    c) 将键值对<fd, channel>插入map
    2)已存在的channel
    a) 取出channel的index成员变量的值
    b) 根据index从vector中取出pollfd
    c) 从channel中取出要查询的事件events,赋值给pollfd
    具体的代码逻辑如下:

    void Poller::updateChannel(Channel* channel)
    {
      assertInLoopThread();                                 //必须要在当前线程中执行
      if (channel->index() < 0) {                           //channel的index默认为 -1
        //新的channel对象
        struct pollfd pfd;
        pfd.fd = channel->fd();
        pfd.events = static_cast<short>(channel->events()); //这里channel的events是一个整形   int events_;
        pfd.revents = 0;                                    //新插入的channel  revents初始化为0
        pollfds_.push_back(pfd);                            //将fd插入vector
        
        int idx = static_cast<int>(pollfds_.size()) - 1;  
        channel->set_index(idx);                            //channel的index设置为fd在
        channels_[pfd.fd] = channel;                        //将 <fd, channel> 插入map
      } else {
        //更新已存在的channel对象
        int idx = channel->index();                         //已存在的channel对象,其index值记录fd在vector中的位置
        struct pollfd& pfd = pollfds_[idx];                 //从vector中取出index位置的pfd
        pfd.events = static_cast<short>(channel->events()); //更新要查询的事件
        pfd.revents = 0;                                    //revents初始化为0
        if (channel->isNoneEvent()) {
          // ignore this pollfd
          pfd.fd = -1;
      }
    }
    

    可以看出updateChannel函数必须要在当前线程中执行,当前线程也即创建EventLoop对象的线程。
    这里还没看出 map 的作用,map其实是用于 每次调用poll之后,可以通过fd快速查询到channel对象,从而返还给EventLoop用于事件的分发,具体代码如下:

    void Poller::fillActiveChannels(int numEvents, ChannelList* activeChannels) const
    {
      //遍历vector
      for (auto pfd = pollfds_.begin(); pfd != pollfds_.end() && numEvents > 0; ++pfd)
      { 
        if (pfd->revents > 0)                    //有要发生的事件
        {
          --numEvents;
          auto ch = channels_.find(pfd->fd);     //从map中取出channel
          Channel* channel = ch->second;
          channel->set_revents(pfd->revents);    //更新的channel的revents成员
          activeChannels->push_back(channel);    //插入到activeChannels
        }
      }
    }
    
    2、EventLoop类——负责事件的轮询和分发

    每个EventLoop类在创建时都会持有一个Poller对象

    EventLoop::EventLoop()
      : looping_(false),
        quit_(false),
        threadId_(std::this_thread::get_id()),
        poller_(new Poller(this)),
    {
    }
    

    EventLoop类负责事件的轮询和分发,具体代码如下:

    void EventLoop::loop()
    {
      assertInLoopThread();
      looping_ = true;
      quit_ = false;
    
      while (!quit_)
      {
        activeChannels_.clear();
        pollReturnTime_ = poller_->poll(kPollTimeMs/*10000*/, &activeChannels_);
        for (auto it = activeChannels_.begin(); it != activeChannels_.end(); ++it)
        {
          (*it)->handleEvent();
        }
      }
    
      LOG_TRACE << "EventLoop " << this << " stop looping";
      looping_ = false;
    }
    

    每次轮询结束后得到的是一个channel的数组,通过遍历该数组,依次调用每个channel的handleEvent函数。

    3、Channel类——负责管理一个文件描述符(fd)及相应的事件回调函数

    Channel类在创建的时候需要传入一个fd:

    Channel::Channel(EventLoop* loop, int fd)
      : loop_(loop),
        fd_(fd),
        events_(0),
        revents_(0),
        index_(-1)
    {
    }
    

    这里可以看到Channel类另外三个关键的成员:events、revents、index, 它们的作用在介绍Poller类时已经说过了.
    1)Channel如何更新事件(events):

    void enableReading()  { events_ |= kReadEvent;   update();   }  //关注是否可读
    void enableWriting()  { events_ |= kWriteEvent;  update();   }  //关注是否可写
    void disableWriting() { events_ &= ~kWriteEvent; update();   }  //不再关注是否可写
    void disableAll()     { events_  = kNoneEvent;   update();   }  //不关注任何事件
    
    void Channel::update()
    {
      loop_->updateChannel(this);
    }
    
    1. Channel如何处理事件的回调
      首先Channel的调用者需要先注册回调:
    typedef std::function<void()> EventCallback;
    
    void setReadCallback(const EventCallback& cb)
    { readCallback_ = cb; }
    void setWriteCallback(const EventCallback& cb)
    { writeCallback_ = cb; }
    void setErrorCallback(const EventCallback& cb)
    { errorCallback_ = cb; }
    

    其次EventLoop在每次事件分发时,对于有事件发生的Channel都会调用channel.handleEvent()方法

    void EventLoop::loop()
    {
      while (!quit_)
      {
        activeChannels_.clear();
        poller_->poll(kPollTimeMs, &activeChannels_);
        for (auto it = activeChannels_.begin(); it != activeChannels_.end(); ++it)
        {
          (*it)->handleEvent();
        }
      }
    }
    
    void Channel::handleEvent()
    {
      if (revents_ & POLLNVAL) {
        LOG_WARN << "Channel::handle_event() POLLNVAL";
      }
    
      if (revents_ & (POLLERR | POLLNVAL)) {
        if (errorCallback_) errorCallback_();
      }
      if (revents_ & (POLLIN | POLLPRI | POLLRDHUP)) {
        if (readCallback_) readCallback_();
      }
      if (revents_ & POLLOUT) {
        if (writeCallback_) writeCallback_();
      }
    }
    

    可以看出Channel事件的更新update() 和 事件的回调handleEvent()都是在当前线程中执行的。
    下面看一个具体Channel的用法示例:

    void timeout()
    {
      printf("Timeout!\n");
      g_loop->quit();
    }
    
    int main()
    {
      muduo::EventLoop loop;
    
      int timerfd = ::timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC);
      
      muduo::Channel channel(&loop, timerfd);
      channel.setReadCallback(timeout);
      channel.enableReading();
    
      struct itimerspec howlong;
      bzero(&howlong, sizeof howlong);
      howlong.it_value.tv_sec = 5;
      ::timerfd_settime(timerfd, 0, &howlong, NULL);
    
      loop.loop();
    
      ::close(timerfd);
    }
    

    相关文章

      网友评论

          本文标题:Muduo库设计(2)——Reator的关键结构

          本文链接:https://www.haomeiwen.com/subject/kgjywctx.html