分布式锁
问题
- 在分布式应用中,如果要修改用户的状态,需要先读取出用户的状态,在内存中修改之后再保存,如果这样的操作在同一个用户上进行,就会出现并发问题,因为读取和修改这两个操作不是原子性的。
解决方法
-
对分布式应用加锁
在Redis中,可以使用setnx(set if not exists)指令来进行加锁的操作:
> setnx lock:operation true OK ... do something critical > del lock:operation (integer) 1
- 可能出现的问题:
逻辑执行到一半出现异常,未能正常退出,可能导致del
指令没有被调用,锁就永远不会被释放,发生了死锁。
因此需要对锁设置过期时间,防止锁被一直占用:
> setnx lock:operation true OK > expire lock:operation 5 ... do something critical > del lock:operation (integer) 1
- 可能出现的问题:
在setnx
和expire
两个指令之间,服务器进程挂掉,expire
无法执行,也会死锁。
问题出现的原因是
setnx
和expire
这两个指令不是原子性的,因此,需要将它们组合在一起,redis2.8之后引入了set
指令的扩展参数:> set lock:operation true ex 5 nx OK ... do something critical ... > del lock:operation
上面的指令是将
setnx
和expire
组合在一起的原子指令 - 可能出现的问题:
-
需要注意的问题:
-
超时问题:
如果在加锁和释放锁之间的逻辑执行时间过长,导致锁过期,就可能出现这种情况:
线程1持有一把锁,但执行时间过长导致锁过期,线程2由于锁过期的原因,持有了这把锁,
线程1此时执行完毕,释放锁。这时线程3就可能在线程2执行的过程中拿到锁,导致一些预料之外的行为发生。如何避免? 尽量不要让Redis分布式锁用于较长时间的任务。其次可以在加锁的时候设置一个随机数,
在释放锁时先匹配随机数是否一致,然后再执行释放锁的操作,但是需要注意匹配随机数和释放锁这两个动作应该是一个原子性动作,因此需要使用Lua
脚本来执行 -
保证锁的可重入性:
线程在持有某个锁的情况下,可以再次请求被这个锁保护的其他资源,这个锁就是可重入的。可重入性意味着线程可以进入任何一个它已经拥有的锁所同步着的代码块。
使用可重入锁需要对set方法进行包装,使用threadlocal存储当前持有锁的计数。如果要实现完整的可重入锁,还需要考虑很多其他的问题,例如内存锁计数的过期时间等等。比较复杂。
不推荐使用Redis来实现可重入锁
-
锁冲突处理
客户端处理请求时加锁没加成功,有三种策略来解决- 直接抛出异常,通知稍后重试
- sleep一会再请求
- 将请求转移到延时队列,过一会再试
-
延时队列
比较适合异步消息处理,将当前冲突的请求扔到另一个队列延后处理以避开冲突
Redis的 list
常常被用来作为异步消息队列使用,使用 rpush/lpush
入队, lpop/rpop
出队
问题
- 队列为空
队列为空时,客户端会陷入pop操作的死循环,导致空轮询,空轮询多的话,会导致Redis的慢查询显著增多
让线程sleep来解决这个问题,请求不到的话,让线程睡1秒钟,这样能降低客户端的CPU占用和Redis的QPS
- 队列延迟
睡眠的方法虽然可以解决问题,但是会导致消息延迟增大,如何降低延迟呢?
使用blpop/brpop
,这俩个指令的前缀 d 代表blocking
,阻塞读。
阻塞读在队列没有数据的时候,会立即进入休眠状态,一旦数据到来,则立刻醒过来,消息延迟几乎为0.
- 空闲连接自动断开
若果线程一直阻塞,Redis在连接闲置过久的时候会主动断开连接以减少闲置资源占用,这个时候bpop/lpop
会抛出异常
在编写客户端消费者时要注意捕获异常,重新连接
延时队列的实现
延时队列可以通过Redis的zset
(有序列表)实现,将消息序列化作为 zset
的 value
,将到期处理时间作为 score
使用多个线程轮询 zset
获取到期的任务进行处理(保证一个线程挂了其他的线程可以继续处理)
def delay(msg):
msg.id = str(uuid.uuid4()) # 保证 value 值唯一
value = json.dumps(msg)
retry_ts = time.time() + 5 # 5 秒后重试
redis.zadd("delay-queue", retry_ts, value)
def loop():
while True:
# 最多取 1 条
values = redis.zrangebyscore("delay-queue", 0, time.time(), start=0, num=1)
if not values:
time.sleep(1) # 延时队列空的,休息 1s
continue
value = values[0] # 拿第一条,也只有一条
success = redis.zrem("delay-queue", value) # 从消息队列中移除该消息
if success: # 因为有多进程并发的可能,最终只会有一个进程可以抢到消息
msg = json.loads(value)
handle_msg(msg)...
使用 zrem
来保证任务被唯一的线程获取并执行
- 需要注意的问题:
对handle_msg进行异常捕获, 避免因为个别任务抛出异常导致循环异常退出。
网友评论