Reactor 模型就是网络服务器端用来处理高并发网络 IO 请求的一种编程模型,总结起来:
- 三类事件,即 连接事件,写事件,读事件
- 三个角色,即 reactor、acceptor、handler
事件类型与角色
Reactor 模型处理的是客户端和服务器端的交互过程,而这三类事件正好对应了客户端和服务器端交互过程中,不同类请求在服务器端引发的待处理事件:
- 当一个客户端要和服务器端进行交互时,客户端会向服务器端发送连接请求,以建立连接,这就对应了服务器端的一个连接事件。
- 一旦连接建立后,客户端会给服务器端发送读请求,以便读取数据。服务器端在处理读请求时,需要向客户端写回数据,这对应了服务器端的写事件。
- 无论客户端给服务器端发送读或写请求,服务器端都需要从客户端读取请求内容,所以在这里,读或写请求的读取就对应了服务器端的读事件。
这三类事件是由谁来处理的呢?那就是对应三个角色了:
- 连接事件由 acceptor 来处理,负责接收连接;acceptor 在接收连接后,会创建 handler,用于网络连接上对后续读写事件的处理;
- 读写事件由 handler 处理;
- 最后,在高并发场景中,连接事件、读写事件会同时发生,所以,我们需要有一个角色专门监听和分配事件,这就是 reactor 角色。当有连接请求时,reactor 将产生的连接事件交由 acceptor 处理;当有读写请求时,reactor 将读写事件交由 handler 处理。
事件驱动框架
事件驱动框架,就是在实现 Reactor 模型时,需要实现的代码整体控制逻辑。简单来说,事件驱动框架包括了两部分:一是事件初始化;二是事件捕获、分发和处理主循环。
事件初始化是在服务器程序启动时就执行的,它的作用主要是创建需要监听的事件类型,以及该类事件对应的 handler。而一旦服务器完成初始化后,事件初始化也就相应完成了,服务器程序就需要进入到事件捕获、分发和处理的主循环中。
7304940-df0cf0ead00658d2.pngRedis 对 Reactor 模型的实现
github 有人将redis中的网络模块代码移植出来,参见:https://github.com/aisk/libae
Redis 为了实现事件驱动框架,同样的定义了:
事件的数据结构
框架主循环函数
事件捕获分发函数
事件注册函数
事件对应的handler 函数
-
事件的数据结构
Redis 的事件驱动框架定义了两类事件:IO 事件和时间事件,分别对应了客户端发送的网络请求和 Redis 自身的周期性操作#define AE_NONE 0 #define AE_READABLE 1 #define AE_WRITABLE 2 #define AE_BARRIER 4 typedef struct aeFileEvent { int mask; /* one of AE_(READABLE|WRITABLE|BARRIER) */ aeFileProc *rfileProc; aeFileProc *wfileProc; void *clientData; } aeFileEvent;
-
框架主循环函数
void aeMain(aeEventLoop *eventLoop) { eventLoop->stop = 0; while (!eventLoop->stop) { … aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP); } }
-
事件捕获与分发
int aeProcessEvents(aeEventLoop *eventLoop, int flags) { int processed = 0, numevents; /* 若没有事件处理,则立刻返回 */ if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0; /* 如果有IO事件发生,或者紧急的时间事件发生,则开始处理*/ if (eventLoop->maxfd != -1 || ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) { … } /* 检查是否有时间事件,若有,则调用processTimeEvents函数处理 */ if (flags & AE_TIME_EVENTS) processed += processTimeEvents(eventLoop); /* 返回已经处理的文件或时间*/ return processed; }
主要来看看第二种情况,首先,当该情况发生时,Redis 需要捕获发生的网络事件,并进行相应的处理。在这种情况下,aeApiPoll 函数会被调用,用来捕获事件,如下所示:
int aeProcessEvents(aeEventLoop *eventLoop, int flags){ ... if (eventLoop->maxfd != -1 || ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) { ... // 调用aeApiPoll函数捕获事件 numevents = aeApiPoll(eventLoop, tvp); ... } ... }
Redis 依赖于操作系统底层提供的 IO 多路复用机制,来实现事件捕获,检查是否有新的连接、读写事件发生。为了适配不同的操作系统,Redis 对不同操作系统实现的网络 IO 多路复用函数,都进行了统一的封装,封装后的代码分别通过以下四个文件中实现:
- ae_epoll.c,对应 Linux 上的 IO 复用函数 epoll
- ae_evport.c,对应 Solaris 上的 IO 复用函数 evport
- ae_kqueue.c,对应 macOS 或 FreeBSD 上的 IO 复用函数 kqueue
- ae_select.c,对应 Linux(或 Windows)的 IO 复用函数 select
这里以epoll 为例,在 aeApiPoll 函数中直接调用了 epoll_wait 函数,并将 epoll 返回的事件信息保存起来的逻辑
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) { … // 调用epoll_wait获取监听到的事件 retval = epoll_wait(state->epfd,state->events,eventLoop->setsize, tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1); if (retval > 0) { int j; // 获得监听到的事件数量 numevents = retval; // 针对每一个事件,进行处理 for (j = 0; j < numevents; j++) { // 保存事件信息 } } return numevents; }
- 事件注册
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask, aeFileProc *proc, void *clientData) { if (fd >= eventLoop->setsize) { errno = ERANGE; return AE_ERR; } aeFileEvent *fe = &eventLoop->events[fd]; if (aeApiAddEvent(eventLoop, fd, mask) == -1) return AE_ERR; fe->mask |= mask; if (mask & AE_READABLE) fe->rfileProc = proc; if (mask & AE_WRITABLE) fe->wfileProc = proc; fe->clientData = clientData; if (fd > eventLoop->maxfd) eventLoop->maxfd = fd; return AE_OK; }
- 事件处理函数
参考资料
1、《高性能Linux服务器编程》
2、https://github.com/aisk/libae
网友评论