本文主要介绍Redis的事件驱动模型。
建议阅读:
1、Redis 事件的理论说明见:wenmingxing Redis之事件
I、事件驱动数据结构
Redis事件驱动内部有四个主要数据结构,分别是:事件循环结构体,文件事件结构体,时间事件结构体和触发事件结构体。
/*文件事件结构体*/
/* File event structure */
/* src/ae.h/aeFileEvent */
typedef struct aeFileEvent {
int mask; /*one of AE_(READABLE|WRITABLE) 表示是可读还是可写*/
// 回调函数指针
aeFileProc *rfileProc;
aeFileProc *wfileProc;
// clientData 参数一般是指向redisClient 的指针
void *clientData;
} aeFileEvent;
/* 时间事件结构体*/
/* Time event structure */
/* src/ae.h/aeTimeEvent*/
typedef struct aeTimeEvent {
long long id; /* time event identifier. */
long when_sec; /* 时间戳,seconds */
long when_ms; /* milliseconds */
// 定时回调函数指针
aeTimeProc *timeProc;
// 定时事件清理函数,当删除时间事件的时候会被调用
aeEventFinalizerProc *finalizerProc;
// clientData 参数一般是指向redisClient 的指针
void *clientData;
// 时间事件表采用链表来维护,但是会退化为指针操作
struct aeTimeEvent *next;
} aeTimeEvent;
/* 触发事件*/
/* A fired event */
/* src/ae.h/aeFiredEvent */
typedef struct aeFiredEvent {
int fd; //socket
int mask; //标记是读事件还是写事件
} aeFiredEvent;
/*事件循环结构体*/
/* State of an event based program */
/*src/ae.h/aeEventLoop*/
typedef struct aeEventLoop {
int maxfd; /* highest file descriptor currently registered */
int setsize; /* max number of file descriptors tracked,如select为1024 */
// 记录最大的定时事件id + 1
long long timeEventNextId;
// 用于系统时间的矫正
time_t lastTime; /* Used to detect system clock skew */
// 文件事件表
aeFileEvent *events; /* Registered events */
// 被触发的事件
aeFiredEvent *fired; /* Fired events */
// 时间事件表
aeTimeEvent *timeEventHead;
// 事件循环结束标识
int stop;
// 对于不同的I/O 多路复用技术,有不同的数据,详见各自实现
//根据参数不同调用不同的I/O多路复用库函数,有相同的API
void *apidata; /* This is used for polling API specific data */
// 新的循环前需要执行的操作
aeBeforeSleepProc *beforesleep;
} aeEventLoop;
事件循环结构体即维护文件事件表,被触发的时间表,时间时间表。
II、初始化事件循环 aeEventLoop
Redis的主函数中调用initServer()
函数从而初始化事件循环aeEventLoop,其主要是在aeCreateEventLoop()
中完成的:
/*事件循环初始化函数*/
/*src/ae.c/aeCreateEventLoop*/
aeEventLoop *aeCreateEventLoop(int setsize) {
aeEventLoop *eventLoop;
int i;
// 分配空间
if ((eventLoop = zmalloc(sizeof(*eventLoop))) == NULL) goto err;
// 分配文件事件结构体空间
eventLoop->events = zmalloc(sizeof(aeFileEvent)*setsize);
// 分配已触发事件结构体空间
eventLoop->fired = zmalloc(sizeof(aeFiredEvent)*setsize);
if (eventLoop->events == NULL || eventLoop->fired == NULL) goto err;
eventLoop->setsize = setsize;
eventLoop->lastTime = time(NULL);
// 时间事件链表头
eventLoop->timeEventHead = NULL;
// 其他初始化,后续会说明
eventLoop->timeEventNextId = 0;
eventLoop->stop = 0;
eventLoop->maxfd = -1;
// 进入事件循环前需要执行的操作,此项会在redis main() 函数中设置
eventLoop->beforesleep = NULL;
/* 根据需要调用的I/O多路复用函数初始化。
* 在这里,aeApiCreate() 函数对于每个IO 多路复用模型的实现都有不同,具体参见源代码,因为每种IO 多路复用模型的初始化都不同
*/
if (aeApiCreate(eventLoop) == -1) goto err;
/* Events with mask == AE_NONE are not set. So let's initialize the
* vector with it. */
// 初始化事件类型掩码为无事件状态
for (i = 0; i < setsize; i++)
eventLoop->events[i].mask = AE_NONE;
return eventLoop;
//对应于goto
err:
if (eventLoop) {
zfree(eventLoop->events);
zfree(eventLoop->fired);
zfree(eventLoop);
}
return NULL;
}
aeCreateEventLoop()
函数只是完成了最初步的初始化任务,并没有进一步注册事件。驱动事件循环,还需要完成以下工作。
III、文件事件注册
文件事件注册在aeCreateFileEvent()
函数中完成,aeCreateFileEvent()
根据socket的数值大小在aeEventLoop
结构体中读取这个空间,并利用所提供的I/O多路复用函数监听文件事件,并设置回调函数:
/*文件事件注册函数*/
/* 参数包括,事件循环结构体,socket描述符,读写事件mask,回调函数指针,以及clientData指针
*/
/* src/ae.c/aeCreateFileEvent */
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
aeFileProc *proc, void *clientData)
{
if (fd >= eventLoop->setsize) {
errno = ERANGE;
return AE_ERR;
}
// 在I/O 事件表中选择一个空间
aeFileEvent *fe = &eventLoop->events[fd];
// aeApiAddEvent() 只在此函数中调用,对于不同IO 多路复用实现,会有所不同
if (aeApiAddEvent(eventLoop, fd, mask) == -1)
return AE_ERR;
fe->mask |= mask;
// 设置回调函数
if (mask & AE_READABLE) fe->rfileProc = proc;
if (mask & AE_WRITABLE) fe->wfileProc = proc;
fe->clientData = clientData;
//更新maxfd
if (fd > eventLoop->maxfd)
eventLoop->maxfd = fd;
return AE_OK;
}
这里说明一下,根据函数参数的不同有多个I/O多路复用API,但封装了统一的接口。
IV、监听实现
1、initServer()
中完成了对事件循环的初始化操作,其还为监听工作做了准备:
/* initServer()中监听工作准备*/
/* Open the TCP listening socket for the user commands. */
// listenToPort() 中有调用listen()
/*src/redis.c/initServer*/
//TCP方式在listenToPort中调用listen及bind完成绑定和监听套接字创建
if (server.port != 0 &&
listenToPort(server.port,server.ipfd,&server.ipfd_count) == REDIS_ERR)
exit(1);
// UNIX 域套接字
/* Open the listening Unix domain socket. */
if (server.unixsocket != NULL) {
unlink(server.unixsocket); /* don't care if this fails */
server.sofd = anetUnixServer(server.neterr,server.unixsocket,
server.unixsocketperm);
if (server.sofd == ANET_ERR) {
redisLog(REDIS_WARNING, "Opening socket: %s", server.neterr);
exit(1);
}
}
2、完成了监听套接字的初始化,initServer()
还需要为所有监听套接字注册读事件,相应函数分别为acceptTcpHandler()
和acceptUnixHandlet()
:
// 为监听socket注册读事件
// TCP
/* Create an event handler for accepting new connections in TCP and Unix
* domain sockets. */
for (j = 0; j < server.ipfd_count; j++) {
if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,
acceptTcpHandler,NULL) == AE_ERR)
{
redisPanic(
"Unrecoverable error creating server.ipfd file event.");
}
}
// UNIX 域套接字
if (server.sofd > 0 && aeCreateFileEvent(server.el,server.sofd,AE_READABLE,
acceptUnixHandler,NULL) == AE_ERR)
redisPanic("Unrecoverable error creating server.sofd file event.");
对于acceptTcpHandler()
来说:
/*监听socket的TCP回调函数*/
/* src/networking.c/acceptTcpHandler */
void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
int cport, cfd;
char cip[REDIS_IP_STR_LEN];
REDIS_NOTUSED(el);
REDIS_NOTUSED(mask);
REDIS_NOTUSED(privdata);
// 接收客户端请求,这里封装了accept函数,得到已连接的套接字cfd
cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
// 出错
if (cfd == AE_ERR) {
redisLog(REDIS_WARNING,"Accepting client connection: %s", server.neterr);
return;
}
// 记录
redisLog(REDIS_VERBOSE,"Accepted %s:%d", cip, cport);
// 下面说明,很巧妙
acceptCommonHandler(cfd,0);
}
在与客户端成功建立连接之后,调用了acceptCommonHandler()
函数,其作用为:
1、 建立并保存服务器与客户端的连接信息,将信息保存到一个struct redisClient 中;
2、为客户端的cfd(已连接的socket)注册读事件,相应的回调函数为readQueryFromClient()
,其作用是从socket读取数据,执行相应操作,并回复给客户端(而acceptCommonHandler是为监听socket的读事件回调函数)。
V、事件循环
完成了上述工作,之后进入事件循环,在服务器main()
函数中对调用aeMain()
:
/*src/ae.c/aeMain*/
void aeMain(aeEventLoop *eventLoop) {
eventLoop->stop = 0;
while (!eventLoop->stop) {
// 进入事件循环可能会进入睡眠状态。在睡眠之前,执行预设置的函数
// aeSetBeforeSleepProc()。
if (eventLoop->beforesleep != NULL)
eventLoop->beforesleep(eventLoop);
// AE_ALL_EVENTS 表示处理所有的事件
aeProcessEvents(eventLoop, AE_ALL_EVENTS);
}
}
其事件循环发生在aeProcessEvents()
中:
1、根据时间事件链表计算需要等待的最短事件;
2、调用redis aeApiPoll()
进入监听轮询,如果没有事件发生就进入睡眠状态,其实就是进行I/O多路复用函数调用。
3、如果有事件发生,处理事件。
VI、事件触发
接下来以select
为例说明I/O多路复用api aeApiPoll()
函数的实现,其主要完成:
1、拷贝读写的fdset,select调用会破坏传入的fdset,所以需要备份。每次调用select()
之前都从备份中直接拷贝一份;
2、调用select()
;
3、被唤醒后,检查fdset中的每一个文件描述符,并将可读可写的描述符添加到触发表中。
/*I/O多路复用继承API*/
/*src/ae_epoll.c & ae_select.c & ae_kqueue.c & ae.evport.c*/
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
aeApiState *state = eventLoop->apidata;
int retval, j, numevents = 0;
/*
真有意思,在aeApiState 结构中:
typedef struct aeApiState {
fd_set rfds, wfds;
fd_set _rfds, _wfds;
} aeApiState;
在调用select() 的时候传入的是_rfds 和_wfds,所有监听的数据
在rfds 和wfds 中。
在下次需要调用selec() 的时候,会将rfds 和wfds 中的数据拷贝
进_rfds 和_wfds 中。*/
memcpy(&state->_rfds,&state->rfds,sizeof(fd_set));
memcpy(&state->_wfds,&state->wfds,sizeof(fd_set));
retval = select(eventLoop->maxfd+1,
&state->_rfds,&state->_wfds,NULL,tvp);
if (retval > 0) {
// 轮询
for (j = 0; j <= eventLoop->maxfd; j++) {
int mask = 0;
aeFileEvent *fe = &eventLoop->events[j];
if (fe->mask == AE_NONE) continue;
if (fe->mask & AE_READABLE && FD_ISSET(j,&state->_rfds))
mask |= AE_READABLE;
if (fe->mask & AE_WRITABLE && FD_ISSET(j,&state->_wfds))
mask |= AE_WRITABLE;
// 添加到触发事件表中
eventLoop->fired[numevents].fd = j;
eventLoop->fired[numevents].mask = mask;
numevents++;
}
}
return numevents;
}
VII、总结
总结来说,事件驱动完成了如下图操作:
【参考】
[1] 《Redis设计与实现》
[2] 《Redis源码日志》
网友评论