RunInLoop函数允许其他线程往当前线程添加回调函数,并能够保证所添加的回调函数都在当前线程中执行
typedef std::function<void()> Functor;
void runInLoop(const Functor& cb);
由于添加的回调函数并不能立即被执行,为了让runInLoop函数尽快返回,EventLoop对象需要持有一个函数数组,用于临时存储这些被添加的回调函数
typedef std::function<void()> Functor;
std::vector<Functor> pendingFunctors_;
由于pendingFunctors_可以被其他线程访问,所以需要加锁保护
runInLoop的执行逻辑如下:
首先判断是否在当前线程,若在当前线程,直接执行该回调,否则将该回调加入pendingFunctors_数组,代码逻辑如下:
void EventLoop::runInLoop(const Functor& cb)
{
if (isInLoopThread())
{
cb();
}
else
{
queueInLoop(cb);
}
}
void EventLoop::queueInLoop(const Functor& cb)
{
{
std::lock_guard<std::mutex> lock(mutex_);
pendingFunctors_.push_back(cb);
}
if (!isInLoopThread() || callingPendingFunctors_)
{
wakeup();
}
}
注意:当将回调函数加入回调队列之后,需要调用一下weakup函数;
由于IO线程平时会阻塞在事件循环EventLoop::loop()对poll()的调用中,weakup的作用是将当前线程尽快从阻塞中唤醒。
Wakeup的逻辑如下:
首先EventLoop需要创建一个fd,并创建一个负责该fd的Channel,然后添加该channel的读事件回调,并一直检测该Channel的读事件,代码逻辑如下:
EventLoop::EventLoop()
: wakeupFd_(createEventfd()),
wakeupChannel_(new Channel(this, wakeupFd_))
{
wakeupChannel_->setReadCallback(std::bind(&EventLoop::handleRead, this));
// 一直检查该fd是否可读
wakeupChannel_->enableReading();
}
void EventLoop::wakeup()
{
uint64_t one = 1;
ssize_t n = ::write(wakeupFd_, &one, sizeof one);
if (n != sizeof one)
{
LOG_ERROR << "EventLoop::wakeup() writes " << n << " bytes instead of 8";
}
}
void EventLoop::handleRead()
{
uint64_t one = 1;
ssize_t n = ::read(wakeupFd_, &one, sizeof one);
if (n != sizeof one)
{
LOG_ERROR << "EventLoop::handleRead() reads " << n << " bytes instead of 8";
}
}
每次EventLoop执行完事件的抓取(poll)及事件的分发(handleEvent)之后,就会依次调用回调函数队列里的函数,代码逻辑如下:
void EventLoop::loop()
{
while (!quit_)
{
activeChannels_.clear();
pollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_);
for (auto it = activeChannels_.begin(); it != activeChannels_.end(); ++it)
{
(*it)->handleEvent();
}
doPendingFunctors(); //执行回调队列里的函数
}
}
执行回调函数的逻辑如下:
void EventLoop::doPendingFunctors()
{
std::vector<Functor> functors;
callingPendingFunctors_ = true;
{
std::lock_guard<std::mutex> lock(mutex_);
functors.swap(pendingFunctors_);
}
for (size_t i = 0; i < functors.size(); ++i)
{
functors[i]();
}
callingPendingFunctors_ = false;
}
此处,为了避免在加锁的状态下对函数队列的访问时间过长,采取了一个优化,即先将函数队列swap到一个临时的vector里面,然后再依次遍历并执行临时vector里的回调函数。
这样既减少了临界区的长度(意味着不会阻塞其他线程调用queueInLoop),也避免了死锁(因为在回调函数中还可能调用queueInLoop,在已加锁的状态下还要去申请锁)。
网友评论