> redis-server --version
Redis server v=3.0.504 sha=00000000:0 malloc=jemalloc-3.6.0 bits=64 build=a4f7a6e86f2d60b3
Redis中的事务处理
Redis中的事务可以视为一个队列,使用MULTI
命令标记事务的开始,接下来客户端提交的命令,服务器都不会立即执行,而是将其压入队列,并返回QUEUED
,表示已入队。当输入EXEC
命令时,触发当前事务的执行(FIFO),并且在执行事务的过程中不会被客户端发送的其它命令所中断。下面是一个在Redis中使用事务的简单示例:
> MULTI # 标记事务开始
OK
> SET Book_Name "GIt Pro" # 多条命令按顺序入队
QUEUED
> SADD Program_Language "C++" "C#" "Jave" "Python"
QUEUED
> GET Book_Name
QUEUED
> EXEC # 执行
OK
4
GIt Pro
并不是所有的命令都会被放进事务队列, 其中的例外就是 EXEC
、DISCARD
、MULTI
和 WATCH
这四个命令 。 当这四个命令从客户端发送到服务器时, 它们会像客户端处于非事务状态一样, 直接被服务器执行。
Redis事务命令
命令 | 描述 |
---|---|
MULTI | 标记一个事务块的开始 |
EXEC | 执行事务块内的所有命令 |
DISCARD | 取消事务,清空事务队列,如果正在使用 WATCH 命令监视某个(或某些) key,那么取消所有监视,等同于执行命令 UNWATCH |
WATCH | 监视一个或多个KEY,如果在事务执行之前KEY被其他命令所改动,那么事务将会被打断 |
UNWATCH | 取消WATCH命令对所有KEY的监视 |
使用案例:
- 放弃事务;
> SET Name Rethink
OK
> MULTI
OK
> SET Name Foo
QUEUED
> DISCARD -- (放弃事务)
OK
> EXEC
ERR EXEC without MULTI
> GET Name
Rethink -- (可以看到,数据未被改动)
- 命令在入队时发生错误,Redis将在客户端调用
EXEC
命令时拒绝执行并取消事务;
> SET K1 V1
OK
> MULTI
OK
> SET K1 V2
QUEUED
> GET_ K1
ERR unknown command 'get_'
> EXEC -- (自动取消事务)
EXECABORT Transaction discarded because of previous errors.
> GET K1
V1
- 在
EXEC
命令执行后发生的错误,Redis将选择自动忽略,而不是事务回滚,实际上,Redis本身是不支持事务回滚机制的,后面会再细说。
> SET Version 1.0
OK
> MULTI
OK
> INCR Version -- (非事务模式下, 执行此命令会报错)
QUEUED
> GET Version
QUEUED
> EXEC
1.0 -- (自动忽略了错误)
-
使用watch监视
WATCH
命令用于在事务开始之前监视任意数量的键,当调用EXEC
命令执行事务时, 如果任意一个被监视的键已经被其他客户端修改了, 那么整个事务不再执行, 直接返回失败。
> SET Balance 100
OK
> WATCH Balance
OK
> MULTI
OK
> DECRBY Balance 20
QUEUED
> INCRBY Balance 20
QUEUED
> EXEC
(nil) -- (事务不执行, 直接返回失败)
WATCH
命令实现原理
在每个代表数据库的 redis.h/redisDb
结构类型中, 都保存了一个 watched_keys
字典, 字典的键是这个数据库被监视的键, 而字典的值则是一个链表, 链表中保存了所有监视这个键的客户端。
比如说,以下字典就展示了一个 watched_keys
字典的例子:
其中, 键
key1
正在被 client2
、 client5
和 client1
三个客户端监视, 其他一些键也分别被其他别的客户端监视着。
WATCH
命令的作用, 就是将当前客户端和要监视的键在 watched_keys
中进行关联。通过watched_keys
字典, 如果程序想检查某个键是否被监视, 那么它只要检查字典中是否存在这个键即可; 如果程序要获取监视某个键的所有客户端, 那么只要取出键的值(一个链表), 然后对链表进行遍历即可。
watch的触发
在任何对数据库键空间(key space)进行修改的命令成功执行之后 (比如FLUSHDB, SET, LPUSH, SADD, ZREM,诸如此类),multi.c/touchWatchedKey
函数都会被调用 —— 它检查数据库的watched_keys
字典, 看是否有客户端在监视已经被命令修改的键, 如果有的话, 程序将所有监视这个/这些被修改键的客户端的REDIS_DIRTY_CAS
选项打开:
当客户端发送
EXEC
命令触发事务执行时, 服务器会对客户端的状态进行检查:
- 如果客户端的
REDIS_DIRTY_CAS
选项已经被打开,那么说明被客户端监视的键至少有一个已经被修改了,事务的安全性已经被破坏。服务器会放弃执行这个事务,直接向客户端返回空回复,表示事务执行失败。 - 如果
REDIS_DIRTY_CAS
选项没有被打开,那么说明所有监视键都安全,服务器正式执行事务。
Redis事务的ACID特性
-
原子性 (Atomicity)
指事务中的所有操作作为一个整体,像原子一样不可分割。通俗的理解:只有事务中所有的操作都执行成功,整个事务才会被提交,如果有某个操作在执行期间出现问题,则已经执行的任何操作都必须被撤销,让数据库返回初始状态。
对于Redis事务来说,却不能保证原子性,这是因为Redis不支持事务回滚机制,这也是和传统的关系型数据库事务的最大区别。对于Redis事务而言,即使事务队列中的某个命令在执行期间出现了错误,整个事务也会继续执行下去,直到将事务队列中的所有命令执行完毕。
-
一致性 (Consistency)
事务的执行结果,必须使数据库从一个一致性状态到另一个一致性状态。
-
隔离性 (Isolation)
并发执行的事务不会相互影响,其对数据库的影响和它们串行执行时是一样的。如多个用户同时往一个账户转账,最后账户的结果应该和他们按先后次序转账的结果一样。
Redis使用单线程的方式来执行事务,并且在执行事务期间不会对事务进行中断,因此,Redis的事务总是以串行的方式运行,并且事务也总是具有隔离性。
-
持久性 (Durability)
事务的耐久性指的是,当一个事务执行完毕时,执行这个事务所得的结果巳经被保存到 永久性存储介质(比如硬盘)里面了, 即使服务器在事务执行完毕 之后停机, 执行事务所得的结果也不会丢失。
Redis事务的耐久性由服务器所使用持久化模式决定的:
- 当服务器在无持久化的内存模式下运作时,事务不具有耐久性。因为一旦服务器停机,服务器所有的数据都将丢失;
- 当服务器在ROB持久化模式下运作时,事务同样不具有耐久性。因为服务器只会在特定的保存条件下才会执行BGSAVE命令,并且异步执行的BGSAVE命令不能保证事务的数据第一时间被保存到硬盘上;
- 当服务器运行在AOF持久化模式下,并且appendfsync选项的值为always时,程序总会在执行命令之后调用同步(sync)函数,将命令数据真正地保存到硬盘里;
python基于redis实现分布式锁
分布式锁应该具备的条件:
- 在分布式系统环境下,任意时刻,只能有一个客户端能持有锁;
- 高可用/高性能的获取锁和释放锁,必须保证加锁和解锁是同一个客户端所为;
- 具备锁失效机制(防止死锁),即使有一个客户端在持有锁期间崩溃而没有主动释放锁,也能保证后续其他客户端能正常加锁;
- 具备非堵塞特性,即没有获取到锁,直接返回获取锁失败。
示例代码如下:
import uuid
import time
import math
def acquire_lock(cli, lockname, acquire_timeout=3, lock_timeoout=2):
"""获取锁
@param cli: Redis实例
@param lockname: 锁名称
@param acquire_timeout: 客户端获取锁的超时时间(秒), 默认3s
@param lock_timeout: 锁过期时间(秒), 超过这个时间锁会自动释放, 默认2s
"""
lockname = f"lock:{lockname}"
identifier = str(uuid.uuid4())
lock_timeoout = math.ceil(lock_timeoout)
end_time = time.time() + acquire_timeout
while time.time() <= end_time:
# 如果不存在当前锁, 则进行加锁并设置过期时间, 返回锁唯一标识
if cli.set(lockname, identifier, ex=lock_timeoout, nx=True): # 一条命令实现, 保证原子性
return identifier
# 如果锁存在但是没有失效时间, 则进行设置, 避免出现死锁
elif cli.ttl(lockname) == -1:
cli.expire(lockname, lock_timeoout)
time.sleep(0.001)
# 客户端在超时时间内没有获取到锁, 返回False
return False
def release_lock(cli, lockname, identifier):
"""释放锁
@param cli: Redis实例
@param lock_name: 锁名称
@param identifier: 锁标识
"""
with cli.pipeline() as pipe:
lockname = f"lock:{lockname}"
while True:
try:
pipe.watch(lockname)
id = pipe.get(lockname)
if id and id == identifier:
pipe.multi()
pipe.delete(lockname)
pipe.execute() # 执行EXEC命令后自动执行UNWATCH (DISCARD同理)
return True
pipe.unwatch() # 没有参数
break
except redis.WatchError:
pass
return False
加锁过程
- 首先需要为锁生成一个唯一标识identifier;
- 然后使用redis set 命令设置锁,从 v2.6.12 版本开始,set命令支持
nx
和ex
参数,具体内容可点击进行查看;如果锁之前不存在,则加锁成功,并设置锁的过期时间,返回锁唯一标识; - 如果锁设置失败,则先判断一下该锁是否有过期时间,如果没有则进行设置;其实这一步可以省略,因为redis的命令都是原子性的。
解锁过程
- 首先整个解锁操作需要在一个 Redis 的事务中进行,python 中 redis 事务是通过pipeline的封装实现的;
- 使用
WATCH
监听锁,防止在解锁时出现锁被其他客户端修改; - 查询锁对应的标识是否与本次解锁的标识相同;
- 如果标识相同,则在事务中删除锁,如果删除过程中锁自动失效又被其他客户端拿到(即锁标识被其他客户端修改),此时设置了
WATCH
就会删除失败,这样就不会出现删除了其他客户端锁的情况。
下面对刚才实现的分布式锁进行测试,使用50个线程,模拟秒杀10张票,从结果的有序性可以看出是否为加锁状态,代码如下:
from threading import Thread
import redis
# Redis 存字符串返回的是byte,指定decode_responses=True可以解决
pool = redis.ConnectionPool(host="127.0.0.1", port=6379, socket_connect_timeout=3, decode_responses=True)
redis_cli = redis.Redis(connection_pool=pool)
count = 10
def ticket(i):
identifier = acquire_lock(redis_cli, 'Ticket')
print(f"线程{i}--获得了锁")
time.sleep(1)
global count
if count < 1:
print(f"线程{i}没抢到票, 票已经抢完了")
return
count -= 1
print(f"线程{i}抢到票了, 还剩{count}张票")
release_lock(redis_cli, 'Resource', identifier)
print(f"线程{i}--释放了锁")
for i in range(10):
t = Thread(target=ticket, args=(i, ))
t.start()
输出如下:
线程1--获得了锁
线程1抢到票了, 还剩4张票
线程1--释放了锁
线程2--获得了锁
线程4--获得了锁
线程3--获得了锁
线程0--获得了锁
线程6--获得了锁
线程7--获得了锁
线程8--获得了锁
线程5--获得了锁
线程9--获得了锁
线程11--获得了锁
线程10--获得了锁
线程13--获得了锁
线程2抢到票了, 还剩3张票
线程2--释放了锁
线程14--获得了锁
线程12--获得了锁
线程15--获得了锁
线程16--获得了锁
线程17--获得了锁
线程19--获得了锁
线程18--获得了锁
线程4抢到票了, 还剩2张票
线程4--释放了锁
线程3抢到票了, 还剩1张票
线程0抢到票了, 还剩0张票
线程8没抢到票, 票已经抢完了
线程6没抢到票, 票已经抢完了
线程7没抢到票, 票已经抢完了
线程3--释放了锁
线程5没抢到票, 票已经抢完了
线程0--释放了锁
线程9没抢到票, 票已经抢完了
线程11没抢到票, 票已经抢完了
线程10没抢到票, 票已经抢完了
线程13没抢到票, 票已经抢完了
线程14没抢到票, 票已经抢完了
线程12没抢到票, 票已经抢完了
线程15没抢到票, 票已经抢完了
线程16没抢到票, 票已经抢完了
线程17没抢到票, 票已经抢完了
线程19没抢到票, 票已经抢完了
线程18没抢到票, 票已经抢完了
参考文档
- 数据库事务的概念及其实现原理, takumiCX
- 深入理解Redis事务
- Python 使用 Redis 实现分布式锁,WoodenRobot
- python基于redis实现分布式锁,Maple_feng
- Redis 命令参考
【To Be Continued...】
网友评论