假设有这样一个场景,在一个购票软件上买一张票,但是此时剩余票数只有一张或几张,这个时候有几十个人都在同时使用这个软件购票。在不考虑任何影响下,正常的逻辑是首先判断当前是否还有剩余的票,如果有,那么就进行购买并扣减库存数,否则就会提示票数不足,购买失败。伪代码如下:
void buyTicket() {
int stockNum = byTicketMapper.selectStockNum();
if(stockNum>0){
//TODO 买票流程....
byTicketMapper.reduceStock(); // 扣减库存
}else{
log.info("=====>票卖完了<====");
}
}
这段代码在逻辑上没有问题,但是在并发场景下,可能会存在一个严重的问题。当剩余票数为1时,有A,B两个用户同时点击了购买按钮,A用户通过了库存大于0的校验并开始执行购票逻辑,但是由于一些原因造成A用户的购票线程有短暂的阻塞。而在这个阻塞的过程中,用户B发起了购买请求,并且也通过了库存大于0的校验,直到整个购买流程执行完成并且扣减了库存。那么这个时候剩余库存刚好为0,不会再有用户发起购买请求,这时用户A的购买请求阻塞被唤醒,因为在此之前已经校验过库存大于0,所以执行完购买流程后,库存还会被扣减一次。那么此时的库存为-1,这就是常听到的超卖问题。
为了避免这个问题,我们可以通过加锁了方式,来保证并发的安全性。像JVM提供的内置锁synchronized,JUC提供的重入锁ReentrantLock,但是这两种锁只能保证单机环境下并发安全问题,一般在实际工作中很少会部署单节点的项目,通常都是多节点集群部署,这两个锁就失去了意义。这个时候就可以借助redis来实现分布式锁。
setnx
在集群部署的情况下,通常使用redis来实现分布式锁。其中redis提供了setnx命令,标识只有key不存在时才能设值成功,从而达到加锁的效果。下面通过redis来改造上述的代码,其方式是购票线程首先获取锁,如果获取锁成功,那么继续执行购票业务流程,直到所有流程执行完成并扣减库存后,最终在释放锁。如果获取锁失败,那么就给出一个友好的系统提示。
void buyTicket() {
// 获取锁
Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock", "1");
if (lock) {
int stockNum = byTicketMapper.selectStockNum();
if(stockNum>0){
//TODO 买票流程....
byTicketMapper.reduceStock(); // 扣减库存
}else{
log.info("=====>票卖完了<====");
}
// 释放锁
redisTemplate.delete("lock");
} else {
log.info("=====>系统繁忙,请稍后!<====");
}
}
问题1:死锁问题
通过上面的一顿梭哈,你以为这样就可以了吗,其实不然。设想一下,如果线程A在获取锁成功后,在执行购票的逻辑中出现了异常,那么这个时候就会造成锁得不到释放,其他线程始终获取不到锁,这就造成严重的死锁问题。为了避免死锁问题的出现,我们可以对异常进行捕获,在finally中去释放锁,这样不管业务执行成功或失败,最后都会去释放锁。
void buyTicket() {
Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock", "1");
if (lock) {
try {
int stockNum = byTicketMapper.selectStockNum();
if (stockNum > 0) {
//TODO 买票流程....
byTicketMapper.reduceStock(); // 扣减库存
} else {
log.info("=====>票卖完了<====");
}
}finally {
redisTemplate.delete("lock"); // 释放锁
}
} else {
log.info("=====>系统繁忙,请稍后!<====");
}
}
你以为这就结束了吗?死锁就不会发生了吗?如果你认为这样就能避免死锁的发生,那你就太不细心啦。如果在程序刚想像执行释放锁的逻辑时,redis服务突然宕机了,那么这时锁释放就失败了。在将redis服务重启后,加锁的数据又被恢复了,这样又出现了死锁的现象。为了避免这个问题,可以为锁设置一个过期时间,这样即使redis重启恢复数据后,也会很快的过期掉。不过需要注意的是,在设置锁的过期时间时,一定要保证原子性操作,不然还是会出现死锁问题。
//不是原子操作,会出现死锁问题
Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock", "1");
//如果刚要执行该语句时,redis宕机了。上面的锁无法释放
redisTemplate.expire("lock",Duration.ofSeconds(5L));
//原子操作
Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock", "1", Duration.ofSeconds(5L));
问题2:锁被其他线程释放问题
经过上面的又一顿梭哈,死锁的问题可以避免了,这样在高并发的情况下就能安全的执行了吗。如果锁的过期时间设置了5秒,当A线程发起购票请求并获取到了锁,但是A线程在执行购票流程时花费了6秒,此时线程A的锁已经过期。这时线程B重新获取了锁并且也开始执行购票流程,但是A线程要比B线程执行的要快,当A线程释放锁时,问题就出现了。由于A线程执行的过程锁已经过期了,那么在执行释放锁的流程时,最终被释放的是线程B的锁,这就导致B的锁被A线程释放问题。
对于这个现象,可以给每个锁设置一个唯一标识,比如像UUID,线程ID。在释放锁时,校验一下这个锁的标识是否为需要删除的锁,如果是,在进行锁的释放。
public void buyTicket() {
String uuid = UUID.randomUUID().toString();
// 为锁设置一个唯一标识
Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock", uuid, Duration.ofSeconds(5L));
if (lock) {
try {
int stockNum = byTicketMapper.selectStockNum();
if (stockNum > 0) {
//TODO 买票流程....
byTicketMapper.reduceStock(); // 扣减库存
} else {
log.info("=====>票卖完了<====");
}
}finally {
String lockValue = redisTemplate.opsForValue().get("lock");
if(lockValue.equals(uuid)){ //校验标识,通过则释放锁
redisTemplate.delete("lock");
}
}
} else {
log.info("=====>系统繁忙,请稍后!<====");
}
}
问题3:锁续期问题
使用setnx命令做分布式锁时,无法避免的一个问题就是:线程尚未执行完成,但是锁已经过期。在解决锁被其他线程误删的代码中,并不是100%能解决的,问题点在于下面这段代码。如果线程A已经执行到了if语句并且通过了判断,当刚要执行释放锁的逻辑时,线程A的锁过期了并且线程B重新获取到了锁,那么线程A在释放锁时,释放的是B的锁。为了完全能够解决这个问题,可以采用锁续期的方式,其实现方式是单独开一个线程用来定时监听线程的锁是否还被持有,如果还持有,那么就给这把锁增加一些过期时间,这样就不会出现上述问题了。目前市面上已经为我们提供了锁自动续期的中间件,比如redisson
String lockValue = redisTemplate.opsForValue().get("lock");
if(lockValue.equals(uuid)){ // 线程A的锁过期
redisTemplate.delete("lock"); // 线程A删除了线程B的锁
}
Redisson
redisson一般使用最多的场景就是分布式锁了,它不仅保证了并发场景下线程安全的问题,也解决了锁续期的问题。使用方式也比较简单,以3.5.7版本为例,首先需要配置redisson信息,根据自己的redis集群模式自由选择配置。在配置完成后,再来改造上面的购票方法。
@Bean
public RedissonClient redissonClient() {
Config config = new Config();
// 单机配置
config.useSingleServer().setAddress("redis://127.0.0.1:3306").setDatabase(0);
// 主从配置
// config.useMasterSlaveServers().setMasterAddress("").addSlaveAddress("","");
// 哨兵配置
// config.useSentinelServers().addSentinelAddress("").setMasterName("");
// Cluster配置
//config.useClusterServers().addNodeAddress("");
return Redisson.create(config);
}
对于redisson使用起来也非常简单,通过getLock方法获取到RLock对象。通过RLock的tryLock或lock方法来进行加锁(底层都是通过Lua脚本来实现的)。当获取到锁并且扣减库存后,可以使用unlock方法进行锁释放。
void buyTicket() {
RLock lock = redissonClient.getLock("lock");
if (lock.tryLock()) { // 获取锁
try {
int stockNum = byTicketMapper.selectStockNum();
if (stockNum > 0) {
//TODO 买票流程....
byTicketMapper.reduceStock(); // 扣减库存
} else {
log.info("=====>票卖完了<====");
}
} finally {
lock.unlock(); //释放锁
}
} else {
log.info("=====>系统繁忙,请稍后!<====");
}
}
- Watch Dog机制
那redisson是如何做到锁续期的呢?其实在redisson内部有一个看watch dog机制(看门狗机制),但是看门狗机制并不是在加锁时就能启动的。需要注意的是在加锁时,如果使用tryLock(long t1,long t2, TimeUnit unit)或lock(long t1,long t2, TimeUnit unit)方法并且将t2参数值设为了一个不为-1的值,那么看门口将无法生效。看门狗在启动后会监听主线程还在执行,如果还在执行那么将会通过Lua脚本每10秒给锁续期30秒。watchlog的延时时间默认为30秒,这个值可以在配置config时自己定义。
private RFuture<Boolean> tryAcquireOnceAsync(long leaseTime, TimeUnit unit, final long threadId) {
if (leaseTime != -1L) { // 如果leaseTime不是-1,那么将无法使用看门狗
return this.tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
} else {
RFuture<Boolean> ttlRemainingFuture = this.tryLockInnerAsync(this.commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
ttlRemainingFuture.addListener(new FutureListener<Boolean>() {
public void operationComplete(Future<Boolean> future) throws Exception {
if (future.isSuccess()) {
Boolean ttlRemaining = (Boolean)future.getNow();
if (ttlRemaining) {
// 看门口机制
RedissonLock.this.scheduleExpirationRenewal(threadId);
}
}
}
});
return ttlRemainingFuture;
}
}
private long lockWatchdogTimeout = 30000L; //默认30秒
private void scheduleExpirationRenewal(final long threadId) {
if (!expirationRenewalMap.containsKey(this.getEntryName())) {
// 每10秒执行续期
Timeout task = this.commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
public void run(Timeout timeout) throws Exception {
// 通过LUA脚本为锁续期
RFuture<Boolean> future = RedissonLock.this.commandExecutor.evalWriteAsync(RedissonLock.this.getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('pexpire', KEYS[1], ARGV[1]); return 1; end; return 0;", Collections.singletonList(RedissonLock.this.getName()), new Object[]{RedissonLock.this.internalLockLeaseTime, RedissonLock.this.getLockName(threadId)});
future.addListener(new FutureListener<Boolean>() {
public void operationComplete(Future<Boolean> future) throws Exception {
RedissonLock.expirationRenewalMap.remove(RedissonLock.this.getEntryName());
if (!future.isSuccess()) {
RedissonLock.log.error("Can't update lock " + RedissonLock.this.getName() + " expiration", future.cause());
} else {
if ((Boolean)future.getNow()) {
RedissonLock.this.scheduleExpirationRenewal(threadId);
}
}
}
});
}
}, this.internalLockLeaseTime / 3L, TimeUnit.MILLISECONDS); // 每10秒执行一次
if (expirationRenewalMap.putIfAbsent(this.getEntryName(), task) != null) {
task.cancel();
}
}
}
问题4:主从切换导致锁丢失问题
虽然redisson帮助我们解决了锁续期的问题,但是在redis集群架构中,由于主从复制具有一定的延时,那么在极端情况下就会出现这样一个问题:当一个线程获取锁成功,并且成功向主节点保存了锁信息,当主节点还未像从节点同步锁信息时,主节点宕机了,那么当发生故障转移从节点切换为主节点时,线程加的锁就丢失了。为了解决这个问题,redis引入了红锁RedLock
,RedLock与大多数中间件的选举机制类似,采用过半的方式来决定操作成功还是不成功。
RedLock
- 加锁
RedLock在工作中,并不接受redis的集群架构,无论是主从,哨兵还是Cluster。每台redis服务都是独立的,都是一台独立的Master节点。在加锁的过程中,RedLock会记录开始加锁时的时间以及加锁成功后的时间,这两个时间差就是一台机器加锁成功所需要的时间。比如启动了5个redis服务,线程A设置锁的超时时间为5秒,当像第一台redis服务加锁成功后花费了1秒,像第二台服务加锁成功后也花费了一秒。这个时候加到第二台机器时,已经花费了两秒的时间,但是加锁数并未过半,还需要加锁一台才能完全算加锁成功,这个时候第三台机器加锁成功又花费了1秒。那么总的加锁时间就是3秒,锁的实际过期时间就为2秒。特别需要注意的是,在向redis服务建立网络连接时,要设置一个超时时间,避免redis服务宕机时,客户端还在傻傻的等待回应,这里超时时间官方给到建议是5-50毫秒之间,当连接超时时,客户端会继续向下一个节点发起连接。
- 加锁失败
如果因为某些原因,获取锁失败(加锁没有超半数或者取锁时间已经超过了有效时间),客户端应该在所有的Redis实例上进行解锁,即便某些Redis实例根本就没有加锁成功。
- 失败重试
在并发场景下,RedLock会出现这样一个问题,比如有三个线程同时去获取了同一张票的锁,此时A线程已经成功给redis-1和reids-2加上了锁,线程B已经成功给redis-3,reids-4加上了锁,线程C成功的给reids-5加上了锁,这个时候三个线程再去加锁时,没有机器可加了,发现加锁成功数都未过半,那么就导致客户端始终获取不到锁。
当客户端无法取到锁时,应该在随机延迟一定时间,然后进行重试,防止多个客户端在同时抢夺同一资源的锁。
- 释放锁
释放锁比较简单,向所有的Redis实例发送释放锁命令即可,不用关心之前有没有从Redis实例成功获取到锁.
在了解了RedLock后,最后再来改造购票的代码逻辑。首先需要根据redis的实例数来定义对应的Bean实例,redis的实例最少要有三台。
@Bean
public RedissonClient redissonClient() {
Config config = new Config();
// 单机配置
config.useSingleServer().setAddress("redis://192.168.36.128:3306").setDatabase(0);
return Redisson.create(config);
}
@Bean
public RedissonClient redissonClient2() {
Config config = new Config();
// 单机配置
config.useSingleServer().setAddress("redis://192.168.36.130:3306").setDatabase(0);
return Redisson.create(config);
}
@Bean
public RedissonClient redissonClient3() {
Config config = new Config();
// 单机配置
config.useSingleServer().setAddress("redis://192.168.36.131:3306").setDatabase(0);
return Redisson.create(config);
}
在配置完成后,为每台实例都设置同一把锁,最后在调用RedissonRedLock提供的tryLock和unlock进行加锁和解锁。
void buyTicket(){
RLock lock = redissonClient.getLock("lock");
RLock lock2 = redissonClient2.getLock("lock");
RLock lock3 = redissonClient3.getLock("lock");
RedissonRedLock redLock = new RedissonRedLock(lock,lock2,lock3); // 分别像三台实例加锁
if (redLock.tryLock()) {
try {
int stockNum = byTicketMapper.selectStockNum();
if (stockNum > 0) {
//TODO 买票流程....
byTicketMapper.reduceStock(); // 扣减库存
} else {
log.info("=====>票卖完了<====");
}
} finally {
redLock.unlock(); //释放锁
}
} else {
log.info("=====>系统繁忙,请稍后!<====");
}
}
总结
在使用reids做分布式锁时,并没有想象中的那么简单,高并发场景下容易出现死锁,锁被其他线程误删,锁续期,锁丢失等问题,在实际开发中应该考虑到这些问题并根据相应的解决办法来解决这些问题,从而保证系统的安全性。本文中可能会存在一些遗漏或错误,后续会继续跟进。
作者:清名
链接:https://juejin.cn/post/7174312109017137165
来源:稀土掘金
网友评论