美文网首页JavaJava 程序员
Redis 源码分析——定时任务原理

Redis 源码分析——定时任务原理

作者: 程序花生 | 来源:发表于2022-04-19 16:57 被阅读0次

    本文主要是基于 redis 6.2 源码进行分析定时事件的数据结构和常见操作。

    数据结构

    在 redis 中通过 aeTimeEvent 结构来创建定时任务事件,代码如下:

    /* Time event structure */
    typedef struct aeTimeEvent {
        // 标识符
        long long id; /* time event identifier. */
        // 定时纳秒数
        monotime when;
        // 定时回调函数
        aeTimeProc *timeProc;
        // 注销定时器时候的回调函数
        aeEventFinalizerProc *finalizerProc;
        void *clientData;
        struct aeTimeEvent *prev;
        struct aeTimeEvent *next;
        int refcount; /* refcount to prevent timer events from being
               * freed in recursive time event calls. */
    } aeTimeEvent;
    

    常见操作

    1. 创建定时事件

    redis 中最重要的定时函数且是周期执行的函数,使用的是 serverCron 函数。在 redis 中由于定时任务比较少,因此并没有严格的按照过期时间来排序的,而是按照 id自增 + 头插法 来保证基本有序。

    if (aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) {
      serverPanic("Can't create event loop timers.");
      exit(1);
    }
    
    //创建定时器对象
    long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds,
            aeTimeProc *proc, void *clientData,
            aeEventFinalizerProc *finalizerProc)
    {
        long long id = eventLoop->timeEventNextId++;
        aeTimeEvent *te;
    
        te = zmalloc(sizeof(*te));
        if (te == NULL) return AE_ERR;
        te->id = id;
        te->when = getMonotonicUs() + milliseconds * 1000;
        te->timeProc = proc;
        te->finalizerProc = finalizerProc;
        te->clientData = clientData;
        te->prev = NULL;
        // 头插法 
        te->next = eventLoop->timeEventHead;
        te->refcount = 0;
        if (te->next)
            te->next->prev = te;
        eventLoop->timeEventHead = te;
        return id;
    }
    

    数据结构如下图所示:

    2. 触发定时事件

    redis 中是采用 IO 复用来进行定时任务的。

    • 查找距离现在最近的定时事件,见 usUntilEarliestTimer 函数:
    /* How many microseconds until the first timer should fire.
     * If there are no timers, -1 is returned.
     *
     * Note that's O(N) since time events are unsorted.
     * Possible optimizations (not needed by Redis so far, but...):
     * 1) Insert the event in order, so that the nearest is just the head.
     *    Much better but still insertion or deletion of timers is O(N).
     * 2) Use a skiplist to have this operation as O(1) and insertion as O(log(N)).
     */
    static int64_t usUntilEarliestTimer(aeEventLoop *eventLoop) {
        aeTimeEvent *te = eventLoop->timeEventHead;
        if (te == NULL) return -1;
    
        aeTimeEvent *earliest = NULL;
        while (te) {
            if (!earliest || te->when < earliest->when)
                earliest = te;
            te = te->next;
        }
    
        monotime now = getMonotonicUs();
        return (now >= earliest->when) ? 0 : earliest->when - now;
    }
    

    这里时间复杂度可能比较高,实际中需要结合具体场景使用。

    • 更新剩余过期时间,想想为啥呢?因为我们前面提到过,IO 复用有可能因为 IO 事件返回,所以需要更新。
    if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))
      usUntilTimer = usUntilEarliestTimer(eventLoop);
    
    if (usUntilTimer >= 0) {
      tv.tv_sec = usUntilTimer / 1000000;
      tv.tv_usec = usUntilTimer % 1000000;
      tvp = &tv;
    } else {
      if (flags & AE_DONT_WAIT) {
        // 不等待
        tv.tv_sec = tv.tv_usec = 0;
        tvp = &tv;
      } else {
        /* Otherwise we can block */
        tvp = NULL; /* wait forever */
      }
    }
    

    3. 执行定时事件

    一次性的执行完直接删除,周期性的执行完在重新添加到链表。

    /* Process time events */
    static int processTimeEvents(aeEventLoop *eventLoop) {
      int processed = 0;
      aeTimeEvent *te;
      long long maxId;
    
      te = eventLoop->timeEventHead;
      maxId = eventLoop->timeEventNextId-1;
      monotime now = getMonotonicUs();
    
      // 删除定时器
      while(te) {
        long long id;
    
        // 下一轮中对事件进行删除
        /* Remove events scheduled for deletion. */
        if (te->id == AE_DELETED_EVENT_ID) {
          aeTimeEvent *next = te->next;
          /* If a reference exists for this timer event,
                 * don't free it. This is currently incremented
                 * for recursive timerProc calls */
          if (te->refcount) {
            te = next;
            continue;
          }
          if (te->prev)
            te->prev->next = te->next;
          else
            eventLoop->timeEventHead = te->next;
          if (te->next)
            te->next->prev = te->prev;
          if (te->finalizerProc) {
            te->finalizerProc(eventLoop, te->clientData);
            now = getMonotonicUs();
          }
          zfree(te);
          te = next;
          continue;
        }
    
        if (te->id > maxId) {
          te = te->next;
          continue;
        }
    
        if (te->when <= now) {
          int retval;
    
          id = te->id;
          te->refcount++;
          // timeProc 函数返回值 retVal 为时间事件执行的间隔
          retval = te->timeProc(eventLoop, id, te->clientData);
          te->refcount--;
          processed++;
          now = getMonotonicUs();
          if (retval != AE_NOMORE) {
            te->when = now + retval * 1000;
          } else {
            // 如果超时了,那么标记为删除
            te->id = AE_DELETED_EVENT_ID;
          }
        }
        // 执行下一个
        te = te->next;
      }
      return processed;
    }
    

    总结

    1. 优点:实现简单
    2. 缺点:如果定时任务很多,效率比较低。

    作者:心城以北
    链接:https://juejin.cn/post/7072016644272291847
    来源:稀土掘金

    相关文章

      网友评论

        本文标题:Redis 源码分析——定时任务原理

        本文链接:https://www.haomeiwen.com/subject/tcasertx.html