美文网首页What is SoftWare TestPython
Python基于Redis实现分布式锁

Python基于Redis实现分布式锁

作者: Rethink | 来源:发表于2021-07-14 14:50 被阅读0次

    > redis-server --version
    Redis server v=3.0.504 sha=00000000:0 malloc=jemalloc-3.6.0 bits=64 build=a4f7a6e86f2d60b3

    Redis中的事务处理

    Redis中的事务可以视为一个队列,使用MULTI命令标记事务的开始,接下来客户端提交的命令,服务器都不会立即执行,而是将其压入队列,并返回QUEUED,表示已入队。当输入EXEC命令时,触发当前事务的执行(FIFO),并且在执行事务的过程中不会被客户端发送的其它命令所中断。下面是一个在Redis中使用事务的简单示例:

    > MULTI  # 标记事务开始
    OK
    > SET Book_Name "GIt Pro"    # 多条命令按顺序入队
    QUEUED
    > SADD Program_Language "C++" "C#" "Jave" "Python"
    QUEUED
    > GET Book_Name
    QUEUED
    > EXEC  # 执行
    OK
    4
    GIt Pro
    

    并不是所有的命令都会被放进事务队列, 其中的例外就是 EXECDISCARDMULTIWATCH 这四个命令 。 当这四个命令从客户端发送到服务器时, 它们会像客户端处于非事务状态一样, 直接被服务器执行。

    Redis事务命令

    命令 描述
    MULTI 标记一个事务块的开始
    EXEC 执行事务块内的所有命令
    DISCARD 取消事务,清空事务队列,如果正在使用 WATCH 命令监视某个(或某些) key,那么取消所有监视,等同于执行命令 UNWATCH
    WATCH 监视一个或多个KEY,如果在事务执行之前KEY被其他命令所改动,那么事务将会被打断
    UNWATCH 取消WATCH命令对所有KEY的监视

    使用案例:

    1. 放弃事务;
    > SET Name Rethink
    OK
    > MULTI
    OK
    > SET Name Foo
    QUEUED
    > DISCARD   -- (放弃事务)
    OK
    > EXEC
    ERR EXEC without MULTI
    > GET Name
    Rethink    -- (可以看到,数据未被改动)
    
    1. 命令在入队时发生错误,Redis将在客户端调用EXEC命令时拒绝执行并取消事务;
    > SET K1 V1
    OK
    > MULTI
    OK
    > SET K1 V2
    QUEUED
    > GET_  K1
    ERR unknown command 'get_'
    > EXEC  -- (自动取消事务)
    EXECABORT Transaction discarded because of previous errors.
    > GET K1
    V1
    
    1. EXEC命令执行后发生的错误,Redis将选择自动忽略,而不是事务回滚,实际上,Redis本身是不支持事务回滚机制的,后面会再细说。
    > SET Version 1.0
    OK
    > MULTI
    OK
    > INCR Version  -- (非事务模式下, 执行此命令会报错)
    QUEUED
    > GET Version
    QUEUED
    > EXEC
    1.0  -- (自动忽略了错误)
    
    1. 使用watch监视
      WATCH命令用于在事务开始之前监视任意数量的键,当调用EXEC命令执行事务时, 如果任意一个被监视的键已经被其他客户端修改了, 那么整个事务不再执行, 直接返回失败。
    > SET Balance 100
    OK
    > WATCH Balance
    OK
    > MULTI
    OK
    > DECRBY Balance 20
    QUEUED
    > INCRBY Balance 20
    QUEUED
    > EXEC
    (nil)   -- (事务不执行, 直接返回失败)
    

    WATCH命令实现原理

    在每个代表数据库的 redis.h/redisDb 结构类型中, 都保存了一个 watched_keys 字典, 字典的键是这个数据库被监视的键, 而字典的值则是一个链表, 链表中保存了所有监视这个键的客户端。
    比如说,以下字典就展示了一个 watched_keys 字典的例子:


    其中, 键 key1 正在被 client2client5client1 三个客户端监视, 其他一些键也分别被其他别的客户端监视着。

    WATCH 命令的作用, 就是将当前客户端和要监视的键在 watched_keys 中进行关联。通过watched_keys字典, 如果程序想检查某个键是否被监视, 那么它只要检查字典中是否存在这个键即可; 如果程序要获取监视某个键的所有客户端, 那么只要取出键的值(一个链表), 然后对链表进行遍历即可。

    watch的触发
    在任何对数据库键空间(key space)进行修改的命令成功执行之后 (比如FLUSHDB, SET, LPUSH, SADD, ZREM,诸如此类),multi.c/touchWatchedKey函数都会被调用 —— 它检查数据库的watched_keys字典, 看是否有客户端在监视已经被命令修改的键, 如果有的话, 程序将所有监视这个/这些被修改键的客户端的REDIS_DIRTY_CAS选项打开:


    当客户端发送EXEC命令触发事务执行时, 服务器会对客户端的状态进行检查:
    • 如果客户端的 REDIS_DIRTY_CAS 选项已经被打开,那么说明被客户端监视的键至少有一个已经被修改了,事务的安全性已经被破坏。服务器会放弃执行这个事务,直接向客户端返回空回复,表示事务执行失败。
    • 如果 REDIS_DIRTY_CAS 选项没有被打开,那么说明所有监视键都安全,服务器正式执行事务。

    Redis事务的ACID特性

    • 原子性 (Atomicity)

      指事务中的所有操作作为一个整体,像原子一样不可分割。通俗的理解:只有事务中所有的操作都执行成功,整个事务才会被提交,如果有某个操作在执行期间出现问题,则已经执行的任何操作都必须被撤销,让数据库返回初始状态。

      对于Redis事务来说,却不能保证原子性,这是因为Redis不支持事务回滚机制,这也是和传统的关系型数据库事务的最大区别。对于Redis事务而言,即使事务队列中的某个命令在执行期间出现了错误,整个事务也会继续执行下去,直到将事务队列中的所有命令执行完毕。

    • 一致性 (Consistency)

      事务的执行结果,必须使数据库从一个一致性状态到另一个一致性状态。

    • 隔离性 (Isolation)

      并发执行的事务不会相互影响,其对数据库的影响和它们串行执行时是一样的。如多个用户同时往一个账户转账,最后账户的结果应该和他们按先后次序转账的结果一样。

      Redis使用单线程的方式来执行事务,并且在执行事务期间不会对事务进行中断,因此,Redis的事务总是以串行的方式运行,并且事务也总是具有隔离性。

    • 持久性 (Durability)

      事务的耐久性指的是,当一个事务执行完毕时,执行这个事务所得的结果巳经被保存到 永久性存储介质(比如硬盘)里面了, 即使服务器在事务执行完毕 之后停机, 执行事务所得的结果也不会丢失。

      Redis事务的耐久性由服务器所使用持久化模式决定的:

      1. 当服务器在无持久化的内存模式下运作时,事务不具有耐久性。因为一旦服务器停机,服务器所有的数据都将丢失;
      2. 当服务器在ROB持久化模式下运作时,事务同样不具有耐久性。因为服务器只会在特定的保存条件下才会执行BGSAVE命令,并且异步执行的BGSAVE命令不能保证事务的数据第一时间被保存到硬盘上;
      3. 当服务器运行在AOF持久化模式下,并且appendfsync选项的值为always时,程序总会在执行命令之后调用同步(sync)函数,将命令数据真正地保存到硬盘里;

    python基于redis实现分布式锁

    分布式锁应该具备的条件:

    • 在分布式系统环境下,任意时刻,只能有一个客户端能持有锁;
    • 高可用/高性能的获取锁和释放锁,必须保证加锁和解锁是同一个客户端所为;
    • 具备锁失效机制(防止死锁),即使有一个客户端在持有锁期间崩溃而没有主动释放锁,也能保证后续其他客户端能正常加锁;
    • 具备非堵塞特性,即没有获取到锁,直接返回获取锁失败。

    示例代码如下:

    import uuid
    import time
    import math
    
    def acquire_lock(cli, lockname, acquire_timeout=3, lock_timeoout=2):
        """获取锁
        @param cli:   Redis实例
        @param lockname:   锁名称
        @param acquire_timeout: 客户端获取锁的超时时间(秒), 默认3s
        @param lock_timeout: 锁过期时间(秒), 超过这个时间锁会自动释放, 默认2s
        """
        lockname = f"lock:{lockname}"
        identifier = str(uuid.uuid4())
        lock_timeoout = math.ceil(lock_timeoout)
    
        end_time = time.time() + acquire_timeout
    
        while time.time() <= end_time:
            # 如果不存在当前锁, 则进行加锁并设置过期时间, 返回锁唯一标识
            if cli.set(lockname, identifier, ex=lock_timeoout, nx=True):  # 一条命令实现, 保证原子性
                return identifier
            # 如果锁存在但是没有失效时间, 则进行设置, 避免出现死锁
            elif cli.ttl(lockname) == -1:
                cli.expire(lockname, lock_timeoout)
    
            time.sleep(0.001)
    
        # 客户端在超时时间内没有获取到锁, 返回False
        return False
    
    
    def release_lock(cli, lockname, identifier):
        """释放锁
        @param cli: Redis实例
        @param lock_name:   锁名称
        @param identifier:  锁标识
        """
        with cli.pipeline() as pipe:
            lockname = f"lock:{lockname}"
            while True:
                try:
                    pipe.watch(lockname)
                    id = pipe.get(lockname)
                    if id and id == identifier:
                        pipe.multi()
                        pipe.delete(lockname)
                        pipe.execute()    # 执行EXEC命令后自动执行UNWATCH (DISCARD同理)
                        return True
                    pipe.unwatch()  # 没有参数
                    break
                except redis.WatchError:
                    pass
            return False
    

    加锁过程

    1. 首先需要为锁生成一个唯一标识identifier;
    2. 然后使用redis set 命令设置锁,从 v2.6.12 版本开始,set命令支持nxex参数,具体内容可点击进行查看;如果锁之前不存在,则加锁成功,并设置锁的过期时间,返回锁唯一标识;
    3. 如果锁设置失败,则先判断一下该锁是否有过期时间,如果没有则进行设置;其实这一步可以省略,因为redis的命令都是原子性的。

    解锁过程

    1. 首先整个解锁操作需要在一个 Redis 的事务中进行,python 中 redis 事务是通过pipeline的封装实现的;
    2. 使用WATCH 监听锁,防止在解锁时出现锁被其他客户端修改;
    3. 查询锁对应的标识是否与本次解锁的标识相同;
    4. 如果标识相同,则在事务中删除锁,如果删除过程中锁自动失效又被其他客户端拿到(即锁标识被其他客户端修改),此时设置了 WATCH 就会删除失败,这样就不会出现删除了其他客户端锁的情况。

    下面对刚才实现的分布式锁进行测试,使用50个线程,模拟秒杀10张票,从结果的有序性可以看出是否为加锁状态,代码如下:

    from threading import Thread
    
    import redis
    
    # Redis 存字符串返回的是byte,指定decode_responses=True可以解决
    pool = redis.ConnectionPool(host="127.0.0.1", port=6379, socket_connect_timeout=3, decode_responses=True)
    redis_cli = redis.Redis(connection_pool=pool)
    
    count = 10
    
    
    def ticket(i):
        identifier = acquire_lock(redis_cli, 'Ticket')
        print(f"线程{i}--获得了锁")
        time.sleep(1)
        global count
        if count < 1:
            print(f"线程{i}没抢到票, 票已经抢完了")
            return
        count -= 1
        print(f"线程{i}抢到票了, 还剩{count}张票")
        release_lock(redis_cli, 'Resource', identifier)
        print(f"线程{i}--释放了锁")
    
    
    for i in range(10):
        t = Thread(target=ticket, args=(i, ))
        t.start()
    

    输出如下:

    线程1--获得了锁
    线程1抢到票了, 还剩4张票
    线程1--释放了锁
    线程2--获得了锁
    线程4--获得了锁
    线程3--获得了锁 
    线程0--获得了锁 
    线程6--获得了锁 
    线程7--获得了锁 
    线程8--获得了锁 
    线程5--获得了锁 
    线程9--获得了锁 
    线程11--获得了锁
    线程10--获得了锁
    线程13--获得了锁
    线程2抢到票了, 还剩3张票
    线程2--释放了锁
    线程14--获得了锁
    线程12--获得了锁
    线程15--获得了锁
    线程16--获得了锁
    线程17--获得了锁
    线程19--获得了锁
    线程18--获得了锁
    线程4抢到票了, 还剩2张票
    线程4--释放了锁
    线程3抢到票了, 还剩1张票
    线程0抢到票了, 还剩0张票
    线程8没抢到票, 票已经抢完了
    线程6没抢到票, 票已经抢完了
    线程7没抢到票, 票已经抢完了
    线程3--释放了锁
    线程5没抢到票, 票已经抢完了
    线程0--释放了锁
    线程9没抢到票, 票已经抢完了
    线程11没抢到票, 票已经抢完了
    线程10没抢到票, 票已经抢完了
    线程13没抢到票, 票已经抢完了
    线程14没抢到票, 票已经抢完了
    线程12没抢到票, 票已经抢完了
    线程15没抢到票, 票已经抢完了
    线程16没抢到票, 票已经抢完了
    线程17没抢到票, 票已经抢完了
    线程19没抢到票, 票已经抢完了
    线程18没抢到票, 票已经抢完了
    

    参考文档

    1. 数据库事务的概念及其实现原理, takumiCX
    2. 深入理解Redis事务
    3. Python 使用 Redis 实现分布式锁WoodenRobot
    4. python基于redis实现分布式锁,Maple_feng
    5. Redis 命令参考

    【To Be Continued...】

    相关文章

      网友评论

        本文标题:Python基于Redis实现分布式锁

        本文链接:https://www.haomeiwen.com/subject/ilsypltx.html