事件
Redis服务器是一个事件驱动程序,服务器主要处理时间事件与文件事件。其中,时间事件是指服务器中的一些操作比如(serverCron函数)在给定的时间点执行函数,文件事件就是指服务器通过套接字跟客户端通信,redis中client结构就是套接字的一个封装。
一、文件事件
Redis服务器采用了IO多路复用技术,实现高性能的监听。每来一个客户端连接或生成一个aeFileEvent,AE_READABLE表示此事件可读,即客户端发送命令到达服务器。AE_WRITABLE表示此事件可写,即服务器可以发送结果到客户端了。
二、时间事件
Redis目前的时间事件就是周期性处理,服务器将所有时间事件放到一个链表中,每次遍历链表,如果达到指定的时间点,那么就去处理事件。
三、结构与API
-
结构
#define AE_NONE 0 /*0 未注册事件 */ #define AE_READABLE 1 /*01可读. */ #define AE_WRITABLE 2 /*10 可写. */ #define AE_BARRIER 4 /*110 对于可写,不要触发事件如果读已经触发了,就是读的时候不要改动*/ #define AE_FILE_EVENTS 1 //文件事件 01 #define AE_TIME_EVENTS 2 //时间事件 10 #define AE_ALL_EVENTS (AE_FILE_EVENTS|AE_TIME_EVENTS) //11 #define AE_DONT_WAIT 4 //100 #define AE_CALL_AFTER_SLEEP 8 //1000 /* 文件事件结构 */ typedef struct aeFileEvent { int mask; /* one of AE_(READABLE|WRITABLE|BARRIER) */ aeFileProc *rfileProc;//读函数 aeFileProc *wfileProc;//写函数 void *clientData;//客户端数据 } aeFileEvent; /* 时间事件结构 */ typedef struct aeTimeEvent { long long id; /* 时间事件标识. */ long when_sec; /*秒 上一次被访问的时间*/ long when_ms; /*微秒*/ aeTimeProc *timeProc;//处理函数 aeEventFinalizerProc *finalizerProc;//结束处理函数 void *clientData;//客户端传入的数据 struct aeTimeEvent *prev;//前一个 struct aeTimeEvent *next;//后一个 所以是个链表 } aeTimeEvent; typedef struct aeFiredEvent { int fd;//描述符 int mask;// } aeFiredEvent;//就绪事件 typedef struct aeEventLoop { int maxfd; /*目前注册的最大的描述符*/ int setsize; /*文件描述符的数组大小*/ long long timeEventNextId; time_t lastTime; /* Used to detect system clock skew */ aeFileEvent *events; /*注册的事件组*/ aeFiredEvent *fired; /*触发事件组*/ aeTimeEvent *timeEventHead;//时间事件头结点 int stop;//事件的开关 void *apidata; /* This is used for polling API specific data */ aeBeforeSleepProc *beforesleep;//休眠之前的处理函数 aeBeforeSleepProc *aftersleep;//休眠之后的处理函数 } aeEventLoop;//事件循环的结构体
-
API
aeEventLoop *aeCreateEventLoop(int setsize);//创建并初始化事件循环结构体
void aeDeleteEventLoop(aeEventLoop *eventLoop);//析构 事件循环结构体
void aeStop(aeEventLoop *eventLoop);//停止事件循环
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
aeFileProc *proc, void *clientData);//创建文件事件
void aeDeleteFileEvent(aeEventLoop *eventLoop, int fd, int mask);//删除文件事件
int aeGetFileEvents(aeEventLoop *eventLoop, int fd);//获得文件事件
long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds,
aeTimeProc *proc, void *clientData,
aeEventFinalizerProc *finalizerProc);//创建时间事件
int aeDeleteTimeEvent(aeEventLoop *eventLoop, long long id);//删除时间事件
int aeProcessEvents(aeEventLoop *eventLoop, int flags);//处理所有事件
int aeWait(int fd, int mask, long long milliseconds);//等待
void aeMain(aeEventLoop *eventLoop);//循环等待处理aeProcessEvents函数
char *aeGetApiName(void);//返回 "select"
void aeSetBeforeSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *beforesleep);//设置休眠之前的处理函数
void aeSetAfterSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *aftersleep);//设置休眠之后的处理函数
int aeGetSetSize(aeEventLoop *eventLoop);//get setsize
int aeResizeSetSize(aeEventLoop *eventLoop, int setsize);//reallcoc fileevent set
-
重要API解析
- aeMain是redis服务器进程的循环函数
void aeMain(aeEventLoop *eventLoop) { eventLoop->stop = 0; while (!eventLoop->stop) {//死循环 if (eventLoop->beforesleep != NULL) eventLoop->beforesleep(eventLoop); aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP); } }
- aeProcessEvents
/*处理每个正在进行的时间事件,文件事件。没有特殊标志的函数休眠直到一些文件事件被触发,或者下一个时间事件发生
*flags=0 啥也不做
*flags=AE_ALL_EVENTS 处理所有事件
*flags=AE_FILE_EVENTS 处理所有文件事件
*flags=AE_TIME_EVENTS 处理所有时间事件
*flags=AE_DONT_WAIT 返回ASAP 直到所有不需要被等待的可能被处理的事件都被处理
*flags=AE_CALL_AFTER_SLEEP 调用休眠后函数*/
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
int processed = 0, numevents;
/*如果不是时间事件也不是文件事件返回ASAP */
if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;
//如果maxfd!=-1表示有事件 或者 需要处理时间事件且不处理不等待事件
if (eventLoop->maxfd != -1 ||
((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
int j;aeTimeEvent *shortest = NULL;
struct timeval tv, *tvp;
//如果flags是指需要处理时间事件且不处理不等待事件,寻找时间参数最小的时间事件
if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))
shortest = aeSearchNearestTimer(eventLoop);
if (shortest) {//只要存在时间事件 就会走到这步
long now_sec, now_ms;
aeGetTime(&now_sec, &now_ms);//得到现在的时间
tvp = &tv;
/* 下次事件触发需要多少微秒 */
long long ms=(shortest->when_sec - now_sec)*1000 +
shortest->when_ms - now_ms;
if (ms > 0) {
tvp->tv_sec = ms/1000;
tvp->tv_usec = (ms % 1000)*1000;
} else {
tvp->tv_sec = 0;
tvp->tv_usec = 0;
}
} else {
if (flags & AE_DONT_WAIT) {
tv.tv_sec = tv.tv_usec = 0;
tvp = &tv;
} else {
/* Otherwise we can block */
tvp = NULL; /* wait forever */
}
}
numevents = aeApiPoll(eventLoop, tvp);//多路复用API 可读 可写结果都出来了
/* 调用休眠之后的回调函数*/
if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP)
eventLoop->aftersleep(eventLoop);
for (j = 0; j < numevents; j++) {
aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
int mask = eventLoop->fired[j].mask;
int fd = eventLoop->fired[j].fd;
int fired = 0; /* 当前fd触发的文件事件的个数 */
/*
* 设置AE_BARRIER 读-写
* 未设置AE_BARRIER 写-读
* */
int invert = fe->mask & AE_BARRIER;
//如果没有设置AE_BARRIER 且可读 处理读事件
if (!invert && fe->mask & mask & AE_READABLE) {
fe->rfileProc(eventLoop,fd,fe->clientData,mask);
fired++;
}
//处理写
if (fe->mask & mask & AE_WRITABLE) {
if (!fired || fe->wfileProc != fe->rfileProc) {
fe->wfileProc(eventLoop,fd,fe->clientData,mask);
fired++;
}
}
//设置了AE_BARRIER 最后处理读
if (invert && fe->mask & mask & AE_READABLE) {
if (!fired || fe->wfileProc != fe->rfileProc) {
fe->rfileProc(eventLoop,fd,fe->clientData,mask);
fired++;
}
}
processed++;
}
}
/* 如果需要处理时间事件*/
if (flags & AE_TIME_EVENTS)
processed += processTimeEvents(eventLoop);//处理函数,返回被处理的时间事件的个数
return processed; /* return the number of processed file/time events */
}
3. processTimeEvents
/* 处理时间事件*/
static int processTimeEvents(aeEventLoop *eventLoop) {
int processed = 0;
aeTimeEvent *te;
long long maxId;
time_t now = time(NULL);
if (now < eventLoop->lastTime) {//防止系统时间被往前调了,重新设置时间数据
te = eventLoop->timeEventHead;
while(te) {
te->when_sec = 0;
te = te->next;
}
}
eventLoop->lastTime = now;//最近一次被处理时间
te = eventLoop->timeEventHead;
maxId = eventLoop->timeEventNextId-1;
while(te) { //一个个的处理
long now_sec, now_ms;
long long id;
/* id==-1的话 是要被删除的 */
if (te->id == AE_DELETED_EVENT_ID) {
aeTimeEvent *next = te->next;
if (te->prev)
te->prev->next = te->next;
else
eventLoop->timeEventHead = te->next;
if (te->next)
te->next->prev = te->prev;
if (te->finalizerProc)
te->finalizerProc(eventLoop, te->clientData);
zfree(te);
te = next;
continue;
}
//再这次循环中我们不处理刚触发的时间事件
if (te->id > maxId) {
te = te->next;
continue;
}
aeGetTime(&now_sec, &now_ms);
//现在时间大于预设的时间
if (now_sec > te->when_sec ||
(now_sec == te->when_sec && now_ms >= te->when_ms))
{
int retval;
id = te->id;
retval = te->timeProc(eventLoop, id, te->clientData);//时间处理函数
processed++;
if (retval != AE_NOMORE) {//周期事件
aeAddMillisecondsToNow(retval,&te->when_sec,&te->when_ms);
} else {//定时事件 处理完就删除
te->id = AE_DELETED_EVENT_ID;
}
}
te = te->next;
}
return processed;
}
网友评论