美文网首页
Redis AE事件库研究

Redis AE事件库研究

作者: 胖子罗 | 来源:发表于2019-08-25 18:22 被阅读0次

    什么是AE?

    首先介绍这里说的的AE指的是redis的网络事件库,标准说明应该是:一个简单的事件驱动程序库。
    源码ae.h头文件说明如下:

    /* A simple event-driven programming library. Originally I wrote this code
     * for the Jim's event-loop (Jim is a Tcl interpreter) but later translated
     * it in form of a library for easy reuse.
    

    我们知道这个程序库封装了网络模型:evport、epoll、kqueue、select。
    那么第一个问题:redis是如何实现不同操作系统使用对应的模型呢?

    redis是如何实现不同操作系统使用对应的模型呢?

    上代码:

    /* Include the best multiplexing layer supported by this system.
     * The following should be ordered by performances, descending. */
    #ifdef HAVE_EVPORT
    #include "ae_evport.c"
    #else
        #ifdef HAVE_EPOLL
        #include "ae_epoll.c"
        #else
            #ifdef HAVE_KQUEUE
            #include "ae_kqueue.c"
            #else
            #include "ae_select.c"
            #endif
        #endif
    #endif
    

    上面这部分代码是在ae.c前面的一段代码,显然是通过宏来控制引入不同模型的.c文件实现的。也就是不同操作系统定义不同的宏,编译的时候就编译对应的.c文件。我们可以进一步来看下宏定义,位于config.h:

    #ifdef __linux__
    #define HAVE_EPOLL 1
    #endif
    
    #if (defined(__APPLE__) && defined(MAC_OS_X_VERSION_10_6)) || defined(__FreeBSD__) || defined(__OpenBSD__) || defined (__NetBSD__)
    #define HAVE_KQUEUE 1
    #endif
    
    #ifdef __sun
    #include <sys/feature_tests.h>
    #ifdef _DTRACE_VERSION
    #define HAVE_EVPORT 1
    #endif
    #endif
    

    可知:
    linux系统使用epoll,apple/Mac/bsd系统使用kqueue,__sun使用evport(宏__sun实际对应Solaris系统),其他操作系统如windows则使用select。

    epoll和kqueue都有听说过,第二个问题来了:evport是什么模型?

    evport是什么模型?

    evport主要API:

    1. port_create()
      原型:int port_create(void);

      port_create() 创建一个 Event ports 队列,返回一个文件描述符作为该
      Event port 的代表。

      相似:kqueue(),epoll_create()

    2. port_associate()
      原型:int port_associate(int port, int source, uintptr_t object,
      int events, void *user);

      port_associate() 将某一个对象的特定 event 与 Event port 相关联。当
      source 为 PORT_SOURCE_FD 时,object 就是文件描述符。events 可以参考
      poll 的。user 是一个用户自定义的指针,与该 object 相关的。在
      kqueue 和 epoll 也提供了类似的用户自定义指针。

      需要注意的是,当用 port_get() 取得某个 object 的 event 之后,这个
      object 与 port 也就不再相关联了。如果想继续取得这个 object 的 event,
      必须再次调用 port_associate() 将 object 与 port 关联。这种设计显然
      是为多线程程序而做的,当某一个线程取得一个 event 之后,object 就从
      port 的队列中删掉了,这样可以保证这个线程完完整整地处理完这个 event,
      不用担心别的线程也会取得这个 event。

      相似:kevent(),epoll_ctl()

    3. port_get()
      原型:int port_get(int port, port_event_t *pe, const timespec_t
      *timeout);

      port_get() 每次从 port 中取回一个 event。如果 timeout 参数为 NULL,
      port_get() 会一直等待,直到某一个 event 到来。pe 用于存储返回的
      event。

      相似:kevent(),epoll_wait()

    4. port_getn()
      原型:int port_getn(int port, port_event_t list[], uint_t max,
      uint_t *nget, const timespec_t *timeout);

      port_getn() 与 port_get() 都是用于从 port 中取回 event,不同的是
      port_getn() 可以一次取回多个。list 数组用于存储返回的多个 events,
      max 为 list 数组的元素个数。timeout 与 port_get() 的一致。

      需要特别注意的是 nget 参数,这是一个 value-result 参数,也就是传入
      n,告诉内核要取得 n 个 event,当 port_getn() 返回时,getn 的值表示
      内核实际取得了多少个。当 timeout 为 NULL 时,port_getn() 会一直等待,
      直到确实取得了 n 个 event 之后才会返回,这一点是与 kevent() 和
      epoll_wait() 很不相同的地方。如果 timeout 不为 NULL,则只要超时就返
      回,不管是不是已经取到了 n 个 event。(注:这里忽略了其他可能引起
      port_getn() 返回的因素)

      相似:kevent(),epoll_wait()

    总结evport:实际也是类似epoll、kqueue的一种事件模型。

    再回过头来,我们看看redis 定义的事件模型是怎么被调用和工作的?

    redis 定义的事件模型是怎么被调用和工作的?

    /* State of an event based program */
    typedef struct aeEventLoop {
        int maxfd;   /* highest file descriptor currently registered */
        int setsize; /* max number of file descriptors tracked */
        long long timeEventNextId;
        time_t lastTime;     /* Used to detect system clock skew */
        aeFileEvent *events; /* Registered events */
        aeFiredEvent *fired; /* Fired events */
        aeTimeEvent *timeEventHead;
        int stop;
        void *apidata; /* This is used for polling API specific data */
        aeBeforeSleepProc *beforesleep;
        aeBeforeSleepProc *aftersleep;
    } aeEventLoop;
    

    上面定义了事件循环结构体。它在server中使用如下:

    struct redisServer {
        /* General */
        pid_t pid;                  /* Main process pid. */
        char *configfile;           /* Absolute config file path, or NULL */
        char *executable;           /* Absolute executable file path. */
        char **exec_argv;           /* Executable argv vector (copy). */
        int dynamic_hz;             /* Change hz value depending on # of clients. */
        int config_hz;              /* Configured HZ value. May be different than
                                       the actual 'hz' field value if dynamic-hz
                                       is enabled. */
        int hz;                     /* serverCron() calls frequency in hertz */
        redisDb *db;
        dict *commands;             /* Command table */
        dict *orig_commands;        /* Command table before command renaming. */
        aeEventLoop *el;//注意这个就是aeEventLoop在sever定义的变量
    

    看看它使用的地方:
    1)初始化

    void initServer(void) {
        ...省略其它代码
       //初始化
        server.el = aeCreateEventLoop(server.maxclients+CONFIG_FDSET_INCR);
       ...
       //创建一个定时事件 每隔1ms进行回调
      /* Create the timer callback, this is our way to process many background
         * operations incrementally, like clients timeout, eviction of unaccessed
         * expired keys and so forth. */
        if (aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) {
            serverPanic("Can't create event loop timers.");
            exit(1);
        }
        /* Create an event handler for accepting new connections in TCP and Unix
         * domain sockets. */
     //针对每个新连接创建文件事件
        for (j = 0; j < server.ipfd_count; j++) {
            if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,
                acceptTcpHandler,NULL) == AE_ERR)
                {
                    serverPanic(
                        "Unrecoverable error creating server.ipfd file event.");
                }
        }
       //针对unix socket创建文件事件
        if (server.sofd > 0 && aeCreateFileEvent(server.el,server.sofd,AE_READABLE,
            acceptUnixHandler,NULL) == AE_ERR) serverPanic("Unrecoverable error creating server.sofd file event.");
    
    
        /* Register a readable event for the pipe used to awake the event loop
         * when a blocked client in a module needs attention. */
      //创建一个针对pipe的事件
        if (aeCreateFileEvent(server.el, server.module_blocked_pipe[0], AE_READABLE,
            moduleBlockedClientPipeReadable,NULL) == AE_ERR) {
                serverPanic(
                    "Error registering the readable event for the module "
                    "blocked clients subsystem.");
        }
    }
    

    上面的初始化看到创建了多个文件事件,这个文件事件实际就是用来监听fd的读写。

    2)循环处理

    int main(int argc, char **argv) {
    ...
    aeSetBeforeSleepProc(server.el,beforeSleep);//设置事件循环处理前的回调
    aeSetAfterSleepProc(server.el,afterSleep);//设置事件循环处理后的回调
    aeMain(server.el);//设置事件循环主要逻辑(包含事件处理逻辑)
    aeDeleteEventLoop(server.el);//清除事件循环对象
    ...
    }
    

    总结:事件loop对象在initServer函数中初始化,在main函数调用。通过aeMain中的while函数不断处理事件。

    相关文章

      网友评论

          本文标题:Redis AE事件库研究

          本文链接:https://www.haomeiwen.com/subject/gtckectx.html