美文网首页
[redis 源码走读] 事件 - 定时器

[redis 源码走读] 事件 - 定时器

作者: wenfh2020 | 来源:发表于2020-05-18 19:02 被阅读0次

    定时器是 redis 异步处理事件的一个十分重要的功能。redis 定时器功能由多个时间事件组成,事件由一个双向链表维护。时间事件可以处理多个定时任务。


    理解 redis 定时器,我们带着问题,看看 redis 是怎么处理的:

    • 定时器作用是什么。
    • 定时器实现原理。
    • 单进程里如何同时处理文件事件和时间事件。
    • 如何实现多定时任务。

    🔥文章来源:wenfh2020.com


    1. 作用

    定时器是 redis 异步处理任务的一个十分重要的功能。核心逻辑在 serverCron 函数里。

    • 对设置了过期时间的数据进行检查回收。
    • 异步回收需要关闭的链接(socket)。
    • 检查 fork 的子进程是否已经关闭,处理回收的相关工作。
    • 检查内存数据是否符合 rdb 持久化快照落地条件,fork 子进程进行快照保存。
    • bgsave rdb 生成快照或 bgrewriteaof aof 重写延后操作。
    • 对需要扩容和缩容的哈希表(dict)进行数据迁移。
    • 集群里节点间的断线重连。
    • redis 服务的一些信息统计。
    • ...等等。

    2. 定时器实现原理

    redis 定时器功能由多个时间事件组成,事件由一个双向链表维护。有些实现逻辑,上文已经提到,下面就简单说一下部分实现原理。


    2.1. 事件结构

    • 事件。
    // 时间事件
    typedef struct aeTimeEvent {
        long long id; /* time event identifier. */
        long when_sec; /* seconds */
        long when_ms; /* milliseconds */
        aeTimeProc *timeProc; /* 时钟到期事件触发回调处理函数。*/
        aeEventFinalizerProc *finalizerProc; /* 时间事件删除时,触发回调。*/
        void *clientData; /* 扩展参数,异步操作方便数据回调,在 timeProc 通过参数回传。*/
        struct aeTimeEvent *prev; /* 时间事件是一个双向链表。*/
        struct aeTimeEvent *next;
    } aeTimeEvent;
    
    // 事件管理
    typedef struct aeEventLoop {
        ...
        long long timeEventNextId;  // 时间事件下一个 id (通过 ‘++’ 递增)
        ...
        aeTimeEvent *timeEventHead; // 时间事件链表。
        ...
    } aeEventLoop;
    
    • 事件循环。
    // 循环处理事件。
    void aeMain(aeEventLoop *eventLoop) {
        eventLoop->stop = 0;
        while (!eventLoop->stop) {
            if (eventLoop->beforesleep != NULL)
                eventLoop->beforesleep(eventLoop);
            aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP);
        }
    }
    
    • 事件回调函数。
    
    // 时间事件触发处理函数。
    typedef int aeTimeProc(struct aeEventLoop *eventLoop, long long id, void *clientData);
    
    // 时间事件处理完毕,被删除时,触发的回调处理。
    typedef void aeEventFinalizerProc(struct aeEventLoop *eventLoop, void *clientData);
    
    • 创建时间事件,时间事件通过双向链表管理,新的事件插入到链表头。
    long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds,
            aeTimeProc *proc, void *clientData,
            aeEventFinalizerProc *finalizerProc) {
        // 事件 id 递增。
        long long id = eventLoop->timeEventNextId++;
        aeTimeEvent *te;
    
        te = zmalloc(sizeof(*te));
        if (te == NULL) return AE_ERR;
        te->id = id;
        // 设置到期时间。
        aeAddMillisecondsToNow(milliseconds,&te->when_sec,&te->when_ms);
        te->timeProc = proc;
        te->finalizerProc = finalizerProc;
        te->clientData = clientData;
        te->prev = NULL;
        te->next = eventLoop->timeEventHead;
        if (te->next)
            te->next->prev = te;
        eventLoop->timeEventHead = te;
        return id;
    }
    
    • 设置事件到期时间。
    static void aeAddMillisecondsToNow(long long milliseconds, long *sec, long *ms) {
        long cur_sec, cur_ms, when_sec, when_ms;
    
        // 当前时间增加到期时间间隔。
        aeGetTime(&cur_sec, &cur_ms);
        when_sec = cur_sec + milliseconds/1000;
        when_ms = cur_ms + milliseconds%1000;
        if (when_ms >= 1000) {
            when_sec ++;
            when_ms -= 1000;
        }
        *sec = when_sec;
        *ms = when_ms;
    }
    

    2.2. 定时器执行流程

    • redis 启动添加时钟处理事件。
    // ae.c
    int main(int argc, char **argv) {
        ...
        initServer();
        ...
        aeMain(server.el);
        ...
    }
    
    void initServer(void) {
        ...
        // 创建定时事件,绑定回调函数。
        if (aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) {
            serverPanic("Can't create event loop timers.");
            exit(1);
        }
        ...
    }
    
    // 时钟回调处理函数
    int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
        ...
    }
    
    • 事件循环处理。
    // 循环处理事件。
    void aeMain(aeEventLoop *eventLoop) {
        eventLoop->stop = 0;
        while (!eventLoop->stop) {
            if (eventLoop->beforesleep != NULL)
                eventLoop->beforesleep(eventLoop);
            aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP);
        }
    }
    
    // 处理时间事件
    int aeProcessEvents(aeEventLoop *eventLoop, int flags) {
        ...
         /* Check time events */
        if (flags & AE_TIME_EVENTS)
            // 处理时间事件
            processed += processTimeEvents(eventLoop);
        ...
    }
    

    3. 单进程异步处理事件逻辑

    进程通过循环,不停地处理时间事件和文件事件。


    redis 单进程处理文件事件和时间事件
    • 在进程的循环中不停地捞出文件和时间事件进行处理 aeProcessEvents
    // ae.c
    int main(int argc, char **argv) {
        ...
        aeMain(server.el);
        ...
    }
    
    // 循环处理事件。
    void aeMain(aeEventLoop *eventLoop) {
        eventLoop->stop = 0;
        while (!eventLoop->stop) {
            ...
            aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP);
        }
    }
    
    // 处理事件
    int aeProcessEvents(aeEventLoop *eventLoop, int flags) {
        ...
        // 先搜索出最快到期的定时器,查看时间戳,文件事件要在定时器到期前从系统内核捞出来处理。
        shortest = aeSearchNearestTimer(eventLoop);
        ...
        // 处理文件事件,等待获取事件时间间隔不能太长,否则定时器事件处理要超时了。
        numevents = aeApiPoll(eventLoop, tvp);
        for (j = 0; j < numevents; j++) {
            ...
        }
        ...
        // 处理时间事件
        if (flags & AE_TIME_EVENTS)
            processed += processTimeEvents(eventLoop);
        ...
    }
    
    • 对于文件事件,在 linux 系统中,redis 采用了 epoll 多路复用 I/O 事件驱动处理文件事件。通过 epoll_wait 捞出就绪事件进行处理。
    static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
        ...
        retval = epoll_wait(state->epfd,state->events,eventLoop->setsize,
                tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1);
        ...
    }
    
    • 先处理完文件事件,再处理时间事件。
    static int processTimeEvents(aeEventLoop *eventLoop) {
        ...
    }
    

    4. 多定时任务

    我们看看 processTimeEvents 是如何处理多个定时任务的:

    redis 多定时任务
    • 遍历时间事件链表,先删除已处理,被标识需要删除的事件,再执行到期事件。
    static int processTimeEvents(aeEventLoop *eventLoop) {
        int processed = 0;
        aeTimeEvent *te;
        long long maxId;
        time_t now = time(NULL);
        ...
        eventLoop->lastTime = now;
    
        te = eventLoop->timeEventHead;
        maxId = eventLoop->timeEventNextId-1;
        // 遍历时间链表,处理到期执行的时间事件。
        while(te) {
            long now_sec, now_ms;
            long long id;
    
            // 从时间事件链表中,删除时被标识为删除状态的事件。
            if (te->id == AE_DELETED_EVENT_ID) {
                aeTimeEvent *next = te->next;
                if (te->prev)
                    te->prev->next = te->next;
                else
                    eventLoop->timeEventHead = te->next;
                if (te->next)
                    te->next->prev = te->prev;
                if (te->finalizerProc)
                    te->finalizerProc(eventLoop, te->clientData);
                zfree(te);
                te = next;
                continue;
            }
            ...
            // 时间事件到期,执行事件。
            aeGetTime(&now_sec, &now_ms);
            if (now_sec > te->when_sec ||
                (now_sec == te->when_sec && now_ms >= te->when_ms)) {
                int retval;
    
                id = te->id;
                // 执行时钟回调函数
                retval = te->timeProc(eventLoop, id, te->clientData);
                processed++;
                // 如果回调函数不返回 AE_NOMORE,重新更新该事件的到期时间,等待下次触发,否则标识事件为删除状态。
                if (retval != AE_NOMORE) {
                    aeAddMillisecondsToNow(retval,&te->when_sec,&te->when_ms);
                } else {
                    te->id = AE_DELETED_EVENT_ID;
                }
            }
            te = te->next;
        }
        return processed;
    }
    
    • 时间事件定时执行原理。

      到期事件回调处理函数 timeProc,例如 redis 对应的处理函数 serverCronserverCron 返回下一次到期的时间间隔,事件到期时间被(aeAddMillisecondsToNow)修改延后一个时间间隔,下一次到期再重新执行,从而达到时钟定期执行的效果。

    int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
        ...
        // 返回时间间隔,单位毫秒。
        return 1000/server.hz;
    }
    
    • 定时器定时执行频率。

      很多后台任务都在定时器里执行,定时执行频率可以由配置文件的 hz 频率控制,时间事件 (1000/hz) 毫秒执行一次,频率越高,到期时间间隔越小,刷得越快,定时后台任务处理得越快,但是这样也会相应地损耗更多的系统资源,而且定时事件和文件事件是在同一个进程中进行的,这样肯定会影响到文件事件执行。一般情况下,系统默认一秒定时执行 10 次,也就是 hz == 10

    # redis.conf
    
    # 定时器事件刷新频率 1 < hz < 500,默认 10
    hz 10
    
    • 多定时任务

      事件里有不同的定时任务,它们定时执行的任务有快有慢,那对于多个定时任务,在时间事件触发后,它是如何处理的呢?
      可以通过宏 run_with_period 处理。当时间间隔 _ms_ 很小的时候,每次触发时间事件,任务都会执行,否则通过记录事件触发的次数 server.cronloops++,当 hz 触发的事件时间间隔累积起来达到长时间间隔,就执行慢任务。(参考上图)

    struct redisServer {
        ...
        int cronloops;              /* Number of times the cron function run */
        ...
    }
    
    #define run_with_period(_ms_) if ((_ms_ <= 1000/server.hz) || !(server.cronloops%((_ms_)/(1000/server.hz))))
    
    int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
        ...
        // 快任务,时间间隔 <= 时间事件触发时间间隔,执行。
        run_with_period(100) {
            ...
        }
        ...
        // 慢任务,定时时间间隔比较长的,需要通过 server.cronloops 累加达到长时间间隔,才会执行慢任务。
        run_with_period(5000) {
            ...
        }
    
        // 每次触发定时事件,都会执行。
        /* We need to do a few operations on clients asynchronously. */
        clientsCron();
    
        /* Handle background operations on Redis databases. */
        databasesCron();
        ...
        server.cronloops++;
        ...
    }
    

    5. 总结

    • 定时器是由时间事件组成的,目前,redis 核心的定时事件只有一个,核心逻辑都在 serverCron 函数里。
    • 定时器定时触发频率受配置 hz 影响,可以通过修改该配置项,调整定时处理速度。
    • 多定时任务,其实在同一个定时器时间事件里处理。
    • 定时事件和文件事件都在同一个进程中执行,所以虽然定时事件时间精度是毫秒,但是不一定会十分精确,会受到文件事件处理影响。
    • 定时事件执行还会受到定时器里的任务处理影响,例如系统在一个时间段内很多过期数据,那么系统有可能会分配更多的时间片去处理。
    • 基于 redis 主逻辑都在单进程主线程中实现,所以定时任务不能执行太长的时间,所以很多复杂的定时任务,都会限制处理数量和处理时间。(例如字典 dict 的扩容和缩容,maxmemory 数据淘汰策略等等。)

    7. 后记

    通读一个知识点后,知识在脑海中是模糊的,需要通过不同方式去强化清晰这个脑海中的映像。让抽象思维落地,我自己会经常将一些知识点图形化,这样一点一点地将知识碎片拼接起来。


    图形化

    相关文章

      网友评论

          本文标题:[redis 源码走读] 事件 - 定时器

          本文链接:https://www.haomeiwen.com/subject/iuypohtx.html