美文网首页Python
Redis分布式锁的Python实现[python-redis-

Redis分布式锁的Python实现[python-redis-

作者: 来一碗花甲粉 | 来源:发表于2019-04-30 14:04 被阅读0次

转自https://readthefuckingsource.codes/2018/08/06/%E5%88%86%E5%B8%83%E5%BC%8F%E9%94%81python-redis-lock/

关于分布式锁有很多种实现方式,可以用数据库锁或者ZooKeeper这类的专业的分布式开源项目。本文讲的是用Redis实现的一个分布式锁库python-redis-lock. Redis官方有推荐一个分布式锁的算法Redlock(这个库实现的并不是这个算法), 该算法自动释放锁没有考虑到客户端长期持有的情况,因此也有人对这个算法提出了质疑

那回到我们今天要讲的这个库python-redis-lock。作者: Ionel Cristian Mărieș, 这个库整体的思路作者也用很直观的图展现出来了,如下:

python-redis-lock

大致思路

从图上看出作者和其它大多数用Redis实现分布式锁的思路类似(SET NX),但是他对每个锁多用了一个list类型键来做信号控制,如果客户端第一次尝试获取锁失败,可以选择在signal列表上阻塞一个timeout时间用来接收锁被释放的通知,Redis列表的这个特性保证了每次只有一个客户端接收到了锁释放的通知。而获取到锁的客户端在使用完后会在对应的信号列表上推送一个通知。另外,作者对锁超时还增加了一个刷新的功能来延长(Extend)对锁的占用,可以保证在持有锁的客户端上完成所有操作后才释放锁。个人认为这种设计的优点和需要注意的点如下:

优点

  • 一方面避免客户端反复请求锁,另一方面通过list signal来让客户端决定是否要block自己;
  • 如果有设置超时,则等待超时后客户端仍然会再尝试获取一次锁而不是直接失败;
  • 这个算法不依赖客户端时间戳,也就没有time drift问题;
  • 结合Lua脚本做原子操作,如果再加上细粒度锁,个人认为基本可以满足各种高需求场景的分布式锁要求。

⚠️Warning

  • 自动刷新可能会造成饥饿问题,如果持有锁的客户端因为某种未知原因阻塞,并且开启了自动刷新锁,那其它客户端就跪了,所以需要使用者慎用刷新机制;
  • 如果没有设置超时,且持有锁的客户端无响应的情况下就会造成死锁;

源码分析

了解过大体思路后,我们来一步步分解作者的实现。首先这个库源码只有两个脚本(不含测试和示例代码), 结构很简单。

src/redis_lock
├── init.py
└── django_cache.py

核心代码在__init__.py中,django_cache.py则是结合django-redis做的缓存后端,来避免缓存失效时遇上所谓的“狗屎效应(dogpile effect, Google翻译)”,这里不对它进行解析。

载入Lua脚本

# Check if the id match. If not, return an error code.
UNLOCK_SCRIPT = b"""
    if redis.call("get", KEYS[1]) ~= ARGV[1] then
        return 1
    else
        redis.call("del", KEYS[2])
        redis.call("lpush", KEYS[2], 1)
        redis.call("del", KEYS[1])
        return 0
    end
"""
UNLOCK_SCRIPT_HASH = sha1(UNLOCK_SCRIPT).hexdigest()

作者用上面这种方式定义了UNLOCK, EXTEND, RESET… 等5个原子操作的Lua脚本,每个脚本也定义了对应的哈希值。关于Redis的Lua脚本支持可以看这篇文章。比较有意思的是下面这段代码,可以说是很Pythonic了:

((UNLOCK, _, _,   # noqa
  EXTEND, _, _,
  RESET, _, _,
  RESET_ALL, _, _,
  DELETE_ALL_SIGNAL_KEYS, _, _),
 SCRIPTS) = zip(*enumerate([
    UNLOCK_SCRIPT_HASH, UNLOCK_SCRIPT, 'UNLOCK_SCRIPT',
    EXTEND_SCRIPT_HASH, EXTEND_SCRIPT, 'EXTEND_SCRIPT',
    RESET_SCRIPT_HASH, RESET_SCRIPT, 'RESET_SCRIPT',
    RESET_ALL_SCRIPT_HASH, RESET_ALL_SCRIPT, 'RESET_ALL_SCRIPT',
    DELETE_ALL_SIGNAL_KEYS_SCRIPT_HASH, DELETE_ALL_SIGNAL_KEYS_SCRIPT,
    'DELETE_ALL_SIGNAL_KEYS_SCRIPT'
]))

为了把使用时要指定的脚本ID与其脚本、哈希值关联起来,作者用了enumerate来自动生成索引ID, 然后又用*来拍扁整个列表,最后再用zip把索引提取出来,把哈希值对应的索引ID用变量名存起来,没用的索引用_忽略,其余内容依然在SCRIPTS元组中。这样一来用下面这个函数执行Redis的Lua脚本就很舒服了:

def _eval_script(redis, script_id, *keys, **kwargs):
    """Tries to call ``EVALSHA`` with the `hash` and then, if it fails, calls
    regular ``EVAL`` with the `script`.
    """
    # Lua脚本的 KEYS 参数放在 *keys 中, ARGV 参数则在 kwargs 的 args 命名参数中提取出来, 和 keys 拼接起来传给 evalsha 或者 eval 函数
    args = kwargs.pop('args', ())
    if kwargs:
        raise TypeError("Unexpected keyword arguments %s" % kwargs.keys())
    try:
        # 首先尝试调用evalsha
        return redis.evalsha(SCRIPTS[script_id], len(keys), *keys + args)
    except NoScriptError:
        # 如果脚本不存在则调用eval载入并执行脚本
        logger.warn("%s not cached.", SCRIPTS[script_id + 2])
        return redis.eval(SCRIPTS[script_id + 1], len(keys), *keys + args)

创建锁

主要代码都在Lock类中,创建锁对象时大部分都是常规操作,保存实例的一些设定。

class Lock(object):
    """
    A Lock context manager implemented via redis SETNX/BLPOP.
    """

    def __init__(self, redis_client, name, expire=None, id=None, auto_renewal=False, strict=True):
        """
        :param redis_client:
            An instance of :class:`~StrictRedis`.
        :param name:
            The name (redis key) the lock should have.
        :param expire:
            The lock expiry time in seconds. If left at the default (None)
            the lock will not expire.
        :param id:
            The ID (redis value) the lock should have. A random value is
            generated when left at the default.

            Note that if you specify this then the lock is marked as "held". Acquires
            won't be possible.
        :param auto_renewal:
            If set to ``True``, Lock will automatically renew the lock so that it
            doesn't expire for as long as the lock is held (acquire() called
            or running in a context manager).

            Implementation note: Renewal will happen using a daemon thread with
            an interval of ``expire*2/3``. If wishing to use a different renewal
            time, subclass Lock, call ``super().__init__()`` then set
            ``self._lock_renewal_interval`` to your desired interval.
        :param strict:
            If set ``True`` then the ``redis_client`` needs to be an instance of ``redis.StrictRedis``.
        """
        # ... 此处省略参数校验代码
        self._client = redis_client
        self._expire = expire if expire is None else int(expire)
        if id is None:
            self._id = urandom(16)
        elif isinstance(id, bytes):
            self._id = id
        else:
            raise TypeError("Incorrect type for `id`. Must be bytes not %s." % type(id))
        self._name = 'lock:'+name
        self._signal = 'lock-signal:'+name
        self._lock_renewal_interval = (float(expire)*2/3
                                       if auto_renewal
                                       else None)
        self._lock_renewal_thread = None

需要注意的几个点:

  • 可以用id来申明对锁所有权识别,例如客户端的主机名称或者进程号什么的,默认是16个随机字节。
  • 如果指定了锁自动刷新,那刷新间隔会设定在超时的2/3时间。

获取锁

获取锁的相关代码如下,还是选择在代码注释中解析代码会比较直观点。这里和上一段代码一样,省略了参数校验,这是很重要的一步,并且是一个良好的编程习惯,但是限于篇幅这里不做介绍。

def acquire(self, blocking=True, timeout=None):
    """
    :param blocking:
        Boolean value specifying whether lock should be blocking or not.
    :param timeout:
        An integer value specifying the maximum number of seconds to block.
    """
    logger.debug("Getting %r ...", self._name)

    if self._held:  # 锁不可重入
        raise AlreadyAcquired("Already acquired from this Lock instance.")

    # ... 此处省略参数校验代码,如timeout不能大于锁的_expire等各种条件

    busy = True
    blpop_timeout = timeout or self._expire or 0
    timed_out = False
    while busy:
        # 如果set失败则代表锁被占用,返回False
        busy = not self._client.set(self._name, self._id, nx=True, ex=self._expire)
        if busy:
            if timed_out:
                return False
            elif blocking:  # 如果阻塞则在signal列表上监听
                # 如果blpop在blpop_timeout时间内获取到信号通知的话,timeout会被设置为False
                timed_out = not self._client.blpop(self._signal, blpop_timeout) and timeout
            else:
                logger.debug("Failed to get %r.", self._name)
                return False

    logger.debug("Got lock for %r.", self._name)
    if self._lock_renewal_interval is not None: 
        # 如果需要自动刷新锁,则开启刷新线程
        self._start_lock_renewer()
    return True

这里有一个问题,如果在blpop成功获取到信号,并不代表下一次while循环尝试获取就一定成功,如果在此间隙中被其它客户端获得了锁,那该客户端仍然会获取失败,并去阻塞一个timeout时间。也就是说假设这个客户端的网络质量很差,而又恰恰是一个高频请求的锁,那就可能造成它虽然设置了超时,但最终结果可能等待了不止一个timeout时间才拿到结果,而且还可能会一直获取不到锁。

获取锁的开头用一个_held内部属性来判断当前实例是否已经拥有了锁,这里就是上一步中的id属性的用处,来判断锁的拥有者。代码如下:

def get_owner_id(self):
   return self._client.get(self._name)

@property
def _held(self):
   return self.id == self.get_owner_id()

刷新锁

刷新锁比较繁琐,作者用了一个线程在后台定时刷新,不过我们先来看刷新锁的实际操作:extend方法, 这个函数没加下划线前缀也就是允许锁的拥有者自己手动刷新。

def extend(self, expire=None):
    """Extends expiration time of the lock.

    :param expire:
        New expiration time. If ``None`` - `expire` provided during
        lock initialization will be taken.
    """
    # ... 此处省略参数校验代码
    # 这里调用第一步提到的Lua脚本,用索引EXTEND来指定脚本,并将超时时间`expire`和自身识别 id 传入脚本。
    error = _eval_script(self._client, EXTEND, self._name, args=(expire, self._id))
    if error == 1:
        raise NotAcquired("Lock %s is not acquired or it already expired." % self._name)
    elif error == 2:
        raise NotExpirable("Lock %s has no assigned expiration time" %
                           self._name)
    elif error:
        raise RuntimeError("Unsupported error code %s from EXTEND script" % error)

这个EXTEND操作的 Lua 脚本如下:

# Covers both cases when key doesn't exist and doesn't equal to lock's id
# 刷新前判断锁是否属于该拥有者,只允许拥有者延长锁的TTL
EXTEND_SCRIPT = b"""
    if redis.call("get", KEYS[1]) ~= ARGV[2] then
        return 1
    elseif redis.call("ttl", KEYS[1]) < 0 then
        return 2
    else
        redis.call("expire", KEYS[1], ARGV[1])
        return 0
    end
"""
EXTEND_SCRIPT_HASH = sha1(EXTEND_SCRIPT).hexdigest()

刷新锁的线程相关代码如下:

def _start_lock_renewer(self):
    """
    Starts the lock refresher thread.
    """
    if self._lock_renewal_thread is not None:
        raise AlreadyStarted("Lock refresh thread already started")

    # 线程事件用来监听刷新是否结束
    self._lock_renewal_stop = threading.Event()
    self._lock_renewal_thread = threading.Thread(
        group=None,
        target=self._lock_renewer,
        kwargs={'lockref': weakref.ref(self),   # 对锁实例做了一个弱引用
                'interval': self._lock_renewal_interval,
                'stop': self._lock_renewal_stop}
    )
    self._lock_renewal_thread.setDaemon(True)
    self._lock_renewal_thread.start()

@staticmethod
def _lock_renewer(lockref, interval, stop):
    """
    Renew the lock key in redis every `interval` seconds for as long
    as `self._lock_renewal_thread.should_exit` is False.
    """
    log = getLogger("%s.lock_refresher" % __name__)
    # 等待终止事件到来,否则在指定超时后返回False
    while not stop.wait(timeout=interval):
        log.debug("Refreshing lock")
        lock = lockref()    # 调用这个弱引用来获取当前锁实例
        if lock is None:    # 如果这个锁已经在其它线程被销毁则对应刷新线程也应该关闭
            log.debug("The lock no longer exists, "
                      "stopping lock refreshing")
            break
        lock.extend(expire=lock._expire)
        del lock    # 删除弱引用
    log.debug("Exit requested, stopping lock refreshing")

def _stop_lock_renewer(self):
    """
    Stop the lock renewer.

    This signals the renewal thread and waits for its exit.
    """
    if self._lock_renewal_thread is None or not self._lock_renewal_thread.is_alive():
        return
    logger.debug("Signalling the lock refresher to stop")
    self._lock_renewal_stop.set()   # 事件通知子线程退出
    self._lock_renewal_thread.join()
    self._lock_renewal_thread = None
    logger.debug("Lock refresher has stopped")

这里主要是一个弱引用问题。传入了Daemon守护进程的变量要格外小心,很容易造成即使主线程已经不再引用这个变量,而守护进程不依赖该变量,却一直引用着,就导致内存无法释放。

释放锁

释放锁的实际操作和刷新锁一样,因为都涉及多个 Redis 命令,所以他们都放在了 Lua 脚本中。

def release(self):
    """Releases the lock, that was acquired with the same object.

    .. note::

        If you want to release a lock that you acquired in a different place you have two choices:

        * Use ``Lock("name", id=id_from_other_place).release()``
        * Use ``Lock("name").reset()``
    """
    if self._lock_renewal_thread is not None:
        # 如果有刷新线程则停止它
        self._stop_lock_renewer()
    logger.debug("Releasing %r.", self._name)
    error = _eval_script(self._client, UNLOCK, self._name, self._signal, args=(self._id,))
    if error == 1:
        raise NotAcquired("Lock %s is not acquired or it already expired." % self._name)
    elif error:
        raise RuntimeError("Unsupported error code %s from EXTEND script." % error)
    else:
        self._delete_signal()

def _delete_signal(self):
    self._client.delete(self._signal)

至于UNLOCK脚本操作在第一步里已经展示出来,如果持有该锁则先删除signal列表,再push一个通知到signal列表,最后删除锁。这些步骤都会在一个Lua函数中执行保证原子性。

上下文支持

作者也重载了__enter____exit__两个函数来支持with的上下文调用,也很简单:

def __enter__(self):
    acquired = self.acquire(blocking=True)
    assert acquired, "Lock wasn't acquired, but blocking=True"
    return self

def __exit__(self, exc_type=None, exc_value=None, traceback=None):
    self.release()

示例代码

这里我就直接贴上作者的示例代码:

conn = StrictRedis()
with redis_lock.Lock(conn, "name-of-the-lock"):
    print("Got the lock. Doing some work ...")
    time.sleep(5)
# Eg:
lock = redis_lock.Lock(conn, "name-of-the-lock")
if lock.acquire(blocking=False):
    print("Got the lock.")
else:
    print("Someone else has the lock.")

总结

这个库提供的分布式锁很灵活,是否需要超时?是否需要自动刷新?是否要阻塞?都是可选的。没有最好的算法,只有最合适的算法,用户应该根据自己是场景谨慎选择。喜欢的朋友可以去这个项目的GitHub页面 点个🌟。

另外,Redis这个东西感觉可以做很多事,而且可以做很多高性能的事。尤其在分布式环境下,重点是还支持各种有意思的特性。用它来实现分布式锁就显得再合适不过了。

分享就到这里,谢谢大家!

相关文章

网友评论

    本文标题:Redis分布式锁的Python实现[python-redis-

    本文链接:https://www.haomeiwen.com/subject/optvnqtx.html