美文网首页队列redis
翻译-使用redis做任务队列

翻译-使用redis做任务队列

作者: 岳艳涛 | 来源:发表于2016-08-31 17:09 被阅读125次

前段时间,在工作中我们使用azure storage队列作为任务队列引擎,但过段时间后我们发现它并没有我们希望的那么快,随之,我们集中地使用redis并开始考虑将redis作为任务队列。虽然有许多的文档介绍如何使用redis的发布/订阅服务,但使用redis作为任务队列的屈指可数,所以我决定来描述如何去做。

什么是任务队列?

任务队列允许某些服务的客户端异步发送任务给它。通常服务有许多clients,可能许多处理的workers。总之整个工作流程是这样的:

  1. client将任务放到队列中
  2. workers定期循环检查队列中的新任务,如何存在,则执行任务
    但也有一个队列一些额外的要求:
  3. 服务质量:client不应该阻塞其他client的请求。
  4. 批处理:clients & workers应该能够获得多个任务以获得更好的性能。
  5. 可靠性:如果worker在处理task失败时,该任务可以被其他worker再次处理。
  6. 死信:如果某些任务被多次尝试后失败,它可以放在死信存储。
  7. 一个任务仅可以被成功处理一次

每个客户端将使用一个redis list,list key将使用一个任务队列名作为前缀,第二部分将是client id。当client准备把消息放入队列前,会将队列名和它自己的ID的进行关联成列表键。我们将使用Redis的list为每个客户端。列表键将使用一个任务队列名作为前缀和第二部分将是客户端ID。当客户希望将消息放入队列会由队列名和它自己的ID的级联计算列表键。会有很多的list存入单独的redis db中,但在该情况下,我们须共用一个redis db和其他一些代码,同时允许在它们的名字前添加额外的前缀,如:"queues:"。因此,我们定义一个类RedisQueue来隐藏这些细节。

import json
import datetime
import pytz
from random import randint
import logging
import time

main_prefix = "bqueues:"

class ClientRedisQueue():
    def __init__(self, qname, clientid, redis):
        self.client_id = clientid
        self.queue_name = main_prefix + qname + ":" + clientid
        logging.debug("created queue %s" % self.queue_name)
        self.redis = redis

    def send(self, msgs):
        jmsgs = [json.dumps({ "client_id" : self.client_id, "msg" :msg, "attempts" : 0}) for msg in msgs]
        self.redis.lpush(self.queue_name, *jmsgs)

    def exists(self):
        return self.redis.exists(self.queue_name)

    def length(self):
        return self.redis.llen(self.queue_name) 

    def delete(self):
        self.redis.delete(self.queue_name)

r = redis.StrictRedis("localhost")
cq = ClientRedisQueue("worker1", "client", r)

cq2 = ClientRedisQueue("worker1", "client2", r)
cq.send([1,2])
cq2.send([3,4,0]) 

所以发送端容易实现,那接受端呢?首先,我们需要找到所有队列列表list。有三种方式:

  1. 使用KEYS“prefix:*"命令, 该命令能够列出来所有列表。但这个命令可能会导致生产出现严重的问题,当针对大型数据库中执行它可能毁性能。所以永远不要使用此方式。
  2. 使用SCAN命令, 该命令的作用相当于上一条命令,但没有性能问题。
  3. 使用redis set存储所有list名字,即发送消息时将list名字添加到redis set中,当消息被处理时,删除名字。不幸的是,该步需要额外的代码来实现,所以我们将使用第二个选项。

当我们发现的所有的队列,我们​​需要他们随机排序以保证所有的list以相同的概率处理。之后,我们需通过redis pipeline的方式,一次处理一批大量的消息,随后,如果没有找到的消息,我们需要删除它们。此外,我们需要防止消息的双重处理列表,并防止消息因失败等异常情况造成的消息丢失。要做到这一点,我们将使用RPOPLPUSH命令,它以原子从列表中删除的消息,并把它变成一个额外的“processing”列表,并返回至调用者。因此,我们将使用其他列表中为每个队列与关键“processing:queue_name”。消息处理后,我们必须从prccessing列表中删除。但在几次不成功的尝试过程中消息的情况下,我们需要最终将其移动到死信中。并将之设置为:"dead:queue_name"。不时,我们需要检查的处理列表,如果算上尝试的消息低于允许最大计数然后把它返回到客户端列表或在其他情况下,把它设置成一纸空文。

