关于分布式锁
我们学习过并发编程中的锁机制:synchronized和Lock。在单进程的系统中,当多线程同时修改某个变量时,就需要对变量或者代码块做同步,使其在修改该变量时能过线性执行消除并发修改变量。而同步的本质就是通过锁来实现的。为了实现多个线程在同一时刻同一个代码块只能一个线程执行,那么就需要在某个地方做个标记,这个标记必须要每个线程都可见,在标记不存在时,可以设置该标记,其余后续线程发现已经有标记了,就只能等待拥有标记的线程执行完代码块删除标记后,在进行尝试设置标记
分布式环境下,数据一致性问题也是必要重要的话题,而又与单进程情况不同。分布式与单机最大的不同就是,分布式不是多线程而是多进程。多线程由于可以共享堆内存,因此可以简单的采取内存作为标记存储位置。而进程之间甚至可能都不在同一台物理机上,因此需要将标准存储在一个所有进程都能看见的地方
常见的是秒杀的场景,订单服务部署了多个实例。例如秒杀商品有4个,第一个用户购买3个,第二个用户购买2个,理想情况下应该是第一个用户可以购买成功,第二个用户提示购买失败。而实际的上可能会出现的情况是两个用户看到的商品数量都是4个,而第一个用户买了3个之后,还没更新库,第二个用户下了2个商品的订单,更新库存就会出错
在上面的场景中,商品的库存是共享变量,面对高并发的情形,需要保证对资源的访问互斥。在单机环境中,Java中其实提供了很多并发处理相关的api,但是这些api在分布式场景中就无能为力了。也就是说单纯的Java api并不能提供分布式锁的能力。分布式系统中,由于分布式系统的分布性,即多线程和多进程并且分布在不同的机器中,synchronized和Lock这两种锁将失去原有锁的效果,需要我们自己实现分布式锁
常见的锁方案:
- 基于数据库实现的分布式锁
- 基于缓存实现的分布式锁,例如Redis
- 基于zookeeper实现分布式锁
基于Redis实现的分布式锁
SETNX
使用Redis的setnx实现分布式锁,多进程执行以下Redis命令:
SETNX lock.id <current Unix time + lock timeout + 1>
SETNX是将key的值设为value,当且仅当key不存在。若给定的key已经存在,则SETNX不做任何操作。
- 返回1,说明该进程获得所,SETNX将键lock.id的值设置为锁的超时时间,当前时间+加上锁的有效时间。
- 返回0,说明其他进程已经获得了锁,进程不能进入临界区。进程可以在一个循环中不断地尝试 SETNX操作,已获得锁
存在死锁的问题
SETNX实现分布式锁,可能会存在死锁的情况。与单机模式下的锁相比,分布式环境下不仅需要保证进程可见,还需要考虑进程与锁之间的网络问题。某个线程获取了锁之后,断开了与Redis的连接,锁没有及时释放,竞争该锁的其他线程都会hung,产生死锁的情况。
在使用SETNX获得锁时,我们将键lock.id的值设置为锁的有效时间,线程获得锁后,其他线程还会不断地检索锁是否已超时,如果超时,等待的线程也将有机会获得锁。然而,锁超时,我们不能简单的使用del命令删除键lock.id已释放锁。
考虑情况如下:
1. A已经首先获得了锁lock.id,然后线A断线。B,C都在等待竞争该锁;
2. B,C读取lock.id的值,比较当前时间和键lock.id的值来判断是否超时,发现超时;
3. B执行del lock.id命令,并执行SETNX lock.id命令,并返回1,B获得锁;
4. C由于刚刚检测到锁已超时,执行DEL lock.id命令,将B刚刚设置的键lock.id删除,执行SETNX lock.id命令,并返回1,
即C获得锁。
上面的步骤很明显出现了问题,导致B,C同时获取了锁。在检测超时后,线程不能直接简单的执行DEL删除键的操作已获得锁。
对于上面的操作步骤进行改进,问题是出在删除键的操作上面,那么获取锁之后应该怎么改进呢?首先看一下Redis的getset这个操作,GETSET key value
,将给定key的值设为value,并返回key的旧值(old value)。利用这个操作指令,我们改进一下上述步骤。
1. A已经首先获得了锁lock.id,然后A断线。B,C都在等待竞争该锁;
2. B,C读取lock.id的值,比较当前的时间和键lock.id的值来判断是否超时,发现超时;
3. B检测到锁已超时,即当前的时间大于键lock.id的值,B执行
GETSET lock.id <current Unix timestamp + lock timeout + 1>
设置时间戳,通过比较键lock.id的旧值是否小于当前时间,判断进程是否已获得锁;
4. B发现GETSET返回的值小于当前时间,则执行DEL lock.id命令,并执行SETNX lock.id命令,并返回1,B获得锁;
5. C执行GETSET得到的时间大于当前时间,则继续等待
在线程释放锁,即执行DEL lock.id操作之前,需要判断锁是否已超时。如果锁已超时,那么所可能已有其他线程获得,这是直接执行DEL lock.id操作会导致把其他线程已获得的锁释放掉
根据上面的逻辑,代码实现方式:基于StringRedisTemplate实现
@Slf4j
public class TestLock {
@Autowired
private StringRedisTemplate stringRedisTemplate;
//添加锁方法
public boolean lock(String key ,String value) throws InterruptedException {
//相当于setnx方法,将key-value设置进去,如果存在值则不进行任何操作
if (stringRedisTemplate.opsForValue().setIfAbsent(key,value)) {
return true;
}
//获取当前key的value(获取当前key对应的过期时间)
String currentValue = stringRedisTemplate.opsForValue().get(key);
long currentValueLong = Long.parseLong(currentValue);
// 判断当前key对应的value(过期时间),如果不为空并且小于当前时间,就说明上一个锁已经过期
if (StringUtils.isNotEmpty(currentValue) && currentValueLong < System.currentTimeMillis()) {
//执行GETSET方法将新的key和过期时间存储,并返回旧值(旧值指上一个key对应的value)
String oldValue = stringRedisTemplate.opsForValue().getAndSet(key, value);
//判断旧值不为空,并且旧值等于上一个value,说明
if (StringUtils.isNotEmpty(oldValue) && oldValue.equals(currentValue)) {
return true;
}
}
return false;
}
//释放锁
public void unLock(String key, String value) {
try {
// 获取当前redis中key对应值value
String currentValue = stringRedisTemplate.opsForValue().get(key);
// 校验当前值是否为空,且获取值是否与传递值一致
if (!StringUtils.isEmpty(currentValue) && currentValue.equals(value)) {
// 删除redis中key信息
stringRedisTemplate.opsForValue().getOperations().delete(key);
}
} catch (Exception e) {
log.error("【redis分布式锁】减锁异常", e);
}
}
}
另外一种实现逻辑:
1. A首先带着lock.id,过期时间和尝试时间通过setnx获取锁。随机生成一个值,作为key对应的value值,并设置key对应的
过期时间,然后返回当前value,表示获取锁成功
2. B,C读取lock.id的值,带着相应的过期时间和尝试时间,来竞争该锁。
3. B,C都通过setnx方法尝试,如果B通过setnx返回1,那么说明A已经结束,B将设置对应的key和value,以及相应的过期时间。
C则通过方法setnx返回0,并返回null,表示获取锁失败,继续等待
具体代码实现如下:基于jedis实现
@Slf4j
public class RedisLock {
private static Logger logger = LoggerFactory.getLogger(RedisLock.class);
private JedisPool jedisPool;
public RedisLock(JedisPool jedisPool) {
this.jedisPool = jedisPool;
}
public String lock(String key) {
return this.lockWithTimeout(key, 300L, 3000L);
}
/***
* redis实现分布式锁,入参key,acquireTimeout(尝试获取时间,单位:ms),expirationTime(key的到期时间,单位:ms)
* 该方法使用的是Redis中的setnx方法来获取锁
* */
public String lockWithTimeout(String key, long acquireTimeout, long expirationTime) {
Jedis conn = null;
//返回对象
Object retIdentifier = null;
try {
conn = this.jedisPool.getResource();
String identifier = UUID.randomUUID().toString();
//重新命名key
String lockKey = "lock:" + key;
//单位换算,由ms换算为s
int lockExpire = (int)(expirationTime / 1000L);
//计算重试的结束时间,如果入参acquireTimeout为0,尝试结束时间就是在当前时间加300ms,如果入参acquireTimeout不为0,尝试结束时间就是在当前时间加acquireTimeout,
long end = System.currentTimeMillis() + (acquireTimeout == 0L ? 300L : acquireTimeout);
//判断当前时间是否小于尝试结束时间
while(System.currentTimeMillis() < end) {
//使用setnx方法判断是否有值,如果有值,则线程睡眠10ms,然后返回空,获取锁失败
if (conn.setnx(lockKey, identifier) == 1L) {
//设置当前key的有效期,单位秒
conn.expire(lockKey, lockExpire);
String var13 = identifier;
//返回当前value值,获取锁成功。
return var13;
}
try {
Thread.sleep(10L);
} catch (InterruptedException var18) {
Thread.currentThread().interrupt();
}
}
return (String)retIdentifier;
} catch (JedisException var19) {
logger.error("JedisException:" + var19 + "\t key:" + key);
return (String)retIdentifier;
} finally {
if (conn != null) {
conn.close();
}
}
}
//释放锁
public boolean unLock(String key, String identifier) {
Jedis conn = null;
String lockKey = "lock:" + key;
boolean retFlag = false;
try {
conn = this.jedisPool.getResource();
if (identifier.equals(conn.get(lockKey))) {
conn.del(lockKey);
retFlag = true;
}
} catch (JedisException var10) {
logger.error("JedisException:" + var10 + "\t key:" + key);
} finally {
if (conn != null) {
conn.close();
}
}
return retFlag;
}
}
网友评论