美文网首页
浅析 Redis 事件驱动框架

浅析 Redis 事件驱动框架

作者: JBryan | 来源:发表于2022-12-15 20:18 被阅读0次

    Redis 5.0.14 源码地址:https://github.com/redis/redis/tree/5.0

    1. Epoll 与 Reactor 简述

    1.1 Epoll

    对于 epoll 机制来说,我们则需要先调用 epoll_create 函数,创建一个 epoll 实例。这个 epoll 实例内部维护了两个结构,分别是记录要监听的文件描述符和已经就绪的文件描述符,而对于已经就绪的文件描述符来说,它们会被返回给用户程序进行处理。
    所以,我们在使用 epoll 机制时,就不用像使用 select 和 poll 一样,遍历查询哪些文件描述符已经就绪了。这样一来, epoll 的效率就比 select 和 poll 有了更高的提升。
    在创建了 epoll 实例后,我们需要再使用 epoll_ctl 函数,给被监听的文件描述符添加监听事件类型,以及使用 epoll_wait 函数获取就绪的文件描述符。

    int sock_fd,conn_fd; //监听套接字和已连接套接字的变量
    sock_fd = socket() //创建套接字
    bind(sock_fd)   //绑定套接字
    listen(sock_fd) //在套接字上进行监听,将套接字转为监听套接字
        
    epfd = epoll_create(EPOLL_SIZE); //创建epoll实例,
    //创建epoll_event结构体数组,保存套接字对应文件描述符和监听事件类型    
    ep_events = (epoll_event*)malloc(sizeof(epoll_event) * EPOLL_SIZE);
    
    //创建epoll_event变量
    struct epoll_event ee
    //监听读事件
    ee.events = EPOLLIN;
    //监听的文件描述符是刚创建的监听套接字
    ee.data.fd = sock_fd;
    
    //将监听套接字加入到监听列表中    
    epoll_ctl(epfd, EPOLL_CTL_ADD, sock_fd, &ee); 
        
    while (1) {
       //等待返回已经就绪的描述符 
       n = epoll_wait(epfd, ep_events, EPOLL_SIZE, -1); 
       //遍历所有就绪的描述符     
       for (int i = 0; i < n; i++) {
           //如果是监听套接字描述符就绪,表明有一个新客户端连接到来 
           if (ep_events[i].data.fd == sock_fd) { 
              conn_fd = accept(sock_fd); //调用accept()建立连接
              ee.events = EPOLLIN;  
              ee.data.fd = conn_fd;
              //添加对新创建的已连接套接字描述符的监听,监听后续在已连接套接字上的读事件      
              epoll_ctl(epfd, EPOLL_CTL_ADD, conn_fd, &ee); 
                    
           } else { //如果是已连接套接字描述符就绪,则可以读数据
               ...//读取数据并处理
           }
       }
    }
    

    也正是因为 epoll 能自定义监听的描述符数量,以及可以直接返回就绪的描述符,Redis 在设计和实现网络通信框架时,就基于 epoll 机制中的 epoll_create、epoll_ctl 和 epoll_wait 等函数和读写事件,进行了封装开发,实现了用于网络通信的事件驱动框架,从而使得 Redis 虽然是单线程运行,但是仍然能高效应对高并发的客户端访问。

    1.2 Reactor 单线程模型

    Reactor 模型就是网络服务器端用来处理高并发网络 IO 请求的一种编程模型。我把这个模型的特征用两个“三”来总结,也就是:

    • 三类处理事件,即连接事件、写事件、读事件;
    • 三个关键角色,即 reactor、acceptor、handler。

    Reactor 模型处理的是客户端和服务器端的交互过程,而这三类事件正好对应了客户端和服务器端交互过程中,不同类请求在服务器端引发的待处理事件:

    • 当一个客户端要和服务器端进行交互时,客户端会向服务器端发送连接请求,以建立连接,这就对应了服务器端的一个连接事件。
    • 一旦连接建立后,客户端会给服务器端发送读请求,以便读取数据。服务器端在处理读请求时,需要向客户端写回数据,这对应了服务器端的写事件。
    • 无论客户端给服务器端发送读或写请求,服务器端都需要从客户端读取请求内容,所以在这里,读或写请求的读取就对应了服务器端的读事件。

    在了解了 Reactor 模型的三类事件后,你现在可能还有一个疑问:这三类事件是由谁来处理的呢?这其实就是模型中三个关键角色的作用了:

    • 首先,连接事件由 acceptor 来处理,负责接收连接;acceptor 在接收连接后,会创建 handler,用于网络连接上对后续读写事件的处理;
    • 其次,读写事件由 handler 处理;
    • 最后,在高并发场景中,连接事件、读写事件会同时发生,所以,我们需要有一个角色专门监听和分配事件,这就是 reactor 角色。当有连接请求时,reactor 将产生的连接事件交由 acceptor 处理;当有读写请求时,reactor 将读写事件交由 handler 处理。
      34.png
      上面介绍的是单线程Reactor模式,还有多线程、主从模型模式,详情可参考:https://blog.csdn.net/qq_36414013/article/details/100620871

    2. 数据结构

    事件驱动框架三个重要的结构体:

    • aeEventLoop,对应事件驱动框架循环流程,记录了时间事件和 I/O事件,以及进入循环流程前后执行的函数等信息。
    • aeFileEvent ,I/O事件,包括连接事件、读事件、写事件。
    • aeTimeEvent,时间事件,即按一定时间周期触发的事件。


      42.png

      这三个结构体都定义在src/ae.h中

    /*IO事件*/
    /* File event structure */
    typedef struct aeFileEvent {
        int mask; /* one of AE_(READABLE|WRITABLE|BARRIER) */
        aeFileProc *rfileProc;
        aeFileProc *wfileProc;
        void *clientData;
    } aeFileEvent;
    
    /*时间事件*/
    /* Time event structure */
    typedef struct aeTimeEvent {
        long long id; /* time event identifier. */
        long when_sec; /* seconds */
        long when_ms; /* milliseconds */
        aeTimeProc *timeProc;
        aeEventFinalizerProc *finalizerProc;
        void *clientData;
        struct aeTimeEvent *prev;
        struct aeTimeEvent *next;
    } aeTimeEvent;
    ...
    /* State of an event based program */
    typedef struct aeEventLoop {
        int maxfd;   /* highest file descriptor currently registered */
        int setsize; /* max number of file descriptors tracked */
        long long timeEventNextId;
        time_t lastTime;     /* Used to detect system clock skew */
        aeFileEvent *events; /* Registered events */
        aeFiredEvent *fired; /* Fired events */
        aeTimeEvent *timeEventHead;
        int stop;
        void *apidata; /* This is used for polling API specific data */
        aeBeforeSleepProc *beforesleep;
        aeBeforeSleepProc *aftersleep;
    } aeEventLoop;
    

    3. aeEventLoop的初始化

    Redis 运行的基本控制逻辑是在 src/server.c 文件中完成的,Redis启动首先会执行 server.c 的 main 函数,在 main 函数中会调用 initServer 函数,initServer 调用 src/ae.c 的 aeCreateEventLoop 函数,来进行初始化了。
    server.c 相关代码

    int main(int argc, char **argv) {
    ...
       initServer();
    ...
    }
    
    initServer() {
    ...
        //调用aeCreateEventLoop函数创建aeEventLoop结构体,并赋值给server结构的el变量
        server.el = aeCreateEventLoop(server.maxclients+CONFIG_FDSET_INCR);
    ...
    }
    

    aeCreateEventLoop 函数的参数只有一个,是 setsize。参数 setsize 的大小,其实是由 server 结构的 maxclients 变量和宏定义 CONFIG_FDSET_INCR 共同决定的。其中,maxclients 变量的值大小,可以在 Redis 的配置文件 redis.conf 中进行定义,默认值是 1000。而宏定义 CONFIG_FDSET_INCR 的大小,等于宏定义 CONFIG_MIN_RESERVED_FDS 的值再加上 96,如下所示,这里的两个宏定义都是在server.h文件中定义的。
    aeCreateEventLoop 函数执行的操作,大致可以分成以下三个步骤。

    • 第一步,aeCreateEventLoop 函数会创建一个 aeEventLoop 结构体类型的变量 eventLoop。然后,该函数会给 eventLoop 的成员变量分配内存空间,比如,按照传入的参数 setsize,给 IO 事件数组和已触发事件数组分配相应的内存空间。此外,该函数还会给 eventLoop 的成员变量赋初始值。

    • 第二步,aeCreateEventLoop 函数会调用 aeApiCreate 函数,aeApiCreate 函数封装了操作系统提供的 IO 多路复用函数。在这里,具体执行什么多路复用函数,是根据当前运行的操作系统来决定的,具体定义在 ae.c 文件头部:

      • ae_epoll.c,对应 Linux 上的 IO 复用函数 epoll;
      • ae_evport.c,对应 Solaris 上的 IO 复用函数 evport;
      • ae_kqueue.c,对应 macOS 或 FreeBSD 上的 IO 复用函数 kqueue;
      • ae_select.c,对应 Linux(或 Windows)的 IO 复用函数 select。
    /* Include the best multiplexing layer supported by this system.
     * The following should be ordered by performances, descending. */
    //根据不同的平台,来编译不同的文件,从而实现不同的多路复用
    #ifdef HAVE_EVPORT
    #include "ae_evport.c"
    #else
        #ifdef HAVE_EPOLL
        #include "ae_epoll.c"
        #else
            #ifdef HAVE_KQUEUE
            #include "ae_kqueue.c"
            #else
            #include "ae_select.c"
            #endif
        #endif
    #endif
    

    假设 Redis 运行在 Linux 操作系统上,并且 IO 多路复用机制是 epoll,那么此时,aeApiCreate 函数就会调用 ae_epoll.c 的 aeApiCreate 函数来创建 epoll 实例,同时会创建 epoll_event 结构的数组,数组大小等于参数 setsize。这里你需要注意,aeApiCreate 函数是把创建的 epoll 实例描述符和 epoll_event 数组,保存在了 aeApiState 结构体类型的变量 state,如下所示:

    typedef struct aeApiState {  //aeApiState结构体定义
        int epfd;   //epoll实例的描述符
        struct epoll_event *events;   //epoll_event结构体数组,记录监听事件
    } aeApiState;
    
    static int aeApiCreate(aeEventLoop *eventLoop) {
        aeApiState *state = zmalloc(sizeof(aeApiState));
        ...
        //将epoll_event数组保存在aeApiState结构体变量state中
        state->events = zmalloc(sizeof(struct epoll_event)*eventLoop->setsize);
        ...
        //将epoll实例描述符保存在aeApiState结构体变量state中
        state->epfd = epoll_create(1024); 
    
    • 第三步,aeCreateEventLoop 函数会把所有网络 IO 事件对应文件描述符的掩码,初始化为 AE_NONE,表示暂时不对任何事件进行监听
    aeEventLoop *aeCreateEventLoop(int setsize) {
        aeEventLoop *eventLoop;
        int i;
       
        //给eventLoop变量分配内存空间
      if ((eventLoop = zmalloc(sizeof(*eventLoop))) == NULL) goto err;
      //给IO事件、已触发事件分配内存空间
        eventLoop->events = zmalloc(sizeof(aeFileEvent)*setsize);
        eventLoop->fired = zmalloc(sizeof(aeFiredEvent)*setsize);
        …
        eventLoop->setsize = setsize;
        eventLoop->lastTime = time(NULL);
        //设置时间事件的链表头为NULL
        eventLoop->timeEventHead = NULL;
      …
      //调用aeApiCreate函数,去实际调用操作系统提供的IO多路复用函数
      if (aeApiCreate(eventLoop) == -1) goto err;
       
        //将所有网络IO事件对应文件描述符的掩码设置为AE_NONE
        for (i = 0; i < setsize; i++)
            eventLoop->events[i].mask = AE_NONE;
        return eventLoop;
     
        //初始化失败后的处理逻辑,
        err:
        …
    }
    

    那么从 aeCreateEventLoop 函数的执行流程中,我们其实可以看到以下两个关键点:

    • 事件驱动框架监听的 IO 事件数组大小就等于参数 setsize,这样决定了和 Redis server 连接的客户端数量。所以,当你遇到客户端连接 Redis 时报错“max number of clients reached”,你就可以去 redis.conf 文件修改 maxclients 配置项,以扩充框架能监听的客户端数量。
    • 当使用 Linux 系统的 epoll 机制时,框架循环流程初始化操作,会通过 aeApiCreate 函数创建 epoll_event 结构数组,并调用 epoll_create 函数创建 epoll 实例,这都是使用 epoll 机制的准备工作要求。

    4. 事件注册

    4.1 I/O 事件注册

    server.c 的 initServer 函数初始化 aeEventLoop 之后,会根据启用的 IP 端口个数,为每个 IP 端口上的网络事件,调用 aeCreateFileEvent函数,创建对 AE_READABLE 事件的监听,并且注册 AE_READABLE 事件的处理 handler,也就是 acceptTcpHandler 函数。

    void initServer(void) {
        …
        for (j = 0; j < server.ipfd_count; j++) {
            if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,
                acceptTcpHandler,NULL) == AE_ERR)
                {
                    serverPanic("Unrecoverable error creating server.ipfd file event.");
                }
      }
      …
    }
    

    我们来看 aeCreateFileEvent 函数的原型定义,如下所示:

    //这个函数的参数有 5 个,分别是循环流程结构体 *eventLoop、IO 事件对应的文件描述符 fd、事件类型掩码 mask、事件处理回调函数*proc,以及事件私有数据*clientData。
    int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask, aeFileProc *proc, void *clientData)
    

    因为循环流程结构体*eventLoop中有 IO 事件数组,这个数组的元素是 aeFileEvent 类型,所以,每个数组元素都对应记录了一个文件描述符(比如一个套接字)相关联的监听事件类型和回调函数。

    int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
            aeFileProc *proc, void *clientData)
    {
        if (fd >= eventLoop->setsize) {
            errno = ERANGE;
            return AE_ERR;
        }
        //获取该描述符关联的 IO 事件指针变量*fe
        aeFileEvent *fe = &eventLoop->events[fd];
        //调用 aeApiAddEvent 函数,添加要监听的事件
        if (aeApiAddEvent(eventLoop, fd, mask) == -1)
            return AE_ERR;
        fe->mask |= mask;
        if (mask & AE_READABLE) fe->rfileProc = proc;
        if (mask & AE_WRITABLE) fe->wfileProc = proc;
        fe->clientData = clientData;
        if (fd > eventLoop->maxfd)
            eventLoop->maxfd = fd;
        return AE_OK;
    }
    

    aeApiAddEvent 函数实际上会调用操作系统提供的 IO 多路复用函数,来完成事件的添加。我们还是假设 Redis 实例运行在使用 epoll 机制的 Linux 上,那么就会调用 ae_epoll.c的aeApiAddEvent 函数,进而调用 epoll_ctl 函数,添加要监听的事件。 epoll_ctl 函数,这个函数会接收 4 个参数,分别是:

    • epoll 实例;
    • 要执行的操作类型(是添加还是修改);
    • 要监听的文件描述符;
    • epoll_event 类型变量。
    static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
        aeApiState *state = eventLoop->apidata;
        struct epoll_event ee = {0}; /* avoid valgrind warning */
        /* If the fd was already monitored for some event, we need a MOD
         * operation. Otherwise we need an ADD operation. */
        int op = eventLoop->events[fd].mask == AE_NONE ?
                EPOLL_CTL_ADD : EPOLL_CTL_MOD;
    
        ee.events = 0;
        mask |= eventLoop->events[fd].mask; /* Merge old events */
        if (mask & AE_READABLE) ee.events |= EPOLLIN;
        if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;
        ee.data.fd = fd;
        //调用 epoll_ctl,来注册希望监听的事件和相应的处理函数。
        if (epoll_ctl(state->epfd,op,fd,&ee) == -1) return -1;
        return 0;
    }
    

    3.2 时间事件注册

    与 IO 事件创建使用 aeCreateFileEvent 函数类似,时间事件的创建函数是 aeCreateTimeEvent 函数。这个函数的原型定义如下所示:

    //milliseconds,这是所创建时间事件的触发时间距离当前时间的时长,是用毫秒表示的。另一个是 *proc,这是所创建时间事件触发后的回调函数。
    long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds, aeTimeProc *proc, void *clientData, aeEventFinalizerProc *finalizerProc)
    

    aeCreateTimeEvent 函数的执行逻辑不复杂,主要就是创建一个时间事件的变量 te,对它进行初始化,并把它插入到框架循环流程结构体 eventLoop 中的时间事件链表中。在这个过程中,aeCreateTimeEvent 函数会调用 aeAddMillisecondsToNow 函数,根据传入的 milliseconds 参数,计算所创建时间事件具体的触发时间戳,并赋值给 te。
    实际上,Redis server 在初始化时,除了创建监听的 IO 事件外,也会调用 aeCreateTimeEvent 函数创建时间事件。下面代码显示了 initServer 函数对 aeCreateTimeEvent 函数的调用:

    initServer() {
    …
    //创建时间事件
    if (aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR){
    … //报错信息
    }
    }
    

    时间事件回调函数
    从代码中,我们可以看到,时间事件触发后的回调函数是 serverCron。所以接下来,我们就来了解下 serverCron 函数。
    serverCron 函数是在 server.c 文件中实现的。一方面,它会顺序调用一些函数,来实现时间事件被触发后,执行一些后台任务。比如,serverCron 函数会检查是否有进程结束信号,若有就执行 server 关闭操作。serverCron 会调用 databaseCron 函数,处理过期 key 或进行 rehash 等。你可以参考下面给出的代码:

    ...
    //如果收到进程结束信号,则执行server关闭操作
     if (server.shutdown_asap) {
            if (prepareForShutdown(SHUTDOWN_NOFLAGS) == C_OK) exit(0);
            ...
     }
    ...
    clientCron();  //执行客户端的异步操作
    databaseCron(); //执行数据库的后台操作
    ...
    

    另一方面,serverCron 函数还会以不同的频率周期性执行一些任务,这是通过执行宏 run_with_period 来实现的。比如,serverCron 函数中会以 1 秒 1 次的频率,检查 AOF 文件是否有写错误。如果有的话,serverCron 就会调用 flushAppendOnlyFile 函数,再次刷回 AOF 文件的缓存数据。下面的代码展示了这一周期性任务:

    serverCron() {
       …
       //每1秒执行1次,检查AOF是否有写错误
       run_with_period(1000) {
            if (server.aof_last_write_status == C_ERR)
                flushAppendOnlyFile(0);
        }
       …
    }
    

    5. 事件捕获与分发

    在 server.c 的 main 函数最后,会调用 aeMain 函数,aeMain 函数的逻辑很简单,就是用一个循环不停地判断事件循环的停止标记。如果事件循环的停止标记被设置为 true,那么针对事件捕获、分发和处理的整个主循环就停止了;否则,主循环会一直执行。aeMain 函数的主体代码如下所示:

    int main(int argc, char **argv) {
        ...
        aeMain(server.el);
        ...
        return 0;
    }
    
    void aeMain(aeEventLoop *eventLoop) {
        eventLoop->stop = 0;
        while (!eventLoop->stop) {
            if (eventLoop->beforesleep != NULL)
                eventLoop->beforesleep(eventLoop);
            aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP);
        }
    }
    

    aeProcessEvents 函数实现的主要功能,包括捕获事件、判断事件类型和调用具体的事件处理函数,从而实现事件的处理。从 aeProcessEvents 函数的主体结构中,我们可以看到主要有三个 if 条件分支,如下所示:

    int aeProcessEvents(aeEventLoop *eventLoop, int flags)
    {
        int processed = 0, numevents;
    
        /* 1. 若没有事件处理,则立刻返回*/
        /* Nothing to do? return ASAP */
        if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;
    
        /* 2. 如果有IO事件发生,或者紧急的时间事件发生,则开始处理*/
        /* Note that we want call select() even if there are no
         * file events to process as long as we want to process time
         * events, in order to sleep until the next time event is ready
         * to fire. */
        if (eventLoop->maxfd != -1 ||
            ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
            ...
            //调用aeApiPoll(epoll_wait)函数捕获事件
            /* Call the multiplexing API, will return only on timeout or when
             * some event fires. */
            numevents = aeApiPoll(eventLoop, tvp);
            ...
            for (j = 0; j < numevents; j++) {
                ...
                 //如果触发的是可读事件,调用事件注册时设置的读事件回调处理函数
                if (!invert && fe->mask & mask & AE_READABLE) {
                    fe->rfileProc(eventLoop,fd,fe->clientData,mask);
                    fired++;
                }
    
                //如果触发的是可写事件,调用事件注册时设置的写事件回调处理函数
                /* Fire the writable event. */
                if (fe->mask & mask & AE_WRITABLE) {
                    if (!fired || fe->wfileProc != fe->rfileProc) {
                        fe->wfileProc(eventLoop,fd,fe->clientData,mask);
                        fired++;
                    }
                }
                ...
                processed++;
            }
        }
       
        /* 3. 检查是否有时间事件,若有,则调用processTimeEvents函数处理 */
        /* Check time events */
        if (flags & AE_TIME_EVENTS)
            processed += processTimeEvents(eventLoop);
        /* 返回已经处理的文件或时间的数目*/
        return processed; /* return the number of processed file/time events */
    }
    

    在其中第2个分支中,aeApiPoll 函数会被调用,用来捕获 I/O 事件,ae_epoll.c 的 aeApiPoll 直接调用了 epoll_wait 函数,并将 epoll 返回的事件信息保存起来的逻辑:

    static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
        …
        //调用epoll_wait获取监听到的事件
        retval = epoll_wait(state->epfd,state->events,eventLoop->setsize,
                tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1);
        if (retval > 0) {
            int j;
            //获得监听到的事件数量
            numevents = retval;
            //针对每一个事件,进行处理
            for (j = 0; j < numevents; j++) {
                 #保存事件信息
            }
        }
        return numevents;
    }
    

    6. 事件处理

    6.1 读事件处理

    当 Redis server 接收到客户端的连接请求时,就会使用注册好的 acceptTcpHandler 函数进行处理。initServer()函数中,就已经注册了读事件和acceptTcpHandler 处理函数。

     //为每个 IP 端口上的网络事件,调用 aeCreateFileEvent,创建对 AE_READABLE 事件的监听,并且注册 AE_READABLE 事件的处理 handler,也就是 acceptTcpHandler 函数。
        /* Create an event handler for accepting new connections in TCP and Unix
         * domain sockets. */
        for (j = 0; j < server.ipfd_count; j++) {
            if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,
                acceptTcpHandler,NULL) == AE_ERR)
                {
                    serverPanic(
                        "Unrecoverable error creating server.ipfd file event.");
                }
        }
    

    acceptTcpHandler 函数是在networking.c文件中,它会接受客户端连接,并创建已连接套接字 cfd。然后,acceptCommonHandler 函数(在 networking.c 文件中)会被调用,同时,刚刚创建的已连接套接字 cfd 会作为参数,传递给 acceptCommonHandler 函数。
    acceptCommonHandler 函数会调用 createClient 函数(在 networking.c 文件中)创建客户端。而在 createClient 函数中,我们就会看到,aeCreateFileEvent 函数被再次调用了。
    此时,aeCreateFileEvent 函数会针对已连接套接字上,创建监听事件,类型为 AE_READABLE,回调函数是 readQueryFromClient(在 networking.c 文件中)。
    好了,到这里,事件驱动框架就增加了对一个客户端已连接套接字的监听。一旦客户端有请求发送到 server,框架就会回调 readQueryFromClient 函数处理请求。这样一来,客户端请求就能通过事件驱动框架进行处理了。

    client *createClient(int fd) {
    …
    if (fd != -1) {
            …
            //调用aeCreateFileEvent,监听读事件,对应客户端读写请求,使用readQueryFromclient回调函数处理
            if (aeCreateFileEvent(server.el,fd,AE_READABLE,
                readQueryFromClient, c) == AE_ERR)
            {
                close(fd);
                zfree(c);
                return NULL;
            } }
    …
    }
    

    下图展示了从监听客户端连接请求,到监听客户端常规读写请求的事件创建过程


    39.png

    6.2 时间事件处理

    其实,时间事件的检测触发比较简单,事件驱动框架的 aeMain 函数会循环调用 aeProcessEvents 函数,来处理各种事件。而 aeProcessEvents 函数在执行流程的最后,会调用 processTimeEvents 函数处理相应到时的任务。

    aeProcessEvents(){
    …
    //检测时间事件是否触发
    if (flags & AE_TIME_EVENTS)
            processed += processTimeEvents(eventLoop);
    …
    }
    

    那么,具体到 proecessTimeEvent 函数来说,它的基本流程就是从时间事件链表上逐一取出每一个事件,然后根据当前时间判断该事件的触发时间戳是否已满足。如果已满足,那么就调用该事件对应的回调函数进行处理。这样一来,周期性任务就能在不断循环执行的 aeProcessEvents 函数中,得到执行了。下面的代码显示了 processTimeEvents 函数的基本流程,你可以再看下。

    static int processTimeEvents(aeEventLoop *eventLoop) {
    ...
    te = eventLoop->timeEventHead;  //从时间事件链表中取出事件
    while(te) {
       ...
      aeGetTime(&now_sec, &now_ms);  //获取当前时间
      if (now_sec > te->when_sec || (now_sec == te->when_sec && now_ms >= te->when_ms))   //如果当前时间已经满足当前事件的触发时间戳
      {
         ...
        retval = te->timeProc(eventLoop, id, te->clientData); //调用注册的回调函数处理
        ...
      }
      te = te->next;   //获取下一个时间事件
      ...
    }
    

    7. 总结

    下面这张表格列出了 Redis 事件驱动框架中部分关键函数的作用,以及与 Reactor 模型的对应关系,仅供参考:

    关键函数 所属文件 主要功能 对应Reactor角色
    aeMain ae.c 主循环,执行事件捕获、分发、处理 Reactor
    aeProcessEvents ae.c 根据事件类型进行相应的处理 Reactor
    acceptTcpHandler networking.c 处理连接事件,并注册读事件回调函数 acceptor
    serverCron server.c 处理时间事件 Handler
    readQueryFromClient networking.c 处理读事件 Handler
    sendReplyToClient networking.c 处理写事件 Handler

    其中 sendReplyToClient 写事件处理在上文没有提及,他是在 aeMain 循环开始前,调用 beforesleep 函数,beforesleep 调用 handleClientsWithPendingWrites 遍历每一个待写回数据的客户端,然后调用 writeToClient 函数,将客户端输出缓冲区中的数据写回。
    如果输出缓冲区的数据还没有写完,此时,handleClientsWithPendingWrites 函数就会调用 aeCreateFileEvent 函数,创建可写事件,并设置回调函数 sendReplyToClient。sendReplyToClient 函数里面会调用 writeToClient 函数写回数据。

    这里也用一张表格来描述 Redis 中关键函数的作用,以及与 Epoll 各函数的调用关系:

    关键函数 所属文件 主要功能 调用 Epoll 函数
    aeCreateEventLoop ---> aeApiCreate ae.c ---> ae_epoll.c 初始化aeEventLoop时,创建epoll实例 epoll_create
    aeCreateFileEvent ---> aeApiAddEvent ae.c ---> ae_epoll.c 注册监听的 I/O 事件,并设置回调函数 epoll_ctl
    aeProcessEvents ---> aeApiPoll ae.c ---> ae_epoll.c 获取已经就绪的事件 epoll_wait

    参考资料:

    1. 极客事件专栏《Redis源码剖析与实战》.蒋德钧.2021
    2. 《Netty权威指南(第2版)》.李林峰.2015
    3. Redis 5.0.14源码:https://github.com/redis/redis/tree/5.0
    4. CSDN博客:https://blog.csdn.net/qq_36414013/article/details/100620871

    相关文章

      网友评论

          本文标题:浅析 Redis 事件驱动框架

          本文链接:https://www.haomeiwen.com/subject/flelqdtx.html