AX_ATTEMPTS_COUNT = 4
class WorkerRedisQueue():
    def __init__(self, name, redis):
        self.queue_name = main_prefix + name
        self.processing_lname = main_prefix + "processing:" + name
        self.dead_sname = main_prefix + "dead:" + name
        self.refresh_period = datetime.timedelta(seconds=2)
        self.queue_pattern = self.queue_name + "*"
        self.redis = redis
        self.last_refresh_time = datetime.datetime.now(pytz.utc) - self.refresh_period - self.refresh_period
        self.list_names = []

    def proccessed(self, msg):
        self.redis.lrem(self.processing_lname, 0, json.dumps(msg))

    # start this from time to time
    def recover(self):
        recovered = 0
        proc_msgs = self.redis.lrange(self.processing_lname, -5,-1)
        for (msg, orig) in [(json.loads(msg),msg) for msg in proc_msgs if msg]:
            if msg["attempts"] > MAX_ATTEMPTS_COUNT:
                print "found dead letter"
                self.redis.sadd(self.dead_sname, orig)
            else:
                print "recovering"
                recovered = recovered + 1
                msg["attempts"] = msg["attempts"] + 1
                self.redis.lpush("%s:%s" % (self.queue_name, msg["client_id"]), json.dumps(msg))
            self.redis.lrem(self.processing_lname, 0, orig)

        return recovered

    def get_list_names(self):
        lists = []
        print "searching pattern", self.queue_pattern
        for l in self.redis.scan_iter(self.queue_pattern):
            print "found list", l
            lists.append(l)
        return lists

    def refresh(self, force = False):
        now = datetime.datetime.now(pytz.utc)
        time_to_refresh = now - self.last_refresh_time > self.refresh_period
        if force or time_to_refresh:
            self.list_names = self.get_list_names()
            self.last_refresh_time = now
        else:
            print "skip refresh"

    def receive(self, msg_count):
        self.refresh()
        count = len(self.list_names)
        if count == 0:
            print "queues not found"
            return []
        p = self.redis.pipeline()
        for i in range(msg_count):
            l = self.list_names[randint(0, count - 1)]
            p.rpoplpush(l,self.processing_lname)
        msgs = p.execute()
        return [json.loads(msg) for msg in msgs if msg]

    def length(self):
        self.refresh(True)
        res = 0
        for l in self.list_names:
            res = res + self.redis.llen(l)
        return res

wq = WorkerRedisQueue("worker1", r)
while(True):
    time.sleep(1)
    msgs = wq.receive(2)
    if len(msgs) == 0:
        if randint(0, 10) == 0 and wq.length() == 0 and wq.recover() == 0:
            print "sleeping"
            time.sleep(1)
            
    for msg in msgs:
        print "received msg", msg
        try:
            a = 10/msg["msg"]
            wq.proccessed(msg)
        except Exception,e: 
            print "exception", str(e)   

原文:http://hodzanassredin.github.io/2016/03/29/redis_task_queue.html
翻译:yyt030

相关文章

  • 翻译-使用redis做任务队列

    前段时间,在工作中我们使用azure storage队列作为任务队列引擎,但过段时间后我们发现它并没有我们希望的那...

  • Redis入门(5) - 消息通知

    使用列表实现任务队列 优先级队列 按照规则订阅 Redis也可以作为任务队列。任务队列顾名思义,就是“传递任务的队...

  • 基于mysql构建的队列表

    通常大家都会使用redis作为应用的任务队列表,redis的List结构,在一段进行任务的插入,在另一端进行任务的...

  • Redis学习笔记:订阅和发布

    原文链接:Redis学习笔记:订阅和发布 一、基本使用 除了任务队列以外,redis还有一种基于“发布/订阅”模式...

  • horizon - 队列监控

    Horizon 为 Laravel Redis 队列提供一个仪表板,用于查看和管理 Redis 队列任务执行的情况...

  • redis 异步和延时消息队列

    redis 异步消息队列Redis 的 list(列表) 数据结构常用来作为异步消息队列使用,使用rpush/l...

  • 几种实现延时任务的方式(三)

    上篇文章介绍了使用Redis来实现延时任务,这是一个比较好的方案,但是这种方式是把Redis作为消息队列去使用,而...

  • Redis 怎么做消息队列?

    结论是:不要使用redis去做消息队列,这不是redis的设计目标。 但实在太多人使用redis去做去消息队列,r...

  • 用Redis做一个消息队列

    PHP文件使用redis调用方法,操作入队列,守护进程文件中,使用redis方法操作出队列,然后执行业务逻辑 守护...

  • 史上最全141道大数据面试题:Redis+Linux+kafka

    Redis面试专题及答案 什么是缓存穿透?如何避免?什么是缓存雪崩?何如避免? 使用过 Redis 做异步队列么,...

网友评论

    本文标题:翻译-使用redis做任务队列

    本文链接:https://www.haomeiwen.com/subject/yszqettx.html