什么是AE?
首先介绍这里说的的AE指的是redis的网络事件库,标准说明应该是:一个简单的事件驱动程序库。
源码ae.h头文件说明如下:
/* A simple event-driven programming library. Originally I wrote this code
* for the Jim's event-loop (Jim is a Tcl interpreter) but later translated
* it in form of a library for easy reuse.
我们知道这个程序库封装了网络模型:evport、epoll、kqueue、select。
那么第一个问题:redis是如何实现不同操作系统使用对应的模型呢?
redis是如何实现不同操作系统使用对应的模型呢?
上代码:
/* Include the best multiplexing layer supported by this system.
* The following should be ordered by performances, descending. */
#ifdef HAVE_EVPORT
#include "ae_evport.c"
#else
#ifdef HAVE_EPOLL
#include "ae_epoll.c"
#else
#ifdef HAVE_KQUEUE
#include "ae_kqueue.c"
#else
#include "ae_select.c"
#endif
#endif
#endif
上面这部分代码是在ae.c前面的一段代码,显然是通过宏来控制引入不同模型的.c文件实现的。也就是不同操作系统定义不同的宏,编译的时候就编译对应的.c文件。我们可以进一步来看下宏定义,位于config.h:
#ifdef __linux__
#define HAVE_EPOLL 1
#endif
#if (defined(__APPLE__) && defined(MAC_OS_X_VERSION_10_6)) || defined(__FreeBSD__) || defined(__OpenBSD__) || defined (__NetBSD__)
#define HAVE_KQUEUE 1
#endif
#ifdef __sun
#include <sys/feature_tests.h>
#ifdef _DTRACE_VERSION
#define HAVE_EVPORT 1
#endif
#endif
可知:
linux系统使用epoll,apple/Mac/bsd系统使用kqueue,__sun使用evport(宏__sun实际对应Solaris系统),其他操作系统如windows则使用select。
epoll和kqueue都有听说过,第二个问题来了:evport是什么模型?
evport是什么模型?
evport主要API:
-
port_create()
原型:int port_create(void);port_create() 创建一个 Event ports 队列,返回一个文件描述符作为该
Event port 的代表。相似:kqueue(),epoll_create()
-
port_associate()
原型:int port_associate(int port, int source, uintptr_t object,
int events, void *user);port_associate() 将某一个对象的特定 event 与 Event port 相关联。当
source 为 PORT_SOURCE_FD 时,object 就是文件描述符。events 可以参考
poll 的。user 是一个用户自定义的指针,与该 object 相关的。在
kqueue 和 epoll 也提供了类似的用户自定义指针。需要注意的是,当用 port_get() 取得某个 object 的 event 之后,这个
object 与 port 也就不再相关联了。如果想继续取得这个 object 的 event,
必须再次调用 port_associate() 将 object 与 port 关联。这种设计显然
是为多线程程序而做的,当某一个线程取得一个 event 之后,object 就从
port 的队列中删掉了,这样可以保证这个线程完完整整地处理完这个 event,
不用担心别的线程也会取得这个 event。相似:kevent(),epoll_ctl()
-
port_get()
原型:int port_get(int port, port_event_t *pe, const timespec_t
*timeout);port_get() 每次从 port 中取回一个 event。如果 timeout 参数为 NULL,
port_get() 会一直等待,直到某一个 event 到来。pe 用于存储返回的
event。相似:kevent(),epoll_wait()
-
port_getn()
原型:int port_getn(int port, port_event_t list[], uint_t max,
uint_t *nget, const timespec_t *timeout);port_getn() 与 port_get() 都是用于从 port 中取回 event,不同的是
port_getn() 可以一次取回多个。list 数组用于存储返回的多个 events,
max 为 list 数组的元素个数。timeout 与 port_get() 的一致。需要特别注意的是 nget 参数,这是一个 value-result 参数,也就是传入
n,告诉内核要取得 n 个 event,当 port_getn() 返回时,getn 的值表示
内核实际取得了多少个。当 timeout 为 NULL 时,port_getn() 会一直等待,
直到确实取得了 n 个 event 之后才会返回,这一点是与 kevent() 和
epoll_wait() 很不相同的地方。如果 timeout 不为 NULL,则只要超时就返
回,不管是不是已经取到了 n 个 event。(注:这里忽略了其他可能引起
port_getn() 返回的因素)相似:kevent(),epoll_wait()
总结evport:实际也是类似epoll、kqueue的一种事件模型。
再回过头来,我们看看redis 定义的事件模型是怎么被调用和工作的?
redis 定义的事件模型是怎么被调用和工作的?
/* State of an event based program */
typedef struct aeEventLoop {
int maxfd; /* highest file descriptor currently registered */
int setsize; /* max number of file descriptors tracked */
long long timeEventNextId;
time_t lastTime; /* Used to detect system clock skew */
aeFileEvent *events; /* Registered events */
aeFiredEvent *fired; /* Fired events */
aeTimeEvent *timeEventHead;
int stop;
void *apidata; /* This is used for polling API specific data */
aeBeforeSleepProc *beforesleep;
aeBeforeSleepProc *aftersleep;
} aeEventLoop;
上面定义了事件循环结构体。它在server中使用如下:
struct redisServer {
/* General */
pid_t pid; /* Main process pid. */
char *configfile; /* Absolute config file path, or NULL */
char *executable; /* Absolute executable file path. */
char **exec_argv; /* Executable argv vector (copy). */
int dynamic_hz; /* Change hz value depending on # of clients. */
int config_hz; /* Configured HZ value. May be different than
the actual 'hz' field value if dynamic-hz
is enabled. */
int hz; /* serverCron() calls frequency in hertz */
redisDb *db;
dict *commands; /* Command table */
dict *orig_commands; /* Command table before command renaming. */
aeEventLoop *el;//注意这个就是aeEventLoop在sever定义的变量
看看它使用的地方:
1)初始化
void initServer(void) {
...省略其它代码
//初始化
server.el = aeCreateEventLoop(server.maxclients+CONFIG_FDSET_INCR);
...
//创建一个定时事件 每隔1ms进行回调
/* Create the timer callback, this is our way to process many background
* operations incrementally, like clients timeout, eviction of unaccessed
* expired keys and so forth. */
if (aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) {
serverPanic("Can't create event loop timers.");
exit(1);
}
/* Create an event handler for accepting new connections in TCP and Unix
* domain sockets. */
//针对每个新连接创建文件事件
for (j = 0; j < server.ipfd_count; j++) {
if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,
acceptTcpHandler,NULL) == AE_ERR)
{
serverPanic(
"Unrecoverable error creating server.ipfd file event.");
}
}
//针对unix socket创建文件事件
if (server.sofd > 0 && aeCreateFileEvent(server.el,server.sofd,AE_READABLE,
acceptUnixHandler,NULL) == AE_ERR) serverPanic("Unrecoverable error creating server.sofd file event.");
/* Register a readable event for the pipe used to awake the event loop
* when a blocked client in a module needs attention. */
//创建一个针对pipe的事件
if (aeCreateFileEvent(server.el, server.module_blocked_pipe[0], AE_READABLE,
moduleBlockedClientPipeReadable,NULL) == AE_ERR) {
serverPanic(
"Error registering the readable event for the module "
"blocked clients subsystem.");
}
}
上面的初始化看到创建了多个文件事件,这个文件事件实际就是用来监听fd的读写。
2)循环处理
int main(int argc, char **argv) {
...
aeSetBeforeSleepProc(server.el,beforeSleep);//设置事件循环处理前的回调
aeSetAfterSleepProc(server.el,afterSleep);//设置事件循环处理后的回调
aeMain(server.el);//设置事件循环主要逻辑(包含事件处理逻辑)
aeDeleteEventLoop(server.el);//清除事件循环对象
...
}
总结:事件loop对象在initServer函数中初始化,在main函数调用。通过aeMain中的while函数不断处理事件。
网友评论