本文主要是基于 redis 6.2
源码进行分析定时事件的数据结构和常见操作。
数据结构
在 redis 中通过 aeTimeEvent
结构来创建定时任务事件,代码如下:
/* Time event structure */
typedef struct aeTimeEvent {
// 标识符
long long id; /* time event identifier. */
// 定时纳秒数
monotime when;
// 定时回调函数
aeTimeProc *timeProc;
// 注销定时器时候的回调函数
aeEventFinalizerProc *finalizerProc;
void *clientData;
struct aeTimeEvent *prev;
struct aeTimeEvent *next;
int refcount; /* refcount to prevent timer events from being
* freed in recursive time event calls. */
} aeTimeEvent;
常见操作
1. 创建定时事件
redis 中最重要的定时函数且是周期执行的函数,使用的是 serverCron 函数。在 redis 中由于定时任务比较少,因此并没有严格的按照过期时间来排序的,而是按照 id自增
+ 头插法
来保证基本有序。
if (aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) {
serverPanic("Can't create event loop timers.");
exit(1);
}
//创建定时器对象
long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds,
aeTimeProc *proc, void *clientData,
aeEventFinalizerProc *finalizerProc)
{
long long id = eventLoop->timeEventNextId++;
aeTimeEvent *te;
te = zmalloc(sizeof(*te));
if (te == NULL) return AE_ERR;
te->id = id;
te->when = getMonotonicUs() + milliseconds * 1000;
te->timeProc = proc;
te->finalizerProc = finalizerProc;
te->clientData = clientData;
te->prev = NULL;
// 头插法
te->next = eventLoop->timeEventHead;
te->refcount = 0;
if (te->next)
te->next->prev = te;
eventLoop->timeEventHead = te;
return id;
}
数据结构如下图所示:
2. 触发定时事件
redis 中是采用 IO 复用来进行定时任务的。
- 查找距离现在最近的定时事件,见
usUntilEarliestTimer
函数:
/* How many microseconds until the first timer should fire.
* If there are no timers, -1 is returned.
*
* Note that's O(N) since time events are unsorted.
* Possible optimizations (not needed by Redis so far, but...):
* 1) Insert the event in order, so that the nearest is just the head.
* Much better but still insertion or deletion of timers is O(N).
* 2) Use a skiplist to have this operation as O(1) and insertion as O(log(N)).
*/
static int64_t usUntilEarliestTimer(aeEventLoop *eventLoop) {
aeTimeEvent *te = eventLoop->timeEventHead;
if (te == NULL) return -1;
aeTimeEvent *earliest = NULL;
while (te) {
if (!earliest || te->when < earliest->when)
earliest = te;
te = te->next;
}
monotime now = getMonotonicUs();
return (now >= earliest->when) ? 0 : earliest->when - now;
}
这里时间复杂度可能比较高,实际中需要结合具体场景使用。
- 更新剩余过期时间,想想为啥呢?因为我们前面提到过,IO 复用有可能因为 IO 事件返回,所以需要更新。
if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))
usUntilTimer = usUntilEarliestTimer(eventLoop);
if (usUntilTimer >= 0) {
tv.tv_sec = usUntilTimer / 1000000;
tv.tv_usec = usUntilTimer % 1000000;
tvp = &tv;
} else {
if (flags & AE_DONT_WAIT) {
// 不等待
tv.tv_sec = tv.tv_usec = 0;
tvp = &tv;
} else {
/* Otherwise we can block */
tvp = NULL; /* wait forever */
}
}
3. 执行定时事件
一次性的执行完直接删除,周期性的执行完在重新添加到链表。
/* Process time events */
static int processTimeEvents(aeEventLoop *eventLoop) {
int processed = 0;
aeTimeEvent *te;
long long maxId;
te = eventLoop->timeEventHead;
maxId = eventLoop->timeEventNextId-1;
monotime now = getMonotonicUs();
// 删除定时器
while(te) {
long long id;
// 下一轮中对事件进行删除
/* Remove events scheduled for deletion. */
if (te->id == AE_DELETED_EVENT_ID) {
aeTimeEvent *next = te->next;
/* If a reference exists for this timer event,
* don't free it. This is currently incremented
* for recursive timerProc calls */
if (te->refcount) {
te = next;
continue;
}
if (te->prev)
te->prev->next = te->next;
else
eventLoop->timeEventHead = te->next;
if (te->next)
te->next->prev = te->prev;
if (te->finalizerProc) {
te->finalizerProc(eventLoop, te->clientData);
now = getMonotonicUs();
}
zfree(te);
te = next;
continue;
}
if (te->id > maxId) {
te = te->next;
continue;
}
if (te->when <= now) {
int retval;
id = te->id;
te->refcount++;
// timeProc 函数返回值 retVal 为时间事件执行的间隔
retval = te->timeProc(eventLoop, id, te->clientData);
te->refcount--;
processed++;
now = getMonotonicUs();
if (retval != AE_NOMORE) {
te->when = now + retval * 1000;
} else {
// 如果超时了,那么标记为删除
te->id = AE_DELETED_EVENT_ID;
}
}
// 执行下一个
te = te->next;
}
return processed;
}
总结
- 优点:实现简单
- 缺点:如果定时任务很多,效率比较低。
作者:心城以北
链接:https://juejin.cn/post/7072016644272291847
来源:稀土掘金
网友评论