美文网首页
用Redis实现的各种队列比较

用Redis实现的各种队列比较

作者: gao922699 | 来源:发表于2022-08-13 00:59 被阅读0次

异步消息队列

通过list实现,rpush/lpop或者lpush/rpop

队列为空的处理:

  1. 通过程序sleep一段时间

  2. 用blpop代替lpop,队列为空时会阻塞,但是如果闲置太久会抛异常。需要捕获异常并重试

如果有多个消费者,需要用到上面的分布式锁

锁冲突的处理(pop出来的数据由于没有抢到锁导致没有处理成功):

  1. 抛出异常提示前端重试

  2. 把请求转移到延时队列,稍后再试

延时队列

通过zset实现

消息内容作为value,过期时间作为score,优先处理

def loop():

while True:

values = redis.zrangebyscore("delay-queue", 0, time.time(), start=0, num=1)

        if not values:

            time.sleep(1)  # 延时队列空的,休息 1s

            continue

        value = values[0]  # 拿第一条,也只有一条

        success = redis.zrem("delay-queue", value)  # 从消息队列中移除该消息

        if success:  # 因为有多进程并发的可能,最终只会有一个进程可以抢到消息

            msg = json.loads(value)

            handle_msg(msg)

zrem 方法是多线程多进程争抢任务的关键,它的返回值决定了当前实例有没有抢到任务

命令:RPOPLPUSH source destination

安全的队列

Redis的列表经常被用作队列(queue),用于在不同程序之间有序地交换消息(message)。一个客户端通过 LPUSH 命令将消息放入队列中,而另一个客户端通过 RPOP 或者 BRPOP 命令取出队列中等待时间最长的消息。

不幸的是,上面的队列方法是『不安全』的,因为在这个过程中,一个客户端可能在取出一个消息之后崩溃,而未处理完的消息也就因此丢失。

使用 RPOPLPUSH 命令(或者它的阻塞版本 BRPOPLPUSH )可以解决这个问题:因为它不仅返回一个消息,同时还将这个消息添加到另一个备份列表当中,如果一切正常的话,当一个客户端完成某个消息的处理之后,可以用 LREM 命令将这个消息从备份表删除。

最后,还可以添加一个客户端专门用于监视备份表,它自动地将超过一定处理时限的消息重新放入队列中去(负责处理该消息的客户端可能已经崩溃),这样就不会丢失任何消息了。

循环列表

通过使用相同的 key 作为 RPOPLPUSH 命令的两个参数,客户端可以用一个接一个地获取列表元素的方式,取得列表的所有元素,而不必像 LRANGE 命令那样一下子将所有列表元素都从服务器传送到客户端中(两种方式的总复杂度都是 O(N))。

以上的模式甚至在以下的两个情况下也能正常工作:

有多个客户端同时对同一个列表进行旋转(rotating),它们获取不同的元素,直到所有元素都被读取完,之后又从头开始。

有客户端在向列表尾部(右边)添加新元素。

这个模式使得我们可以很容易实现这样一类系统:有 N 个客户端,需要连续不断地对一些元素进行处理,而且处理的过程必须尽可能地快。一个典型的例子就是服务器的监控程序:它们需要在尽可能短的时间内,并行地检查一组网站,确保它们的可访问性。

注意,使用这个模式的客户端是易于扩展(scala)且安全(reliable)的,因为就算接收到元素的客户端失败,元素还是保存在列表里面,不会丢失,等到下个迭代来临的时候,别的客户端又可以继续处理这些元素了。

相关文章

网友评论

      本文标题:用Redis实现的各种队列比较

      本文链接:https://www.haomeiwen.com/subject/pboawrtx.html