一、IO multiplexing简介
首先介绍一下pollfd结构体及poll接口:
struct pollfd {
int fd; //想查询的文件描述符
short events; //要求查询的事件掩码(我们感兴趣的事件)
short revents; //返回的事件掩码(实际发生的事件)
};
//events_ |= POLLIN | POLLPRI; 表示监控是否可读。
//events_ |= POLLOUT; 表示监控是否可写。
//events_ |= POLLIN|POLLOUT; 表示监控是否可读或可写。
//revents_ & (POLLERR | POLLNVAL) 为true 表示出错
//revents_ & (POLLIN | POLLPRI | POLLRDHUP) 为true 表示可读
//revents_ & POLLOUT 为true 表示可写
int poll(struct pollfd *ufds, unsigned int nfds, int timeout);
//ufds 指向 struct pollfd 数组
//nfds 指定 pollfd 数组元素的个数,也就是要监测几个 pollfd
//timeout是等待的毫秒数,timeout为负数表示无线等待,timeout为0表示调用后立即返回。
//执行结果:为0表示超时前没有任何事件发生;
// -1表示失败;
// 成功则返回结构体中revents不为0的文件描述符个数。
二、Reator的关键结构
Reator的核心是事件分发机制,即将IO多路复用(IO multiplexing)拿到的IO事件分发给各个文件描述符(fd)的事件处理函数。
这里主要有三个关键的类Poller、EventLoop、Channel:
Poller类负责IO多路复用
EventLoop负责IO事件分发
Channel类负责管理一个文件描述符(fd),并存储该fd所关注的每个事件对应的回调函数
1、Poller类——只负责多路复用
Poller包含一个vector和一个map
其中vector用来存放pollfd, map用来存放 <fd, channel> 键值对
其中vector用于每次调用poll接口的参数
int numEvents = ::poll(pollfds_.data(), pollfds_.size(), timeoutMs);
vector和map更新的逻辑如下:
1) 当有一个新的channel进来时
a) 创建新的pollfd,从channel中取出fd,要查询的事件events,赋值给pollfd
b) 将新的pollfd插入vector的尾部,并将pollfd在vector中的位置赋值给 channel的index成员
c) 将键值对<fd, channel>插入map
2)已存在的channel
a) 取出channel的index成员变量的值
b) 根据index从vector中取出pollfd
c) 从channel中取出要查询的事件events,赋值给pollfd
具体的代码逻辑如下:
void Poller::updateChannel(Channel* channel)
{
assertInLoopThread(); //必须要在当前线程中执行
if (channel->index() < 0) { //channel的index默认为 -1
//新的channel对象
struct pollfd pfd;
pfd.fd = channel->fd();
pfd.events = static_cast<short>(channel->events()); //这里channel的events是一个整形 int events_;
pfd.revents = 0; //新插入的channel revents初始化为0
pollfds_.push_back(pfd); //将fd插入vector
int idx = static_cast<int>(pollfds_.size()) - 1;
channel->set_index(idx); //channel的index设置为fd在
channels_[pfd.fd] = channel; //将 <fd, channel> 插入map
} else {
//更新已存在的channel对象
int idx = channel->index(); //已存在的channel对象,其index值记录fd在vector中的位置
struct pollfd& pfd = pollfds_[idx]; //从vector中取出index位置的pfd
pfd.events = static_cast<short>(channel->events()); //更新要查询的事件
pfd.revents = 0; //revents初始化为0
if (channel->isNoneEvent()) {
// ignore this pollfd
pfd.fd = -1;
}
}
可以看出updateChannel函数必须要在当前线程中执行,当前线程也即创建EventLoop对象的线程。
这里还没看出 map 的作用,map其实是用于 每次调用poll之后,可以通过fd快速查询到channel对象,从而返还给EventLoop用于事件的分发,具体代码如下:
void Poller::fillActiveChannels(int numEvents, ChannelList* activeChannels) const
{
//遍历vector
for (auto pfd = pollfds_.begin(); pfd != pollfds_.end() && numEvents > 0; ++pfd)
{
if (pfd->revents > 0) //有要发生的事件
{
--numEvents;
auto ch = channels_.find(pfd->fd); //从map中取出channel
Channel* channel = ch->second;
channel->set_revents(pfd->revents); //更新的channel的revents成员
activeChannels->push_back(channel); //插入到activeChannels
}
}
}
2、EventLoop类——负责事件的轮询和分发
每个EventLoop类在创建时都会持有一个Poller对象
EventLoop::EventLoop()
: looping_(false),
quit_(false),
threadId_(std::this_thread::get_id()),
poller_(new Poller(this)),
{
}
EventLoop类负责事件的轮询和分发,具体代码如下:
void EventLoop::loop()
{
assertInLoopThread();
looping_ = true;
quit_ = false;
while (!quit_)
{
activeChannels_.clear();
pollReturnTime_ = poller_->poll(kPollTimeMs/*10000*/, &activeChannels_);
for (auto it = activeChannels_.begin(); it != activeChannels_.end(); ++it)
{
(*it)->handleEvent();
}
}
LOG_TRACE << "EventLoop " << this << " stop looping";
looping_ = false;
}
每次轮询结束后得到的是一个channel的数组,通过遍历该数组,依次调用每个channel的handleEvent函数。
3、Channel类——负责管理一个文件描述符(fd)及相应的事件回调函数
Channel类在创建的时候需要传入一个fd:
Channel::Channel(EventLoop* loop, int fd)
: loop_(loop),
fd_(fd),
events_(0),
revents_(0),
index_(-1)
{
}
这里可以看到Channel类另外三个关键的成员:events、revents、index, 它们的作用在介绍Poller类时已经说过了.
1)Channel如何更新事件(events):
void enableReading() { events_ |= kReadEvent; update(); } //关注是否可读
void enableWriting() { events_ |= kWriteEvent; update(); } //关注是否可写
void disableWriting() { events_ &= ~kWriteEvent; update(); } //不再关注是否可写
void disableAll() { events_ = kNoneEvent; update(); } //不关注任何事件
void Channel::update()
{
loop_->updateChannel(this);
}
- Channel如何处理事件的回调
首先Channel的调用者需要先注册回调:
typedef std::function<void()> EventCallback;
void setReadCallback(const EventCallback& cb)
{ readCallback_ = cb; }
void setWriteCallback(const EventCallback& cb)
{ writeCallback_ = cb; }
void setErrorCallback(const EventCallback& cb)
{ errorCallback_ = cb; }
其次EventLoop在每次事件分发时,对于有事件发生的Channel都会调用channel.handleEvent()方法
void EventLoop::loop()
{
while (!quit_)
{
activeChannels_.clear();
poller_->poll(kPollTimeMs, &activeChannels_);
for (auto it = activeChannels_.begin(); it != activeChannels_.end(); ++it)
{
(*it)->handleEvent();
}
}
}
void Channel::handleEvent()
{
if (revents_ & POLLNVAL) {
LOG_WARN << "Channel::handle_event() POLLNVAL";
}
if (revents_ & (POLLERR | POLLNVAL)) {
if (errorCallback_) errorCallback_();
}
if (revents_ & (POLLIN | POLLPRI | POLLRDHUP)) {
if (readCallback_) readCallback_();
}
if (revents_ & POLLOUT) {
if (writeCallback_) writeCallback_();
}
}
可以看出Channel事件的更新update() 和 事件的回调handleEvent()都是在当前线程中执行的。
下面看一个具体Channel的用法示例:
void timeout()
{
printf("Timeout!\n");
g_loop->quit();
}
int main()
{
muduo::EventLoop loop;
int timerfd = ::timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC);
muduo::Channel channel(&loop, timerfd);
channel.setReadCallback(timeout);
channel.enableReading();
struct itimerspec howlong;
bzero(&howlong, sizeof howlong);
howlong.it_value.tv_sec = 5;
::timerfd_settime(timerfd, 0, &howlong, NULL);
loop.loop();
::close(timerfd);
}
网友评论