美文网首页
基于redis key的分布式资源池锁

基于redis key的分布式资源池锁

作者: Mr韩_xianfeng | 来源:发表于2020-03-26 17:01 被阅读0次

    基于redis自带的原子操作setnx,getset等原子操作命令实现资源锁定功能。
    setnx:将 key 的值设为 value ,当且仅当 key 不存在。
    若给定的 key 已经存在,则 SETNX 不做任何动作。
    SETNX 是『SET if Not eXists』(如果不存在,则 SET)的简写。
    getset:将给定 key 的值设为 value ,并返回 key 的旧值(old value)。
    当 key 存在但不是字符串类型时,返回一个错误。

    这个实现参考了网友的代码,可以实现多个资源锁抢占,相当于资源池锁。假定有两个环境可被使用,每个环境只能被一个任务占用,就可以使用这个锁去抢占资源

    import time
    
    from redis import Redis
    
    
    class ResourceLock(object):
        def __init__(self, redis_client: Redis, keys=[], expires=60, timeout=10):
            """
            Distributed locking using Redis SETNX and GETSET.
    
            Usage::
    
                with ResourceLock(*args,**kwargs):
                    print "Critical section"
    
                or
    
                lock = ResourceLock(*args,**kwargs)
                lock.acquire()
                print('do something here...')
                lock.release()
    
            :param  expires     We consider any existing lock older than
                                ``expires`` seconds to be invalid in order to
                                detect crashed clients. This value must be higher
                                than it takes the critical section to execute.
            :param  timeout     If another client has already obtained the lock,
                                sleep for a maximum of ``timeout`` seconds before
                                giving up. A value of 0 means we never wait.
            """
    
            self.keys = keys
            self.available_key = None
            self.timeout = timeout
            self.expires = expires
            self.redis_client = redis_client
    
        def acquire(self):
            timeout = self.timeout
            while timeout >= 0:
                expires = time.time() + self.expires + 1
    
                for key in self.keys:
                    if self.redis_client.setnx(key, expires):
                        self.available_key = key
                        # We gained the lock; enter critical section
                        print(f'{self.available_key}')
                        return
    
                for key in self.keys:
                    current_value = self.redis_client.get(key)
    
                    # We found an expired lock and nobody raced us to replacing it
                    if current_value and float(current_value) < time.time() and \
                            self.redis_client.getset(key, expires) == current_value:
                        self.available_key = key
                        print('过期')
                        print(f'{self.available_key}')
                        return
    
                timeout -= 1
                time.sleep(1)
    
            raise LockTimeout("Timeout while waiting for lock")
    
        def release(self):
            print(f'delete {self.available_key}')
            self.redis_client.delete(self.available_key)
    
        def __enter__(self):
            self.acquire()
    
        def __exit__(self, exc_type, exc_value, traceback):
            self.release()
    
    
    class LockTimeout(BaseException):
        pass
    
    
    class redis_lock(object):
        def __init__(self, redis_client, keys=[], expires=60):
            self.rc = redis_client
            self.keys = keys
            self.expires = expires
    
        def __call__(self, func):
            def _dec(*args, **kwargs):
                with ResourceLock(redis_client=self.rc, keys=self.keys, expires=self.expires):
                    return func(*args, **kwargs)
    
            return _dec
    

    使用演示:

    pool = redis.ConnectionPool(host='localhost', port=6379, db=0)
    rc = redis.Redis(connection_pool=pool)
    
    @redis_lock(rc, keys=['test1', 'test2'])
    def deploy(*args, **kwargs):
        from apps.pipeline.actions.deploy import deploy
        return deploy(*args, **kwargs)
    
    

    相关文章

      网友评论

          本文标题:基于redis key的分布式资源池锁

          本文链接:https://www.haomeiwen.com/subject/bsvcuhtx.html