Redis 5.0.14 源码地址:https://github.com/redis/redis/tree/5.0
1. Epoll 与 Reactor 简述
1.1 Epoll
对于 epoll 机制来说,我们则需要先调用 epoll_create 函数,创建一个 epoll 实例。这个 epoll 实例内部维护了两个结构,分别是记录要监听的文件描述符和已经就绪的文件描述符,而对于已经就绪的文件描述符来说,它们会被返回给用户程序进行处理。
所以,我们在使用 epoll 机制时,就不用像使用 select 和 poll 一样,遍历查询哪些文件描述符已经就绪了。这样一来, epoll 的效率就比 select 和 poll 有了更高的提升。
在创建了 epoll 实例后,我们需要再使用 epoll_ctl 函数,给被监听的文件描述符添加监听事件类型,以及使用 epoll_wait 函数获取就绪的文件描述符。
int sock_fd,conn_fd; //监听套接字和已连接套接字的变量
sock_fd = socket() //创建套接字
bind(sock_fd) //绑定套接字
listen(sock_fd) //在套接字上进行监听,将套接字转为监听套接字
epfd = epoll_create(EPOLL_SIZE); //创建epoll实例,
//创建epoll_event结构体数组,保存套接字对应文件描述符和监听事件类型
ep_events = (epoll_event*)malloc(sizeof(epoll_event) * EPOLL_SIZE);
//创建epoll_event变量
struct epoll_event ee
//监听读事件
ee.events = EPOLLIN;
//监听的文件描述符是刚创建的监听套接字
ee.data.fd = sock_fd;
//将监听套接字加入到监听列表中
epoll_ctl(epfd, EPOLL_CTL_ADD, sock_fd, &ee);
while (1) {
//等待返回已经就绪的描述符
n = epoll_wait(epfd, ep_events, EPOLL_SIZE, -1);
//遍历所有就绪的描述符
for (int i = 0; i < n; i++) {
//如果是监听套接字描述符就绪,表明有一个新客户端连接到来
if (ep_events[i].data.fd == sock_fd) {
conn_fd = accept(sock_fd); //调用accept()建立连接
ee.events = EPOLLIN;
ee.data.fd = conn_fd;
//添加对新创建的已连接套接字描述符的监听,监听后续在已连接套接字上的读事件
epoll_ctl(epfd, EPOLL_CTL_ADD, conn_fd, &ee);
} else { //如果是已连接套接字描述符就绪,则可以读数据
...//读取数据并处理
}
}
}
也正是因为 epoll 能自定义监听的描述符数量,以及可以直接返回就绪的描述符,Redis 在设计和实现网络通信框架时,就基于 epoll 机制中的 epoll_create、epoll_ctl 和 epoll_wait 等函数和读写事件,进行了封装开发,实现了用于网络通信的事件驱动框架,从而使得 Redis 虽然是单线程运行,但是仍然能高效应对高并发的客户端访问。
1.2 Reactor 单线程模型
Reactor 模型就是网络服务器端用来处理高并发网络 IO 请求的一种编程模型。我把这个模型的特征用两个“三”来总结,也就是:
- 三类处理事件,即连接事件、写事件、读事件;
- 三个关键角色,即 reactor、acceptor、handler。
Reactor 模型处理的是客户端和服务器端的交互过程,而这三类事件正好对应了客户端和服务器端交互过程中,不同类请求在服务器端引发的待处理事件:
- 当一个客户端要和服务器端进行交互时,客户端会向服务器端发送连接请求,以建立连接,这就对应了服务器端的一个连接事件。
- 一旦连接建立后,客户端会给服务器端发送读请求,以便读取数据。服务器端在处理读请求时,需要向客户端写回数据,这对应了服务器端的写事件。
- 无论客户端给服务器端发送读或写请求,服务器端都需要从客户端读取请求内容,所以在这里,读或写请求的读取就对应了服务器端的读事件。
在了解了 Reactor 模型的三类事件后,你现在可能还有一个疑问:这三类事件是由谁来处理的呢?这其实就是模型中三个关键角色的作用了:
- 首先,连接事件由 acceptor 来处理,负责接收连接;acceptor 在接收连接后,会创建 handler,用于网络连接上对后续读写事件的处理;
- 其次,读写事件由 handler 处理;
- 最后,在高并发场景中,连接事件、读写事件会同时发生,所以,我们需要有一个角色专门监听和分配事件,这就是 reactor 角色。当有连接请求时,reactor 将产生的连接事件交由 acceptor 处理;当有读写请求时,reactor 将读写事件交由 handler 处理。
34.png
上面介绍的是单线程Reactor模式,还有多线程、主从模型模式,详情可参考:https://blog.csdn.net/qq_36414013/article/details/100620871
2. 数据结构
事件驱动框架三个重要的结构体:
- aeEventLoop,对应事件驱动框架循环流程,记录了时间事件和 I/O事件,以及进入循环流程前后执行的函数等信息。
- aeFileEvent ,I/O事件,包括连接事件、读事件、写事件。
-
aeTimeEvent,时间事件,即按一定时间周期触发的事件。
42.png
这三个结构体都定义在src/ae.h中
/*IO事件*/
/* File event structure */
typedef struct aeFileEvent {
int mask; /* one of AE_(READABLE|WRITABLE|BARRIER) */
aeFileProc *rfileProc;
aeFileProc *wfileProc;
void *clientData;
} aeFileEvent;
/*时间事件*/
/* Time event structure */
typedef struct aeTimeEvent {
long long id; /* time event identifier. */
long when_sec; /* seconds */
long when_ms; /* milliseconds */
aeTimeProc *timeProc;
aeEventFinalizerProc *finalizerProc;
void *clientData;
struct aeTimeEvent *prev;
struct aeTimeEvent *next;
} aeTimeEvent;
...
/* State of an event based program */
typedef struct aeEventLoop {
int maxfd; /* highest file descriptor currently registered */
int setsize; /* max number of file descriptors tracked */
long long timeEventNextId;
time_t lastTime; /* Used to detect system clock skew */
aeFileEvent *events; /* Registered events */
aeFiredEvent *fired; /* Fired events */
aeTimeEvent *timeEventHead;
int stop;
void *apidata; /* This is used for polling API specific data */
aeBeforeSleepProc *beforesleep;
aeBeforeSleepProc *aftersleep;
} aeEventLoop;
3. aeEventLoop的初始化
Redis 运行的基本控制逻辑是在 src/server.c 文件中完成的,Redis启动首先会执行 server.c 的 main 函数,在 main 函数中会调用 initServer 函数,initServer 调用 src/ae.c 的 aeCreateEventLoop 函数,来进行初始化了。
server.c 相关代码
int main(int argc, char **argv) {
...
initServer();
...
}
initServer() {
...
//调用aeCreateEventLoop函数创建aeEventLoop结构体,并赋值给server结构的el变量
server.el = aeCreateEventLoop(server.maxclients+CONFIG_FDSET_INCR);
...
}
aeCreateEventLoop 函数的参数只有一个,是 setsize。参数 setsize 的大小,其实是由 server 结构的 maxclients 变量和宏定义 CONFIG_FDSET_INCR 共同决定的。其中,maxclients 变量的值大小,可以在 Redis 的配置文件 redis.conf 中进行定义,默认值是 1000。而宏定义 CONFIG_FDSET_INCR 的大小,等于宏定义 CONFIG_MIN_RESERVED_FDS 的值再加上 96,如下所示,这里的两个宏定义都是在server.h文件中定义的。
aeCreateEventLoop 函数执行的操作,大致可以分成以下三个步骤。
-
第一步,aeCreateEventLoop 函数会创建一个 aeEventLoop 结构体类型的变量 eventLoop。然后,该函数会给 eventLoop 的成员变量分配内存空间,比如,按照传入的参数 setsize,给 IO 事件数组和已触发事件数组分配相应的内存空间。此外,该函数还会给 eventLoop 的成员变量赋初始值。
-
第二步,aeCreateEventLoop 函数会调用 aeApiCreate 函数,aeApiCreate 函数封装了操作系统提供的 IO 多路复用函数。在这里,具体执行什么多路复用函数,是根据当前运行的操作系统来决定的,具体定义在 ae.c 文件头部:
- ae_epoll.c,对应 Linux 上的 IO 复用函数 epoll;
- ae_evport.c,对应 Solaris 上的 IO 复用函数 evport;
- ae_kqueue.c,对应 macOS 或 FreeBSD 上的 IO 复用函数 kqueue;
- ae_select.c,对应 Linux(或 Windows)的 IO 复用函数 select。
/* Include the best multiplexing layer supported by this system.
* The following should be ordered by performances, descending. */
//根据不同的平台,来编译不同的文件,从而实现不同的多路复用
#ifdef HAVE_EVPORT
#include "ae_evport.c"
#else
#ifdef HAVE_EPOLL
#include "ae_epoll.c"
#else
#ifdef HAVE_KQUEUE
#include "ae_kqueue.c"
#else
#include "ae_select.c"
#endif
#endif
#endif
假设 Redis 运行在 Linux 操作系统上,并且 IO 多路复用机制是 epoll,那么此时,aeApiCreate 函数就会调用 ae_epoll.c 的 aeApiCreate 函数来创建 epoll 实例,同时会创建 epoll_event 结构的数组,数组大小等于参数 setsize。这里你需要注意,aeApiCreate 函数是把创建的 epoll 实例描述符和 epoll_event 数组,保存在了 aeApiState 结构体类型的变量 state,如下所示:
typedef struct aeApiState { //aeApiState结构体定义
int epfd; //epoll实例的描述符
struct epoll_event *events; //epoll_event结构体数组,记录监听事件
} aeApiState;
static int aeApiCreate(aeEventLoop *eventLoop) {
aeApiState *state = zmalloc(sizeof(aeApiState));
...
//将epoll_event数组保存在aeApiState结构体变量state中
state->events = zmalloc(sizeof(struct epoll_event)*eventLoop->setsize);
...
//将epoll实例描述符保存在aeApiState结构体变量state中
state->epfd = epoll_create(1024);
- 第三步,aeCreateEventLoop 函数会把所有网络 IO 事件对应文件描述符的掩码,初始化为 AE_NONE,表示暂时不对任何事件进行监听。
aeEventLoop *aeCreateEventLoop(int setsize) {
aeEventLoop *eventLoop;
int i;
//给eventLoop变量分配内存空间
if ((eventLoop = zmalloc(sizeof(*eventLoop))) == NULL) goto err;
//给IO事件、已触发事件分配内存空间
eventLoop->events = zmalloc(sizeof(aeFileEvent)*setsize);
eventLoop->fired = zmalloc(sizeof(aeFiredEvent)*setsize);
…
eventLoop->setsize = setsize;
eventLoop->lastTime = time(NULL);
//设置时间事件的链表头为NULL
eventLoop->timeEventHead = NULL;
…
//调用aeApiCreate函数,去实际调用操作系统提供的IO多路复用函数
if (aeApiCreate(eventLoop) == -1) goto err;
//将所有网络IO事件对应文件描述符的掩码设置为AE_NONE
for (i = 0; i < setsize; i++)
eventLoop->events[i].mask = AE_NONE;
return eventLoop;
//初始化失败后的处理逻辑,
err:
…
}
那么从 aeCreateEventLoop 函数的执行流程中,我们其实可以看到以下两个关键点:
- 事件驱动框架监听的 IO 事件数组大小就等于参数 setsize,这样决定了和 Redis server 连接的客户端数量。所以,当你遇到客户端连接 Redis 时报错“max number of clients reached”,你就可以去 redis.conf 文件修改 maxclients 配置项,以扩充框架能监听的客户端数量。
- 当使用 Linux 系统的 epoll 机制时,框架循环流程初始化操作,会通过 aeApiCreate 函数创建 epoll_event 结构数组,并调用 epoll_create 函数创建 epoll 实例,这都是使用 epoll 机制的准备工作要求。
4. 事件注册
4.1 I/O 事件注册
server.c 的 initServer 函数初始化 aeEventLoop 之后,会根据启用的 IP 端口个数,为每个 IP 端口上的网络事件,调用 aeCreateFileEvent函数,创建对 AE_READABLE 事件的监听,并且注册 AE_READABLE 事件的处理 handler,也就是 acceptTcpHandler 函数。
void initServer(void) {
…
for (j = 0; j < server.ipfd_count; j++) {
if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,
acceptTcpHandler,NULL) == AE_ERR)
{
serverPanic("Unrecoverable error creating server.ipfd file event.");
}
}
…
}
我们来看 aeCreateFileEvent 函数的原型定义,如下所示:
//这个函数的参数有 5 个,分别是循环流程结构体 *eventLoop、IO 事件对应的文件描述符 fd、事件类型掩码 mask、事件处理回调函数*proc,以及事件私有数据*clientData。
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask, aeFileProc *proc, void *clientData)
因为循环流程结构体*eventLoop中有 IO 事件数组,这个数组的元素是 aeFileEvent 类型,所以,每个数组元素都对应记录了一个文件描述符(比如一个套接字)相关联的监听事件类型和回调函数。
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
aeFileProc *proc, void *clientData)
{
if (fd >= eventLoop->setsize) {
errno = ERANGE;
return AE_ERR;
}
//获取该描述符关联的 IO 事件指针变量*fe
aeFileEvent *fe = &eventLoop->events[fd];
//调用 aeApiAddEvent 函数,添加要监听的事件
if (aeApiAddEvent(eventLoop, fd, mask) == -1)
return AE_ERR;
fe->mask |= mask;
if (mask & AE_READABLE) fe->rfileProc = proc;
if (mask & AE_WRITABLE) fe->wfileProc = proc;
fe->clientData = clientData;
if (fd > eventLoop->maxfd)
eventLoop->maxfd = fd;
return AE_OK;
}
aeApiAddEvent 函数实际上会调用操作系统提供的 IO 多路复用函数,来完成事件的添加。我们还是假设 Redis 实例运行在使用 epoll 机制的 Linux 上,那么就会调用 ae_epoll.c的aeApiAddEvent 函数,进而调用 epoll_ctl 函数,添加要监听的事件。 epoll_ctl 函数,这个函数会接收 4 个参数,分别是:
- epoll 实例;
- 要执行的操作类型(是添加还是修改);
- 要监听的文件描述符;
- epoll_event 类型变量。
static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
aeApiState *state = eventLoop->apidata;
struct epoll_event ee = {0}; /* avoid valgrind warning */
/* If the fd was already monitored for some event, we need a MOD
* operation. Otherwise we need an ADD operation. */
int op = eventLoop->events[fd].mask == AE_NONE ?
EPOLL_CTL_ADD : EPOLL_CTL_MOD;
ee.events = 0;
mask |= eventLoop->events[fd].mask; /* Merge old events */
if (mask & AE_READABLE) ee.events |= EPOLLIN;
if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;
ee.data.fd = fd;
//调用 epoll_ctl,来注册希望监听的事件和相应的处理函数。
if (epoll_ctl(state->epfd,op,fd,&ee) == -1) return -1;
return 0;
}
3.2 时间事件注册
与 IO 事件创建使用 aeCreateFileEvent 函数类似,时间事件的创建函数是 aeCreateTimeEvent 函数。这个函数的原型定义如下所示:
//milliseconds,这是所创建时间事件的触发时间距离当前时间的时长,是用毫秒表示的。另一个是 *proc,这是所创建时间事件触发后的回调函数。
long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds, aeTimeProc *proc, void *clientData, aeEventFinalizerProc *finalizerProc)
aeCreateTimeEvent 函数的执行逻辑不复杂,主要就是创建一个时间事件的变量 te,对它进行初始化,并把它插入到框架循环流程结构体 eventLoop 中的时间事件链表中。在这个过程中,aeCreateTimeEvent 函数会调用 aeAddMillisecondsToNow 函数,根据传入的 milliseconds 参数,计算所创建时间事件具体的触发时间戳,并赋值给 te。
实际上,Redis server 在初始化时,除了创建监听的 IO 事件外,也会调用 aeCreateTimeEvent 函数创建时间事件。下面代码显示了 initServer 函数对 aeCreateTimeEvent 函数的调用:
initServer() {
…
//创建时间事件
if (aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR){
… //报错信息
}
}
时间事件回调函数
从代码中,我们可以看到,时间事件触发后的回调函数是 serverCron。所以接下来,我们就来了解下 serverCron 函数。
serverCron 函数是在 server.c 文件中实现的。一方面,它会顺序调用一些函数,来实现时间事件被触发后,执行一些后台任务。比如,serverCron 函数会检查是否有进程结束信号,若有就执行 server 关闭操作。serverCron 会调用 databaseCron 函数,处理过期 key 或进行 rehash 等。你可以参考下面给出的代码:
...
//如果收到进程结束信号,则执行server关闭操作
if (server.shutdown_asap) {
if (prepareForShutdown(SHUTDOWN_NOFLAGS) == C_OK) exit(0);
...
}
...
clientCron(); //执行客户端的异步操作
databaseCron(); //执行数据库的后台操作
...
另一方面,serverCron 函数还会以不同的频率周期性执行一些任务,这是通过执行宏 run_with_period 来实现的。比如,serverCron 函数中会以 1 秒 1 次的频率,检查 AOF 文件是否有写错误。如果有的话,serverCron 就会调用 flushAppendOnlyFile 函数,再次刷回 AOF 文件的缓存数据。下面的代码展示了这一周期性任务:
serverCron() {
…
//每1秒执行1次,检查AOF是否有写错误
run_with_period(1000) {
if (server.aof_last_write_status == C_ERR)
flushAppendOnlyFile(0);
}
…
}
5. 事件捕获与分发
在 server.c 的 main 函数最后,会调用 aeMain 函数,aeMain 函数的逻辑很简单,就是用一个循环不停地判断事件循环的停止标记。如果事件循环的停止标记被设置为 true,那么针对事件捕获、分发和处理的整个主循环就停止了;否则,主循环会一直执行。aeMain 函数的主体代码如下所示:
int main(int argc, char **argv) {
...
aeMain(server.el);
...
return 0;
}
void aeMain(aeEventLoop *eventLoop) {
eventLoop->stop = 0;
while (!eventLoop->stop) {
if (eventLoop->beforesleep != NULL)
eventLoop->beforesleep(eventLoop);
aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP);
}
}
aeProcessEvents 函数实现的主要功能,包括捕获事件、判断事件类型和调用具体的事件处理函数,从而实现事件的处理。从 aeProcessEvents 函数的主体结构中,我们可以看到主要有三个 if 条件分支,如下所示:
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
int processed = 0, numevents;
/* 1. 若没有事件处理,则立刻返回*/
/* Nothing to do? return ASAP */
if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;
/* 2. 如果有IO事件发生,或者紧急的时间事件发生,则开始处理*/
/* Note that we want call select() even if there are no
* file events to process as long as we want to process time
* events, in order to sleep until the next time event is ready
* to fire. */
if (eventLoop->maxfd != -1 ||
((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
...
//调用aeApiPoll(epoll_wait)函数捕获事件
/* Call the multiplexing API, will return only on timeout or when
* some event fires. */
numevents = aeApiPoll(eventLoop, tvp);
...
for (j = 0; j < numevents; j++) {
...
//如果触发的是可读事件,调用事件注册时设置的读事件回调处理函数
if (!invert && fe->mask & mask & AE_READABLE) {
fe->rfileProc(eventLoop,fd,fe->clientData,mask);
fired++;
}
//如果触发的是可写事件,调用事件注册时设置的写事件回调处理函数
/* Fire the writable event. */
if (fe->mask & mask & AE_WRITABLE) {
if (!fired || fe->wfileProc != fe->rfileProc) {
fe->wfileProc(eventLoop,fd,fe->clientData,mask);
fired++;
}
}
...
processed++;
}
}
/* 3. 检查是否有时间事件,若有,则调用processTimeEvents函数处理 */
/* Check time events */
if (flags & AE_TIME_EVENTS)
processed += processTimeEvents(eventLoop);
/* 返回已经处理的文件或时间的数目*/
return processed; /* return the number of processed file/time events */
}
在其中第2个分支中,aeApiPoll 函数会被调用,用来捕获 I/O 事件,ae_epoll.c 的 aeApiPoll 直接调用了 epoll_wait 函数,并将 epoll 返回的事件信息保存起来的逻辑:
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
…
//调用epoll_wait获取监听到的事件
retval = epoll_wait(state->epfd,state->events,eventLoop->setsize,
tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1);
if (retval > 0) {
int j;
//获得监听到的事件数量
numevents = retval;
//针对每一个事件,进行处理
for (j = 0; j < numevents; j++) {
#保存事件信息
}
}
return numevents;
}
6. 事件处理
6.1 读事件处理
当 Redis server 接收到客户端的连接请求时,就会使用注册好的 acceptTcpHandler 函数进行处理。initServer()函数中,就已经注册了读事件和acceptTcpHandler 处理函数。
//为每个 IP 端口上的网络事件,调用 aeCreateFileEvent,创建对 AE_READABLE 事件的监听,并且注册 AE_READABLE 事件的处理 handler,也就是 acceptTcpHandler 函数。
/* Create an event handler for accepting new connections in TCP and Unix
* domain sockets. */
for (j = 0; j < server.ipfd_count; j++) {
if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,
acceptTcpHandler,NULL) == AE_ERR)
{
serverPanic(
"Unrecoverable error creating server.ipfd file event.");
}
}
acceptTcpHandler 函数是在networking.c文件中,它会接受客户端连接,并创建已连接套接字 cfd。然后,acceptCommonHandler 函数(在 networking.c 文件中)会被调用,同时,刚刚创建的已连接套接字 cfd 会作为参数,传递给 acceptCommonHandler 函数。
acceptCommonHandler 函数会调用 createClient 函数(在 networking.c 文件中)创建客户端。而在 createClient 函数中,我们就会看到,aeCreateFileEvent 函数被再次调用了。
此时,aeCreateFileEvent 函数会针对已连接套接字上,创建监听事件,类型为 AE_READABLE,回调函数是 readQueryFromClient(在 networking.c 文件中)。
好了,到这里,事件驱动框架就增加了对一个客户端已连接套接字的监听。一旦客户端有请求发送到 server,框架就会回调 readQueryFromClient 函数处理请求。这样一来,客户端请求就能通过事件驱动框架进行处理了。
client *createClient(int fd) {
…
if (fd != -1) {
…
//调用aeCreateFileEvent,监听读事件,对应客户端读写请求,使用readQueryFromclient回调函数处理
if (aeCreateFileEvent(server.el,fd,AE_READABLE,
readQueryFromClient, c) == AE_ERR)
{
close(fd);
zfree(c);
return NULL;
} }
…
}
下图展示了从监听客户端连接请求,到监听客户端常规读写请求的事件创建过程
39.png
6.2 时间事件处理
其实,时间事件的检测触发比较简单,事件驱动框架的 aeMain 函数会循环调用 aeProcessEvents 函数,来处理各种事件。而 aeProcessEvents 函数在执行流程的最后,会调用 processTimeEvents 函数处理相应到时的任务。
aeProcessEvents(){
…
//检测时间事件是否触发
if (flags & AE_TIME_EVENTS)
processed += processTimeEvents(eventLoop);
…
}
那么,具体到 proecessTimeEvent 函数来说,它的基本流程就是从时间事件链表上逐一取出每一个事件,然后根据当前时间判断该事件的触发时间戳是否已满足。如果已满足,那么就调用该事件对应的回调函数进行处理。这样一来,周期性任务就能在不断循环执行的 aeProcessEvents 函数中,得到执行了。下面的代码显示了 processTimeEvents 函数的基本流程,你可以再看下。
static int processTimeEvents(aeEventLoop *eventLoop) {
...
te = eventLoop->timeEventHead; //从时间事件链表中取出事件
while(te) {
...
aeGetTime(&now_sec, &now_ms); //获取当前时间
if (now_sec > te->when_sec || (now_sec == te->when_sec && now_ms >= te->when_ms)) //如果当前时间已经满足当前事件的触发时间戳
{
...
retval = te->timeProc(eventLoop, id, te->clientData); //调用注册的回调函数处理
...
}
te = te->next; //获取下一个时间事件
...
}
7. 总结
下面这张表格列出了 Redis 事件驱动框架中部分关键函数的作用,以及与 Reactor 模型的对应关系,仅供参考:
关键函数 | 所属文件 | 主要功能 | 对应Reactor角色 |
---|---|---|---|
aeMain | ae.c | 主循环,执行事件捕获、分发、处理 | Reactor |
aeProcessEvents | ae.c | 根据事件类型进行相应的处理 | Reactor |
acceptTcpHandler | networking.c | 处理连接事件,并注册读事件回调函数 | acceptor |
serverCron | server.c | 处理时间事件 | Handler |
readQueryFromClient | networking.c | 处理读事件 | Handler |
sendReplyToClient | networking.c | 处理写事件 | Handler |
其中 sendReplyToClient 写事件处理在上文没有提及,他是在 aeMain 循环开始前,调用 beforesleep 函数,beforesleep 调用 handleClientsWithPendingWrites 遍历每一个待写回数据的客户端,然后调用 writeToClient 函数,将客户端输出缓冲区中的数据写回。
如果输出缓冲区的数据还没有写完,此时,handleClientsWithPendingWrites 函数就会调用 aeCreateFileEvent 函数,创建可写事件,并设置回调函数 sendReplyToClient。sendReplyToClient 函数里面会调用 writeToClient 函数写回数据。
这里也用一张表格来描述 Redis 中关键函数的作用,以及与 Epoll 各函数的调用关系:
关键函数 | 所属文件 | 主要功能 | 调用 Epoll 函数 |
---|---|---|---|
aeCreateEventLoop ---> aeApiCreate | ae.c ---> ae_epoll.c | 初始化aeEventLoop时,创建epoll实例 | epoll_create |
aeCreateFileEvent ---> aeApiAddEvent | ae.c ---> ae_epoll.c | 注册监听的 I/O 事件,并设置回调函数 | epoll_ctl |
aeProcessEvents ---> aeApiPoll | ae.c ---> ae_epoll.c | 获取已经就绪的事件 | epoll_wait |
参考资料:
- 极客事件专栏《Redis源码剖析与实战》.蒋德钧.2021
- 《Netty权威指南(第2版)》.李林峰.2015
- Redis 5.0.14源码:https://github.com/redis/redis/tree/5.0
- CSDN博客:https://blog.csdn.net/qq_36414013/article/details/100620871
网友评论