异步消息队列
通过list实现,rpush/lpop或者lpush/rpop
队列为空的处理:
-
通过程序sleep一段时间
-
用blpop代替lpop,队列为空时会阻塞,但是如果闲置太久会抛异常。需要捕获异常并重试
如果有多个消费者,需要用到上面的分布式锁
锁冲突的处理(pop出来的数据由于没有抢到锁导致没有处理成功):
-
抛出异常提示前端重试
-
把请求转移到延时队列,稍后再试
延时队列
通过zset实现
消息内容作为value,过期时间作为score,优先处理
def loop():
while True:
values = redis.zrangebyscore("delay-queue", 0, time.time(), start=0, num=1)
if not values:
time.sleep(1) # 延时队列空的,休息 1s
continue
value = values[0] # 拿第一条,也只有一条
success = redis.zrem("delay-queue", value) # 从消息队列中移除该消息
if success: # 因为有多进程并发的可能,最终只会有一个进程可以抢到消息
msg = json.loads(value)
handle_msg(msg)
zrem 方法是多线程多进程争抢任务的关键,它的返回值决定了当前实例有没有抢到任务
命令:RPOPLPUSH source destination
安全的队列
Redis的列表经常被用作队列(queue),用于在不同程序之间有序地交换消息(message)。一个客户端通过 LPUSH 命令将消息放入队列中,而另一个客户端通过 RPOP 或者 BRPOP 命令取出队列中等待时间最长的消息。
不幸的是,上面的队列方法是『不安全』的,因为在这个过程中,一个客户端可能在取出一个消息之后崩溃,而未处理完的消息也就因此丢失。
使用 RPOPLPUSH 命令(或者它的阻塞版本 BRPOPLPUSH )可以解决这个问题:因为它不仅返回一个消息,同时还将这个消息添加到另一个备份列表当中,如果一切正常的话,当一个客户端完成某个消息的处理之后,可以用 LREM 命令将这个消息从备份表删除。
最后,还可以添加一个客户端专门用于监视备份表,它自动地将超过一定处理时限的消息重新放入队列中去(负责处理该消息的客户端可能已经崩溃),这样就不会丢失任何消息了。
循环列表
通过使用相同的 key 作为 RPOPLPUSH 命令的两个参数,客户端可以用一个接一个地获取列表元素的方式,取得列表的所有元素,而不必像 LRANGE 命令那样一下子将所有列表元素都从服务器传送到客户端中(两种方式的总复杂度都是 O(N))。
以上的模式甚至在以下的两个情况下也能正常工作:
有多个客户端同时对同一个列表进行旋转(rotating),它们获取不同的元素,直到所有元素都被读取完,之后又从头开始。
有客户端在向列表尾部(右边)添加新元素。
这个模式使得我们可以很容易实现这样一类系统:有 N 个客户端,需要连续不断地对一些元素进行处理,而且处理的过程必须尽可能地快。一个典型的例子就是服务器的监控程序:它们需要在尽可能短的时间内,并行地检查一组网站,确保它们的可访问性。
注意,使用这个模式的客户端是易于扩展(scala)且安全(reliable)的,因为就算接收到元素的客户端失败,元素还是保存在列表里面,不会丢失,等到下个迭代来临的时候,别的客户端又可以继续处理这些元素了。
网友评论