Redis事件

作者: tracyzht | 来源:发表于2017-06-06 00:59 被阅读0次

    redis服务器是一个事件驱动型的,主要包括以下两种类型的事件:
    (1)文件事件:客户端与服务器的socket连接,读命令,写命令都是文件事件。redis服务器是单线程,采用I/O多路复用来处理多个客户端的请求。
    (2)时间事件:周期性地执行一些操作。

    1、事件循环

    事件循环的核心部分是aeEventLoop,下图为数据结构:

    aeEventLoop数据结构

    aeEventLoop保存了待处理的文件事件,时间事件,以及事件执行的上下文。内部持有三个事件数组:
    (1)aeFileEvent *events 已注册文件事件数组;
    (2)aeFiredEvent *fired 已就绪文件事件数组;
    (3)aeTimeEvent *timeEventHead 时间事件列表;
    事件循环
    在server启动过程中,会调用aeMain启动事件处理循环。

    void aeMain(aeEventLoop *eventLoop) {
    eventLoop->stop = 0;
    while (!eventLoop->stop) {
    // 如果有需要在事件处理前执行的函数,那么运行它
    if (eventLoop->beforesleep != NULL)
    eventLoop->beforesleep(eventLoop);
    // 开始处理事件
    aeProcessEvents(eventLoop, AE_ALL_EVENTS);
    }
    }
    

    aeMain循环处理事件,直到eventLoop→stop=true为止。实际处理文件事件和时间事件的过程是在aeProcessEvents中。

    int aeProcessEvents(aeEventLoop *eventLoop, int flags)
    {
    int processed = 0, numevents;
    /* Nothing to do? return ASAP */
    if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;
    if (eventLoop->maxfd != -1 ||
    ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
    int j;
    aeTimeEvent *shortest = NULL;
    struct timeval tv, *tvp;
    // 获取最近的时间事件
    if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))
    shortest = aeSearchNearestTimer(eventLoop);
    if (shortest) {
    // 如果时间事件存在的话
    // 那么根据最近可执行时间事件和现在时间的时间差来决定文件事件的阻塞时间
    long now_sec, now_ms;
    /* Calculate the time missing for the nearest
    * timer to fire. */
    // 计算距今最近的时间事件还要多久才能达到
    // 并将该时间距保存在 tv 结构中
    aeGetTime(&now_sec, &now_ms);
    tvp = &tv;
    tvp->tv_sec = shortest->when_sec - now_sec;
    if (shortest->when_ms < now_ms) {
    tvp->tv_usec = ((shortest->when_ms+1000) - now_ms)*1000;
    tvp->tv_sec --;
    } else {
    tvp->tv_usec = (shortest->when_ms - now_ms)*1000;
    }
    // 时间差小于 0 ,说明事件已经可以执行了,将秒和毫秒设为 0 (不阻塞)
    if (tvp->tv_sec < 0) tvp->tv_sec = 0;
    if (tvp->tv_usec < 0) tvp->tv_usec = 0;
    } else {
    // 执行到这一步,说明没有时间事件
    // 那么根据 AE_DONT_WAIT 是否设置来决定是否阻塞,以及阻塞的时间长度
    if (flags & AE_DONT_WAIT) {
    // 设置文件事件不阻塞
    tv.tv_sec = tv.tv_usec = 0;
    tvp = &tv;
    } else {
    // 文件事件可以阻塞直到有事件到达为止
    tvp = NULL; /* wait forever */
    }
    }
    // 处理文件事件,阻塞时间由 tvp 决定
    numevents = aeApiPoll(eventLoop, tvp);
    for (j = 0; j < numevents; j++) {
    // 从已就绪数组中获取事件
    aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
    int mask = eventLoop->fired[j].mask;
    int fd = eventLoop->fired[j].fd;
    int rfired = 0;
    // 读事件
    if (fe->mask & mask & AE_READABLE) {
    // rfired 确保读/写事件只能执行其中一个
    rfired = 1;
    fe->rfileProc(eventLoop,fd,fe->clientData,mask);
    }
    // 写事件
    if (fe->mask & mask & AE_WRITABLE) {
    if (!rfired || fe->wfileProc != fe->rfileProc)
    fe->wfileProc(eventLoop,fd,fe->clientData,mask);
    }
    processed++;
    }
    }
    /* Check time events */
    // 执行时间事件
    if (flags & AE_TIME_EVENTS)
    processed += processTimeEvents(eventLoop);
    return processed; /* return the number of processed file/time events */
    }
    

    (1)找出最近的时间事件,计算出文件事件的阻塞时间。

    if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))
    shortest = aeSearchNearestTimer(eventLoop);
    

    如果最近的时间事件存在,则根据离当前时间的时间差得出文件事件的阻塞时间;
    如果不存在,则根据 AE_DONT_WAIT 是否设置来决定是否阻塞,以及阻塞的时间长度。
    (2)阻塞等待就绪的文件事件
    // 处理文件事件,阻塞时间由 tvp 决定

    numevents = aeApiPoll(eventLoop, tvp);
    

    底层有四种实现方式:(1)evport(2)epoll(3)kqueue(4)select
    (3)处理已就绪的文件事件
    第二步获取的已就绪事件存储在fired中。如果文件事件绑定了读/写事件,进行相应的处理。

    // 读事件
    if (fe->mask & mask & AE_READABLE) {
    // rfired 确保读/写事件只能执行其中一个
    rfired = 1;
    fe->rfileProc(eventLoop,fd,fe->clientData,mask);
    }
    // 写事件
    if (fe->mask & mask & AE_WRITABLE) {
    if (!rfired || fe->wfileProc != fe->rfileProc)
    fe->wfileProc(eventLoop,fd,fe->clientData,mask);
    }
    

    其中rfileProc和wfileProc就是在文件事件被创建时传入的函数指针。

    int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
    aeFileProc *proc, void *clientData)
    {
    //省略部分代码
    // 设置文件事件类型,以及事件的处理器
    fe->mask |= mask;
    if (mask & AE_READABLE) fe->rfileProc = proc;
    if (mask & AE_WRITABLE) fe->wfileProc = proc;
    // 私有数据
    fe->clientData = clientData;
    // 如果有需要,更新事件处理器的最大 fd
    if (fd > eventLoop->maxfd)
    eventLoop->maxfd = fd;
    return AE_OK;
    }
    

    (4)执行时间事件
    事件循环的流程如下图:

    事件循环过程

    2、文件事件

    下面来看下aeFileEvent内部结构:

    typedef struct aeFileEvent {
    // 监听事件类型掩码,
    // 值可以是 AE_READABLE 或 AE_WRITABLE ,
    // 或者 AE_READABLE | AE_WRITABLE
    int mask; /* one of AE_(READABLE|WRITABLE) */
    // 读事件处理器
    aeFileProc *rfileProc;
    // 写事件处理器
    aeFileProc *wfileProc;
    // 多路复用库的私有数据
    void *clientData;
    } aeFileEvent;
    

    (1)共有两种类型的文件事件:AE_READABLE和AE_WRITABLE类型。
    (2)两个函数指针:一个是处理读事件的函数指针,一个是处理写事件的函数指针。
    创建文件事件
    以下三种场景会创建文件事件:
    1、当有client申请socket连接时,会注册一个AE_READABLE类型的文件事件。
    2、当接受client命令请求时,会注册一个AE_READABLE类型的文件事件。
    3、当返回命令处理结果时,会注册一个AE_WRITABLE类型的文件事件。
    (1)socket连接
    创建一个AE_READABLE类型的文件事件,并注册事件处理函数指针acceptTcpHandler

    // 为 TCP 连接关联连接应答(accept)处理器
    // 用于接受并应答客户端的 connect() 调用
    for (j = 0; j < server.ipfd_count; j++) {
    if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,
    acceptTcpHandler,NULL) == AE_ERR)
    {
    redisPanic(
    "Unrecoverable error creating server.ipfd file event.");
    }
    }
    

    (2)接受client命令请求
    创建Readable类型的时间,并注册事件处理函数readQueryFromClient

    // 绑定读事件到事件 loop (开始接收命令请求)
    if (aeCreateFileEvent(server.el,fd,AE_READABLE,
    readQueryFromClient, c) == AE_ERR)
    {
    close(fd);
    zfree(c);
    return NULL;
    }
    

    (3)返回命令处理结果
    创建Writable类型的文件事件,并注册事件处理函数sendReplyToClient

    int prepareClientToWrite(redisClient *c) {
    ...
    // 一般情况,为客户端套接字安装写处理器到事件循环
    if (c->bufpos == 0 && listLength(c->reply) == 0 &&
    (c->replstate == REDIS_REPL_NONE ||
    c->replstate == REDIS_REPL_ONLINE) &&
    aeCreateFileEvent(server.el, c->fd, AE_WRITABLE,
    sendReplyToClient, c) == AE_ERR) return REDIS_ERR;
    return REDIS_OK;
    }
    

    3、时间事件

    3.1、processTimeEvents中处理时间事件

    static int processTimeEvents(aeEventLoop *eventLoop) {
    ...
    // 通过重置事件的运行时间,防止因系统时间被修改而造成的事件处理混乱
    if (now < eventLoop->lastTime) {
    te = eventLoop->timeEventHead;
    while(te) {
    te->when_sec = 0;
    te = te->next;
    }
    }
    // 更新最后一次处理时间事件的时间
    eventLoop->lastTime = now;
    // 遍历链表,执行那些已经就绪的事件
    te = eventLoop->timeEventHead;
    maxId = eventLoop->timeEventNextId-1;
    while(te) {
    long now_sec, now_ms;
    long long id;
    // 跳过无效事件
    if (te->id > maxId) {
    te = te->next;
    continue;
    }
    // 获取当前时间
    aeGetTime(&now_sec, &now_ms);
    // 如果当前时间等于或等于事件的执行时间,那么说明事件已就绪,执行这个事件
    if (now_sec > te->when_sec ||
    (now_sec == te->when_sec && now_ms >= te->when_ms))
    {
    int retval;
    id = te->id;
    // 执行事件处理器,并获取返回值
    retval = te->timeProc(eventLoop, id, te->clientData);
    processed++;
    // 记录是否有需要循环执行这个事件时间
    if (retval != AE_NOMORE) {
    // 是, retval 毫秒之后继续执行这个时间事件
    aeAddMillisecondsToNow(retval,&te->when_sec,&te->when_ms);
    } else {
    // 否,将这个事件删除
    aeDeleteTimeEvent(eventLoop, id);
    }
    // 因为执行事件之后,事件列表可能已经被改变了
    // 因此需要将 te 放回表头,继续开始执行事件
    te = eventLoop->timeEventHead;
    } else {
    te = te->next;
    }
    }
    return processed;
    }
    

    (1)首先判断系统时间是否被重新设置过
    如果系统时间先被设置成了未来的时间,又调成到正确的时间时,为了防止部分事件延迟执行,这里会强制执行所有的时间事件。
    (2)判断时间事件是否已到达
    如果当前时间大于或等于事件的执行时间,说明事件已到达,则执行事件。事件的执行由创建时间事件时传入的函数指针te->timeProc负责。
    server启动时会注册一个时间事件,并传入事件处理函数serverCron
    (3)判断该时间事件是否需要循环执行
    timeProc函数的返回值retval为时间事件执行的时间间隔
    如果retval != AE_MORE,则修改当前事件下次执行时间,并在retval间隔之后再次执行。
    如果retval == AE_MORE,则删除当前事件。

    3.2、ServerCron

    serverCron是时间事件的具体执行函数,具体工作主要有:

    int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
    ...
    /*更新server的统计信息 */
    updateCachedTime();
    // 记录服务器执行命令的次数
    run_with_period(100) trackOperationsPerSecond();
    server.lruclock = getLRUClock();
    if (zmalloc_used_memory() > server.stat_peak_memory)
    server.stat_peak_memory = zmalloc_used_memory();
    /* Sample the RSS here since this is a relatively slow call. */
    server.resident_set_size = zmalloc_get_rss();
    // 检查客户端,关闭超时客户端,并释放客户端多余的缓冲区
    clientsCron();
    // 对数据库执行各种操作
    databasesCron();
    // 如果 BGSAVE 和 BGREWRITEAOF 都没有在执行
    // 并且有一个 BGREWRITEAOF 在等待,那么执行 BGREWRITEAOF
    if (server.rdb_child_pid == -1 && server.aof_child_pid == -1 &&
    server.aof_rewrite_scheduled)
    {
    rewriteAppendOnlyFileBackground();
    }
    // 检查 BGSAVE 或者 BGREWRITEAOF 是否已经执行完毕
    if (server.rdb_child_pid != -1 || server.aof_child_pid != -1) {
    ...
    } else {
    // 遍历所有保存条件,看是否需要执行 BGSAVE 命令
    for (j = 0; j < server.saveparamslen; j++) {
    ...
    }
    /* Trigger an AOF rewrite if needed */
    if (server.rdb_child_pid == -1 &&
    server.aof_child_pid == -1 &&
    server.aof_rewrite_perc &&
    // AOF 文件的当前大小大于执行 BGREWRITEAOF 所需的最小大小
    server.aof_current_size > server.aof_rewrite_min_size)
    {
    ...
    }
    }
    /* AOF postponed flush: Try at every cron cycle if the slow fsync
    * completed. */
    if (server.aof_flush_postponed_start) flushAppendOnlyFile(0);
    //关闭那些需要异步关闭的客户端
    freeClientsInAsyncFreeQueue();
    /* Clear the paused clients flag if needed. */
    clientsArePaused(); /* Don't check return value, just use the side effect. */
    // 重连接主服务器、向主服务器发送 ACK 、判断数据发送失败情况、断开本服务器超时的从服务器,等等
    run_with_period(1000) replicationCron();
    /* Run the Redis Cluster cron. */
    // 如果服务器运行在集群模式下,那么执行集群操作
    run_with_period(100) {
    if (server.cluster_enabled) clusterCron();
    }
    /* Run the Sentinel timer if we are in sentinel mode. */
    // 如果服务器运行在 sentinel 模式下,那么执行 SENTINEL 的主函数
    run_with_period(100) {
    if (server.sentinel_mode) sentinelTimer();
    }
    ....
    return 1000/server.hz;
    }
    

    (1)更新server的统计信息
    (2)关闭已断开连接的client,并释放client占用的空间
    (3)删除数据库过期键
    (4)执行AOF或RDB持久化操作
    (5)进行主从服务器同步
    (6)如果是集群模式,则对集群进行同步和连接测试

    相关文章

      网友评论

        本文标题:Redis事件

        本文链接:https://www.haomeiwen.com/subject/gdzwfxtx.html