本篇我们来讲Redis的事件循环,Redis的事件循环会根据系统选择evport、epoll、kqueue或select来进行IO多路复用,我们这里只分析epoll。
Redis中的IO多路复用
首先我们来看一下Redis的IO多路复用对事件循环(aeEventLoop)提供的接口。
static int aeApiCreate(aeEventLoop *eventLoop)
static int aeApiResize(aeEventLoop *eventLoop, int setsize)
static void aeApiFree(aeEventLoop *eventLoop)
static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask)
static void aeApiDelEvent(aeEventLoop *eventLoop, int fd, int delmask)
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp)
以epoll(ae_epoll.c)为例,先来看一下Redis的IO多路复用的使用过程:
首先需要创建,即调用aeApiCreate:
static int aeApiCreate(aeEventLoop *eventLoop) {
aeApiState *state = zmalloc(sizeof(aeApiState));
if (!state) return -1;
state->events = zmalloc(sizeof(struct epoll_event)*eventLoop->setsize);
if (!state->events) {
zfree(state);
return -1;
}
state->epfd = epoll_create(1024);
if (state->epfd == -1) {
zfree(state->events);
zfree(state);
return -1;
}
eventLoop->apidata = state;
return 0;
}
aeApiState结构体有两个成员,events和epfd。events用于存储就绪的epoll事件,epfd存储epoll的文件描述符。aeApiCreate的主要逻辑是为aeApiState分配存储空间,调用epoll_create系统调用创建epoll并获取描述符,最后将aeApiState赋值给aeEventLoop的apidata。
然后在有新的文件描述符(比如接受了一个新连接)需要加入到epoll中时,调用aeApiAddEvent:
static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
aeApiState *state = eventLoop->apidata;
struct epoll_event ee = {0};
int op = eventLoop->events[fd].mask == AE_NONE ? EPOLL_CTL_ADD : EPOLL_CTL_MOD;
ee.events = 0;
mask |= eventLoop->events[fd].mask;
if (mask & AE_READABLE) ee.events |= EPOLLIN;
if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;
ee.data.fd = fd;
if (epoll_ctl(state->epfd,op,fd,&ee) == -1) return -1;
return 0;
}
参数fd是需要监视的文件描述符,mask标明是需要监视可读还是可写事件。aeApiAddEvent的主要逻辑是调用系统调用epoll_ctl注册或修改添加事件的监听类型到epoll。
然后在有文件描述符失效或者需要修改监听类型时,调用aeApiDelEvent:
static void aeApiDelEvent(aeEventLoop *eventLoop, int fd, int delmask) {
aeApiState *state = eventLoop->apidata;
struct epoll_event ee = {0};
int mask = eventLoop->events[fd].mask & (~delmask);
ee.events = 0;
if (mask & AE_READABLE) ee.events |= EPOLLIN;
if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;
ee.data.fd = fd;
if (mask != AE_NONE) {
epoll_ctl(state->epfd,EPOLL_CTL_MOD,fd,&ee);
} else {
epoll_ctl(state->epfd,EPOLL_CTL_DEL,fd,&ee);
}
}
参数fd是需要删除的文件描述符,mask标明是需要删除可读还是可写事件。aeApiDelEvent主要逻辑是调用系统调用epoll_ctl删除或修改删除事件的监听类型到epoll。
然后需要检查是否有就绪的事件,调用aeApiPoll:
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
aeApiState *state = eventLoop->apidata;
int retval, numevents = 0;
retval = epoll_wait(state->epfd,state->events,eventLoop->setsize,tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1);
if (retval > 0) {
int j;
numevents = retval;
for (j = 0; j < numevents; j++) {
int mask = 0;
struct epoll_event *e = state->events+j;
if (e->events & EPOLLIN) mask |= AE_READABLE;
if (e->events & EPOLLOUT) mask |= AE_WRITABLE;
if (e->events & EPOLLERR) mask |= AE_WRITABLE|AE_READABLE;
if (e->events & EPOLLHUP) mask |= AE_WRITABLE|AE_READABLE;
eventLoop->fired[j].fd = e->data.fd;
eventLoop->fired[j].mask = mask;
}
}
return numevents;
}
tvp是等待时间,一般而言,这个值是0(不是NULL)代表没有就绪事件立即返回。主要逻辑是调用系统调用epoll_wait拿到就绪事件保存到events中,然后将events中的就绪事件复制到事件循环aeEventLoop的fired中,最后返回就绪事件的数量。
Redis中事件循环(主循环)
我们来分析一下Redis的事件循环(ae.c)。
先看主要接口:
aeEventLoop *aeCreateEventLoop(int setsize)
void aeDeleteEventLoop(aeEventLoop *eventLoop)
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask, aeFileProc *proc, void *clientData)
void aeDeleteFileEvent(aeEventLoop *eventLoop, int fd, int mask)
long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds, aeTimeProc *proc, void *clientData, aeEventFinalizerProc *finalizerProc)
int aeDeleteTimeEvent(aeEventLoop *eventLoop, long long id)
void aeSetBeforeSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *beforesleep)
void aeSetAfterSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *aftersleep)
void aeMain(aeEventLoop *eventLoop)
创建过程:
aeEventLoop *aeCreateEventLoop(int setsize) {
aeEventLoop *eventLoop;
int i;
if ((eventLoop = zmalloc(sizeof(*eventLoop))) == NULL) goto err;
eventLoop->events = zmalloc(sizeof(aeFileEvent)*setsize);
eventLoop->fired = zmalloc(sizeof(aeFiredEvent)*setsize);
if (eventLoop->events == NULL || eventLoop->fired == NULL) goto err;
eventLoop->setsize = setsize;
eventLoop->lastTime = time(NULL);
eventLoop->timeEventHead = NULL;
eventLoop->timeEventNextId = 0;
eventLoop->stop = 0;
eventLoop->maxfd = -1;
eventLoop->beforesleep = NULL;
eventLoop->aftersleep = NULL;
eventLoop->flags = 0;
if (aeApiCreate(eventLoop) == -1) goto err;
for (i = 0; i < setsize; i++)
eventLoop->events[i].mask = AE_NONE;
return eventLoop;
err:
if (eventLoop) {
zfree(eventLoop->events);
zfree(eventLoop->fired);
zfree(eventLoop);
}
return NULL;
}
初始化aeEventLoop和aeApiState并返回aeEventLoop。
注册文件事件:
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask, aeFileProc *proc, void *clientData)
{
if (fd >= eventLoop->setsize) {
errno = ERANGE;
return AE_ERR;
}
aeFileEvent *fe = &eventLoop->events[fd];
if (aeApiAddEvent(eventLoop, fd, mask) == -1)
return AE_ERR;
fe->mask |= mask;
if (mask & AE_READABLE) fe->rfileProc = proc;
if (mask & AE_WRITABLE) fe->wfileProc = proc;
fe->clientData = clientData;
if (fd > eventLoop->maxfd)
eventLoop->maxfd = fd;
return AE_OK;
}
存储到events中并调用aeApiAddEvent注册到epoll中。
注册定时事件:
long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds, aeTimeProc *proc, void *clientData, aeEventFinalizerProc *finalizerProc)
{
long long id = eventLoop->timeEventNextId++;
aeTimeEvent *te;
te = zmalloc(sizeof(*te));
if (te == NULL) return AE_ERR;
te->id = id;
aeAddMillisecondsToNow(milliseconds,&te->when_sec,&te->when_ms);
te->timeProc = proc;
te->finalizerProc = finalizerProc;
te->clientData = clientData;
te->prev = NULL;
te->next = eventLoop->timeEventHead;
te->refcount = 0;
if (te->next)
te->next->prev = te;
eventLoop->timeEventHead = te;
return id;
}
创建aeTimeEvent并将其插入到定时事件链表的头部。
主循环:
void aeMain(aeEventLoop *eventLoop) {
eventLoop->stop = 0;
while (!eventLoop->stop) {
aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_BEFORE_SLEEP|AE_CALL_AFTER_SLEEP);
}
}
不停的调用aeProcessEvents拉取并处理事件。
接下来看aeProcessEvents的逻辑:
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
int processed = 0, numevents;
if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;
if (eventLoop->maxfd != -1 ||((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
int j;
aeTimeEvent *shortest = NULL;
struct timeval tv, *tvp;
if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))
shortest = aeSearchNearestTimer(eventLoop);
if (shortest) {
long now_sec, now_ms;
aeGetTime(&now_sec, &now_ms);
tvp = &tv;
long long ms = (shortest->when_sec - now_sec)*1000 + shortest->when_ms - now_ms;
if (ms > 0) {
tvp->tv_sec = ms/1000;
tvp->tv_usec = (ms % 1000)*1000;
} else {
tvp->tv_sec = 0;
tvp->tv_usec = 0;
}
} else {
if (flags & AE_DONT_WAIT) {
tv.tv_sec = tv.tv_usec = 0;
tvp = &tv;
} else {
tvp = NULL;
}
}
if (eventLoop->flags & AE_DONT_WAIT) {
tv.tv_sec = tv.tv_usec = 0;
tvp = &tv;
}
if (eventLoop->beforesleep != NULL && flags & AE_CALL_BEFORE_SLEEP)
eventLoop->beforesleep(eventLoop);
numevents = aeApiPoll(eventLoop, tvp);
if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP)
eventLoop->aftersleep(eventLoop);
for (j = 0; j < numevents; j++) {
aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
int mask = eventLoop->fired[j].mask;
int fd = eventLoop->fired[j].fd;
int fired = 0;
int invert = fe->mask & AE_BARRIER;
if (!invert && fe->mask & mask & AE_READABLE) {
fe->rfileProc(eventLoop,fd,fe->clientData,mask);
fired++;
fe = &eventLoop->events[fd];
}
if (fe->mask & mask & AE_WRITABLE) {
if (!fired || fe->wfileProc != fe->rfileProc) {
fe->wfileProc(eventLoop,fd,fe->clientData,mask);
fired++;
}
}
if (invert) {
fe = &eventLoop->events[fd];
if ((fe->mask & mask & AE_READABLE) && (!fired || fe->wfileProc != fe->rfileProc))
{
fe->rfileProc(eventLoop,fd,fe->clientData,mask);
fired++;
}
}
processed++;
}
}
if (flags & AE_TIME_EVENTS)
processed += processTimeEvents(eventLoop);
return processed;
}
- 设置了AE_TIME_EVENTS(主循环设置了)标志但未设置AE_DONT_WAIT(主循环未设置)标志的情况下,找到最近的定时事件距离现在的毫秒数,作为aeApiPoll的超时时间。如果AE_TIME_EVENTS和AE_DONT_WAIT标志未设置或者AE_TIME_EVENTS设置了但AE_DONT_WAIT标志未设置且没有定时任务,则超时时间为无穷。AE_DONT_WAIT决定超时时间是否为0,也就是是否等待。AE_TIME_EVENTS决定是否触发定时事件。
- 调用beforesleep钩子函数。
- 调用aeApiPoll拉取就绪事件。
- 调用aftersleep钩子函数。
- 调用文件描述符对应的事件的回调函数处理就绪事件,AE_BARRIER决定是先处理可读事件还是可写事件。
- 处理定时事件。
再看一下定时事件的触发,也就是processTimeEvents的逻辑:
static int processTimeEvents(aeEventLoop *eventLoop) {
int processed = 0;
aeTimeEvent *te;
long long maxId;
time_t now = time(NULL);
if (now < eventLoop->lastTime) {
te = eventLoop->timeEventHead;
while(te) {
te->when_sec = 0;
te = te->next;
}
}
eventLoop->lastTime = now;
te = eventLoop->timeEventHead;
maxId = eventLoop->timeEventNextId-1;
while(te) {
long now_sec, now_ms;
long long id;
if (te->id == AE_DELETED_EVENT_ID) {
aeTimeEvent *next = te->next;
if (te->refcount) {
te = next;
continue;
}
if (te->prev)
te->prev->next = te->next;
else
eventLoop->timeEventHead = te->next;
if (te->next)
te->next->prev = te->prev;
if (te->finalizerProc)
te->finalizerProc(eventLoop, te->clientData);
zfree(te);
te = next;
continue;
}
if (te->id > maxId) {
te = te->next;
continue;
}
aeGetTime(&now_sec, &now_ms);
if (now_sec > te->when_sec || (now_sec == te->when_sec && now_ms >= te->when_ms))
{
int retval;
id = te->id;
te->refcount++;
retval = te->timeProc(eventLoop, id, te->clientData);
te->refcount--;
processed++;
if (retval != AE_NOMORE) {
aeAddMillisecondsToNow(retval,&te->when_sec,&te->when_ms);
} else {
te->id = AE_DELETED_EVENT_ID;
}
}
te = te->next;
}
return processed;
}
遍历注册的定时事件,找出到期的事件并调用处理函数,如果处理函数返回了下次执行的时间,则更新下次触发的时间,否则删除该事件。
网友评论