美文网首页
Muduo库设计(3)——EventLoop::runInLoo

Muduo库设计(3)——EventLoop::runInLoo

作者: Magic11 | 来源:发表于2019-11-28 14:49 被阅读0次

    RunInLoop函数允许其他线程往当前线程添加回调函数,并能够保证所添加的回调函数都在当前线程中执行

    typedef std::function<void()> Functor;
    void runInLoop(const Functor& cb);
    

    由于添加的回调函数并不能立即被执行,为了让runInLoop函数尽快返回,EventLoop对象需要持有一个函数数组,用于临时存储这些被添加的回调函数

    typedef std::function<void()> Functor;
    std::vector<Functor> pendingFunctors_;
    

    由于pendingFunctors_可以被其他线程访问,所以需要加锁保护
    runInLoop的执行逻辑如下:
    首先判断是否在当前线程,若在当前线程,直接执行该回调,否则将该回调加入pendingFunctors_数组,代码逻辑如下:

    void EventLoop::runInLoop(const Functor& cb)
    {
      if (isInLoopThread())
      {
        cb();
      }
      else
      {
        queueInLoop(cb);
      }
    }
    
    void EventLoop::queueInLoop(const Functor& cb)
    {
      {
        std::lock_guard<std::mutex> lock(mutex_);
        pendingFunctors_.push_back(cb);
      }
    
      if (!isInLoopThread() || callingPendingFunctors_)
      {
        wakeup();
      }
    }
    

    注意:当将回调函数加入回调队列之后,需要调用一下weakup函数;
    由于IO线程平时会阻塞在事件循环EventLoop::loop()对poll()的调用中,weakup的作用是将当前线程尽快从阻塞中唤醒。
    Wakeup的逻辑如下:
    首先EventLoop需要创建一个fd,并创建一个负责该fd的Channel,然后添加该channel的读事件回调,并一直检测该Channel的读事件,代码逻辑如下:

    EventLoop::EventLoop() 
        : wakeupFd_(createEventfd()),
        wakeupChannel_(new Channel(this, wakeupFd_))
    {
      wakeupChannel_->setReadCallback(std::bind(&EventLoop::handleRead, this));
      // 一直检查该fd是否可读
      wakeupChannel_->enableReading();
    }
    
    void EventLoop::wakeup()
    {
      uint64_t one = 1;
      ssize_t n = ::write(wakeupFd_, &one, sizeof one);
      if (n != sizeof one)
      {
        LOG_ERROR << "EventLoop::wakeup() writes " << n << " bytes instead of 8";
      }
    }
    
    void EventLoop::handleRead()
    {
      uint64_t one = 1;
      ssize_t n = ::read(wakeupFd_, &one, sizeof one);
      if (n != sizeof one)
      {
        LOG_ERROR << "EventLoop::handleRead() reads " << n << " bytes instead of 8";
      }
    }
    

    每次EventLoop执行完事件的抓取(poll)及事件的分发(handleEvent)之后,就会依次调用回调函数队列里的函数,代码逻辑如下:

    void EventLoop::loop()
    {
      while (!quit_)
      {
        activeChannels_.clear();
        pollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_);
        for (auto it = activeChannels_.begin(); it != activeChannels_.end(); ++it)
        {
          (*it)->handleEvent();
        }
        doPendingFunctors();    //执行回调队列里的函数
      }
    }
    

    执行回调函数的逻辑如下:

    void EventLoop::doPendingFunctors()
    {
      std::vector<Functor> functors;
      callingPendingFunctors_ = true;
    
      {
        std::lock_guard<std::mutex> lock(mutex_);
        functors.swap(pendingFunctors_);
      }
    
      for (size_t i = 0; i < functors.size(); ++i)
      {
        functors[i]();
      }
      callingPendingFunctors_ = false;
    }
    

    此处,为了避免在加锁的状态下对函数队列的访问时间过长,采取了一个优化,即先将函数队列swap到一个临时的vector里面,然后再依次遍历并执行临时vector里的回调函数。
    这样既减少了临界区的长度(意味着不会阻塞其他线程调用queueInLoop),也避免了死锁(因为在回调函数中还可能调用queueInLoop,在已加锁的状态下还要去申请锁)。

    相关文章

      网友评论

          本文标题:Muduo库设计(3)——EventLoop::runInLoo

          本文链接:https://www.haomeiwen.com/subject/ghdbwctx.html