为了应对服务器高并发,可以采用锁进行控制。
如果redis是单节点部署,基本上不会出现什么问题。但如果redis是多节点的集群部署,那么使用redis集群作为分布式锁就会存在一些问题。本文参(抄)考(袭)了以下文章。
闲聊Redis分布式锁
基于Redis的分布式锁到底安全吗(上)?
基于Redis的分布式锁到底安全吗(下)?
一、单节点redis锁
1.1 首先客户端获取锁
SET resource_name my_random_value NX PX 30000
如果返回成功,则说明客户端获取锁成功,然后就可以访问公共资源了,如果失败则获取锁失败。对于这条命令,需要注意
my_random_value:必须是一个随机字符,并且唯一。如果不唯一,可能会出现以下情况:
1.客户端1获取资源成功
2.客户端1阻塞超时,锁自动释放
3.客户端2获取锁成功
4.客户端1从阻塞中醒来,释放了客户端2的锁
必须设置NX,表示只有resource_name不存在时才会设置成功,保证只有第一个请求的客户端获取锁成功
PX 30000 表示过期时间为30s,为了保证原子操作必须在SET时设置过期时间
1.2 客户端释放锁
释放锁时使用下面的redis lua脚本执行来保证原子性。
只有当resource_name的值和客户端持有的数据相等时才能够调用del删除resource_name,否则不进行删除操作。从而防止一个客户端释放另一个客户端持有的锁。
image.png
1.3 安全性和可靠性:
分析一下redis锁的原理,我们在redis实例中创建一个键值,同时设置该键值的超时时间。创建该键值的客户端获取锁成功,访问公共资源。同时如果客户端宕机则锁会自动释放。客户端需要释放锁时只需要删除该键即可。但一旦单节点的Redis宕机则不能再提供服务,即使是基于Master-Slave模式的故障切换也是不安全的,例如下面场景
客户端1从Master获取锁
Master宕机,但锁key还没有同步到Slave上
Slave升级为Master
客户端2从新的Master上获取锁成功
总之:当redis服务器宕机的时候会出现bug
二、分布式Redlock
算法实现
- 客户端获取当前时间start_time
- 客户端按照顺序依次向N个Redis节点获取锁操作,这个过程类似于上述单个节点获取锁过程。为了防止在获取某个Redis节点锁超时,客户端会设置一个很小的超时时间(timeout),timeout要远远小于锁本身超时时间。
- 当向所有Redis节点发送获取锁操作完成后,记录当前时间endtime。并且获取锁总消耗时间elapsed_time = (endtime-starttime),可用时长:validity = ttl - elapsed_time - drift,获取锁成功数n。当n > (N/2+1) && validity > 0(或其他值) 获取锁成功,并修改占有锁时长为validity
- 如果获取锁失败,则需要向所有客户端发起释放锁的操作
php源码实现
<?php
class RedLock
{
private $retryDelay; // 重试的间隔时间
private $retryCount;// 重试的次数
private $clockDriftFactor = 0.01; // 不同服务器时间漂移比例因子
private $quorum;// (N/2+1)
private $servers = array();// redis集群
private $instances = array();
function __construct(array $servers, $retryDelay = 200, $retryCount = 3)
{
$this->servers = $servers;
$this->retryDelay = $retryDelay;
$this->retryCount = $retryCount;
$this->quorum = min(count($servers), (count($servers) / 2 + 1));
}
public function lock($resource, $ttl)
{
$this->initInstances();
$token = uniqid();
$retry = $this->retryCount;
do {
$n = 0;
$startTime = microtime(true) * 1000;
foreach ($this->instances as $instance) {
if ($this->lockInstance($instance, $resource, $token, $ttl)) {
$n++;
}
}
# Add 2 milliseconds to the drift to account for Redis expires
# precision, which is 1 millisecond, plus 1 millisecond min drift
# for small TTLs.
$drift = ($ttl * $this->clockDriftFactor) + 2;
$validityTime = $ttl - (microtime(true) * 1000 - $startTime) - $drift;
if ($n >= $this->quorum && $validityTime > 0) {
return [
'validity' => $validityTime,
'resource' => $resource,
'token' => $token,
];
} else {
foreach ($this->instances as $instance) {
$this->unlockInstance($instance, $resource, $token);
}
}
// Wait a random delay before to retry
$delay = mt_rand(floor($this->retryDelay / 2), $this->retryDelay);
usleep($delay * 1000);
$retry--;
} while ($retry > 0);
return false;
}
public function unlock(array $lock)
{
$this->initInstances();
$resource = $lock['resource'];
$token = $lock['token'];
foreach ($this->instances as $instance) {
$this->unlockInstance($instance, $resource, $token);
}
}
private function initInstances()
{
if (empty($this->instances)) {
foreach ($this->servers as $server) {
list($host, $port, $timeout) = $server;
$redis = new Redis();
$redis->connect($host, $port, $timeout);
$this->instances[] = $redis;
}
}
}
private function lockInstance($instance, $resource, $token, $ttl)
{
return $instance->set($resource, $token, ['NX', 'PX' => $ttl]);
}
private function unlockInstance($instance, $resource, $token)
{
$script = '
if redis.call("GET", KEYS[1]) == ARGV[1] then
return redis.call("DEL", KEYS[1])
else
return 0
end
';
return $instance->eval($script, [$resource, $token], 1);
}
}
网友评论