Redis深度历险-IO模型
Redis是单进程单线程实现的服务器,网络并发还是值得学习一下的,本文在Redis6.2.5的代码上进行分析,这部分的代码主要放在
ae
开头的源码文件当中
主线程
跨平台宏定义
//ae.c
#ifdef HAVE_EVPORT
#include "ae_evport.c"
#else
#ifdef HAVE_EPOLL
#include "ae_epoll.c"
#else
#ifdef HAVE_KQUEUE
#include "ae_kqueue.c"
#else
#include "ae_select.c"
#endif
#endif
#endif
在ae.c
中根据宏定义来区分使用不同的多路复用机制,这里面主要是因为不同平台的多路复用机制是不一样的
具体多路复用的实现则是在ae_epoll.c
等文件中
事件主循环
//ae.c
void aeMain(aeEventLoop *eventLoop) {
eventLoop->stop = 0;
while (!eventLoop->stop) {
aeProcessEvents(eventLoop, AE_ALL_EVENTS|
AE_CALL_BEFORE_SLEEP|
AE_CALL_AFTER_SLEEP);
}
}
AE_ALL_EVENTS //所有类型事件,主要是IO和定时器
AE_FILE_EVENTS //IO事件
AE_TIME_EVENTS //定时器
redis
核心主要是处理IO和定时任务两种的事件,核心就是一个循环不断的调用epoll_wait
、select
等函数等待事件
定时任务
定时任务结构体
//ae.h
typedef struct aeTimeEvent {
long long id; //定时任务的ID
monotime when; //定时任务的时间
aeTimeProc *timeProc; //定时任务执行的回调函数
aeEventFinalizerProc *finalizerProc; //定时任务终结时的回调函数,被删除时执行
void *clientData; //回调参数
struct aeTimeEvent *prev; //双向链表前节点
struct aeTimeEvent *next; //双向链表后节点
int refcount; //引用计数
} aeTimeEvent;
在Redis中的定时任务是通过链表的形式存储的,可以指定一个回调函数和回调函数的参数
等待时间
//ae.c
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
int processed = 0, numevents;
if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;
//即使当前没有IO任务要执行,同样进入此判断因为可以通过多路复用实现sleep的效果来处理定时任务
if (eventLoop->maxfd != -1 ||
((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
int j;
struct timeval tv, *tvp;
int64_t usUntilTimer = -1;
//计算当前最快的一个定时任务的时间
if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))
usUntilTimer = usUntilEarliestTimer(eventLoop);
if (usUntilTimer >= 0) {
tv.tv_sec = usUntilTimer / 1000000;
tv.tv_usec = usUntilTimer % 1000000;
tvp = &tv;
} else {
if (flags & AE_DONT_WAIT) {
tv.tv_sec = tv.tv_usec = 0;
tvp = &tv;
} else {
tvp = NULL;
}
}
if (eventLoop->flags & AE_DONT_WAIT) {
tv.tv_sec = tv.tv_usec = 0;
tvp = &tv;
}
if (eventLoop->beforesleep != NULL && flags & AE_CALL_BEFORE_SLEEP)
eventLoop->beforesleep(eventLoop);
//将最近的一个定时任务的时间设置为多路复用的等待时间,如果没有触发IO事件那么此函数结束时就是定时任务到期时
numevents = aeApiPoll(eventLoop, tvp);
.......
//处理定时任务
if (flags & AE_TIME_EVENTS)
processed += processTimeEvents(eventLoop);
.......
无论是selct
、epoll
都是支持设置等待时间的
Redis
的定时任务的实现就是每次将等待时间设置为最近定时任务的时间,然后在休眠结束后处理定时任务
static int64_t usUntilEarliestTimer(aeEventLoop *eventLoop) {
aeTimeEvent *te = eventLoop->timeEventHead;
if (te == NULL) return -1;
//循环遍历所有的定时任务,找到最近的一个定时任务
aeTimeEvent *earliest = NULL;
while (te) {
if (!earliest || te->when < earliest->when)
earliest = te;
te = te->next;
}
monotime now = getMonotonicUs();
return (now >= earliest->when) ? 0 : earliest->when - now;
}
由于定时任务是链表的结构,这里实际的时间复杂度是O(n)
,此函数就是统计出最快的一个定时任务
定时任务的执行
/* Process time events */
static int processTimeEvents(aeEventLoop *eventLoop) {
int processed = 0;
aeTimeEvent *te;
long long maxId;
te = eventLoop->timeEventHead;
maxId = eventLoop->timeEventNextId-1;
monotime now = getMonotonicUs();
while(te) {
long long id;
//处理标记为删除的定时任务
if (te->id == AE_DELETED_EVENT_ID) {
aeTimeEvent *next = te->next;
if (te->refcount) {
te = next;
continue;
}
if (te->prev)
te->prev->next = te->next;
else
eventLoop->timeEventHead = te->next;
if (te->next)
te->next->prev = te->prev;
if (te->finalizerProc) {
//执行定时任务终结的回调函数
te->finalizerProc(eventLoop, te->clientData);
now = getMonotonicUs();
}
//释放资源,从链表中删除掉这个定时任务
zfree(te);
te = next;
continue;
}
if (te->id > maxId) {
te = te->next;
continue;
}
//执行时间已经到了的定时任务
if (te->when <= now) {
int retval;
id = te->id;
te->refcount++;
retval = te->timeProc(eventLoop, id, te->clientData);
te->refcount--;
processed++;
now = getMonotonicUs();
//根据回调函数的返回值来决定是否在再次注册定时任务,不是-1就会再次注册定时任务
if (retval != AE_NOMORE) {
te->when = now + retval * 1000;
} else {
//对于需要删除的任务不是直接删除,而是打标记
te->id = AE_DELETED_EVENT_ID;
}
}
te = te->next;
}
return processed;
}
由于多路复用的机制在有IO进来时就会提前返回,所以在这里还是需要轮询遍历所有的定时任务然后根据时间对比来处理
定时任务总结
定时任务由于受限于多路复用的机制,最多设置为ms
级别,同时从上面来看定时任务使用链表存储性能较差
IO事件处理
事件结构体
//ae.h
typedef struct aeFileEvent {
int mask; //事件掩码,读、写两种事件类型
aeFileProc *rfileProc; //读取回调函数
aeFileProc *wfileProc; //写入回调函数
void *clientData; //回调函数参数
} aeFileEvent;
事件定义
#define AE_NONE 0 //无
#define AE_READABLE 1 //可读
#define AE_WRITABLE 2 //可写
#define AE_BARRIER 4 //用来控制读写顺序
对于一个fd
同时设置了读写事件,默认是先读后写,如果设置了AE_BARRIER
则是先写后读
事件循环结构体
//ae.h
typedef struct aeEventLoop {
int maxfd; /* 当前注册的最大文件描述符 */
int setsize; /* 允许监控、注册的最大文件描述符 */
long long timeEventNextId;
aeFileEvent *events; /* 当前注册进来的的IO事件数组,事件的fd就是数组下标 */
aeFiredEvent *fired; /* 多路复用接口返回的触发事件数组 */
aeTimeEvent *timeEventHead; /* 注册的定时任务链表 */
int stop; /* 是否停止循环*/
void *apidata;
aeBeforeSleepProc *beforesleep; /* 多路复用等待前的回调程序 */
aeBeforeSleepProc *aftersleep; /* 多路复用调用结束后的回调函数 */
int flags;
} aeEventLoop;
事件处理
//ae.c
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
.........
//多路复用机制,最终会调用select、epoll等,触发的IO事件则放在fired中
numevents = aeApiPoll(eventLoop, tvp);
if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP)
eventLoop->aftersleep(eventLoop);
for (j = 0; j < numevents; j++) {
//events的大小就是setsize的大小,直接以文件描述符作为数组的下标来存储事件结构题
aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
int mask = eventLoop->fired[j].mask;
int fd = eventLoop->fired[j].fd;
int fired = 0;
//判断当前的处理顺序
int invert = fe->mask & AE_BARRIER;
//执行读写的回调函数
if (!invert && fe->mask & mask & AE_READABLE) {
fe->rfileProc(eventLoop,fd,fe->clientData,mask);
fired++;
}
if (fe->mask & mask & AE_WRITABLE) {
if (!fired || fe->wfileProc != fe->rfileProc) {
fe->wfileProc(eventLoop,fd,fe->clientData,mask);
fired++;
}
}
if (invert && fe->mask & mask & AE_READABLE) {
if (!fired || fe->wfileProc != fe->rfileProc) {
fe->rfileProc(eventLoop,fd,fe->clientData,mask);
fired++;
}
}
processed++;
}
}
if (flags & AE_TIME_EVENTS)
processed += processTimeEvents(eventLoop);
return processed; /* 返回处理的事件个数 file+time events */
}
这里用了一个比较巧妙的设计就是,在eventLoop->events
存储注册的事件时直接就是以fd
作为索引,实现了类似hash
的方式
多路复用实现
这里只介绍一下
epoll
的实现,注意这里是直接include
源文件的方式加载到ae.c
中,所以虽然函数都是static
但是都是可以用的
结构体定义
typedef struct aeApiState {
int epfd; //epoll专用的文件描述符
struct epoll_event *events; //用来接收epoll_wait触发的事件
} aeApiState;
等待事件
//ae_epoll.c
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
aeApiState *state = eventLoop->apidata;
int retval, numevents = 0;
//等待IO事件进入
retval = epoll_wait(state->epfd,state->events,eventLoop->setsize,
tvp ? (tvp->tv_sec*1000 + (tvp->tv_usec + 999)/1000) : -1);
if (retval > 0) {
int j;
//遍历所有触发的事件,放在fired中返回
numevents = retval;
for (j = 0; j < numevents; j++) {
int mask = 0;
struct epoll_event *e = state->events+j;
if (e->events & EPOLLIN) mask |= AE_READABLE;
if (e->events & EPOLLOUT) mask |= AE_WRITABLE;
if (e->events & EPOLLERR) mask |= AE_WRITABLE|AE_READABLE;
if (e->events & EPOLLHUP) mask |= AE_WRITABLE|AE_READABLE;
eventLoop->fired[j].fd = e->data.fd;
eventLoop->fired[j].mask = mask;
}
}
return numevents;
}
和正常的处理相同, 调用epoll_wait
等待事件触发, 将触发的事件存储到fired
中返回
注册事件
//ae.c
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
aeFileProc *proc, void *clientData)
{
//不允许超过最大允许注册的文件描述符
if (fd >= eventLoop->setsize) {
errno = ERANGE;
return AE_ERR;
}
//以fd作为下标索引将事件放到events中去
aeFileEvent *fe = &eventLoop->events[fd];
//注册到epoll中去
if (aeApiAddEvent(eventLoop, fd, mask) == -1)
return AE_ERR;
fe->mask |= mask;
if (mask & AE_READABLE) fe->rfileProc = proc;
if (mask & AE_WRITABLE) fe->wfileProc = proc;
fe->clientData = clientData;
//更新当前注册的最大文件描述符
if (fd > eventLoop->maxfd)
eventLoop->maxfd = fd;
return AE_OK;
}
网友评论