美文网首页redis
基于 Redis 的任务排队时间统计

基于 Redis 的任务排队时间统计

作者: Maslino | 来源:发表于2016-11-20 14:06 被阅读635次

    问题

    基于 Redis 的优先级队列这篇文章中,我们使用 sorted set 实现了一个优先级队列。现在,考虑如下问题:

    在一个任务调度系统中,任务进入队列排队,然后等待被调度出队列。为了衡量调度效率,一个指标是任务的平均排队时间。那么,如何统计每个任务的排队时间以及这些任务的平均排队时间呢?

    解决方案

    为了统计每个任务的排队时间,我们需要知道任务的入队时间以及出队时间,这两个时间的差值就是这个任务的排队时间。我们仍然使用 sorted set 来记录任务的入队或出队时间,只不过这个时候 sorted set 中元素的 score 不再是优先级,而是任务的入队或出队时间。

    代码实现

    下面是一个可能的代码实现:

    class QueuingTimeStat(object):
    
        def __init__(self, redis_client, redis_key, namespace='schedule'):
            self.client = redis_client
            self.key = '%s:%s' % (namespace, redis_key)
    
        def _clean(self):
            """清理一天之前的记录"""
            now = time.time()
            self.client.zremrangebyscore(self.key, 0, now - 3600 * 24)
    
        def add_start(self, task_id, timestamp=None):
            self._clean()
            member = '%s:%s' % (task_id, 'start')
            timestamp = time.time() if timestamp is None else timestamp
            self.client.zadd(self.key, timestamp, member)
    
        def add_end(self, task_id, timestamp=None):
            self._clean()
            member = '%s:%s' % (task_id, 'end')
            timestamp = time.time() if timestamp is None else timestamp
            self.client.zadd(self.key, timestamp, member)
    
        def average_delay(self, window):
            now = time.time()
            results = self.client.zrangebyscore(self.key, now - window, now, withscores=True)
            # [(value, score), ...]
            sorted_results = sorted(results, key=lambda x: x[0])
            group_results = groupby(sorted_results, key=lambda x: x[0].rsplit(':', 1)[0])
            total = 0.0
            count = 0
            for task_id, sub_group in group_results:
                pairs = tuple(sub_group)
                if len(pairs) == 2:
                    count += 1
                    total += abs(pairs[0][1] - pairs[1][1])
                else:
                    if pairs[0][0].endswith('start'):
                        count += 1
                        total += now - pairs[0][1]
            return total / count if count > 0 else 0
    

    在上面的代码中,task_id 是任务的唯一标识。task_id:start 用于记录任务的入队时间,task_id:end 用于记录任务的出队时间。average_delay 用于统计最近任务的平均排队时间,该方法提供了时间窗口 window 参数,时间窗口越大,则平均排队时间变化曲线越平滑。另外,为了降低对内存的消耗,每次记录入队或出队时间的时候,会清理一天之前的入队出队时间记录。

    相关文章

      网友评论

        本文标题:基于 Redis 的任务排队时间统计

        本文链接:https://www.haomeiwen.com/subject/xgqypttx.html