为什么要分布式锁
- 在单机的情况下,可以通过jvm提供的系列线程安全的操作来处理高并发的情况,但是在分布式的环境下,jvm提供的线程安全操作明显是不能满足要求的。在一些小型的互联网公司经常做的crud操作如果在高并发的情况下会出现很大的问题,比如:
//伪代码:下订单
1、查库存:getStock()
2、判断库存:stock>0下单
3、下单:addOrder()
4、减库存
- 仅仅以上三步,如果在高并发的情况下,无论是单机或者集群,如果不加锁一定会出现超卖的情况。一瞬间成千上万个请求过来,如何能够确保查询到的库存是最新的数据?
Redis实现
- 通过redis的setNx方法可以自己简单的实现以下分布式锁,但是在实现之前需要考虑清楚几个问题。
问题与解决思路
-
如何避免死锁?在高并发的情况下很可能由于服务重启,服务器宕机的等情况导致锁没有及时释放,导致其他线程不能获得锁。
- setNx的使用设置一个过期时间,当锁没有手动释放的时候能够超过一定时间自动释放
- 这个时间如何设置?如果业务没有执行完成但是过期时间到了,这个锁释放了,怎么处理?
- 过期时间最好能够是业务执行完成的时间,为了防止时间到了业务没有执行完成,可以分开一个线程或者设置一个定时器,定时的延长这个过期时间,直到当前线程完成业务。
-
如何避免锁的误删(设置过期时间没有设置定时器延长过期时间)?高并发的情况下瞬间几万个请求过来,很有可能A线程执行完成之后,但是B线程没有执行完成(前提:B线程先获得锁先执行并且未执行完成之后过期时间到了删除了自己持有的锁,此时A线程获得锁并率先执行完成),A线程执行deleteKey方法,删除了B的锁。
- 在执行每一个业务逻辑之前先生成一个唯一id作为setNx的value值标识这个线程执行的任务,删除的时候先获取和当前线程的id比对一下,如果不一样,这个锁不是当前的线程持有的。
-
如何保证锁的可重入性?
- 在获取锁的时候先获取锁,比对一下当前的唯一标识,相同的话可重入。
-
如何确保获取锁和释放锁的原子性?在获取锁或者释放锁的过程中如果不是原子操作很有可能导致一系列问题
实现
- 根据上面的思路可以通过redis自己手写一个分布式锁的实现,当然这个例子并没有保证解锁和获得锁的原子性,不喜勿喷。
- redis的工具类:
public class RedisUtils {
private static RedisTemplate redisTemplate=ApplicationContextUtils.applicationContext.getBean("redisTemplate",RedisTemplate.class);
private static StringRedisTemplate stringRedisTemplate=ApplicationContextUtils.applicationContext.getBean(StringRedisTemplate.class);
/**
* setNx
* @param key
* @param value
* @return
*/
public static Boolean setNx(String key,String value){
return stringRedisTemplate.opsForValue().setIfAbsent(key,value);
}
/**
* setNx
* @param key
* @param value
* @param seconds 过期时间,单位秒
* @return
*/
public static Boolean setNx(String key, String value, Long seconds){
return stringRedisTemplate.opsForValue().setIfAbsent(key,value,seconds, TimeUnit.SECONDS);
}
/**
* 删除key
* @param key
* @return
*/
public static Boolean deleteKey(String key){
return redisTemplate.delete(key);
}
/**
* 获取NX设置的值
* @param key
* @return
*/
public static String getNX(String key){
return stringRedisTemplate.opsForValue().get(key);
}
/**
* 设置key的过期时间
* @param key
* @param seconds
* @return
*/
public static Boolean expireKey(String key,Long seconds){
return stringRedisTemplate.expire(key,seconds,TimeUnit.SECONDS);
}
}
public class RedisLock {
/**
* 存储的KEY,每个业务应该不同
*/
private String key;
/**
* 过期时间,单位秒
*/
private Long expireSeconds;
/**
* 异步任务,用于分开一个线程延长过期时间,也可以使用定时器
*/
private TaskAsync taskAsync;
public RedisLock(String key,Long expireSeconds){
this.key=key;
this.expireSeconds=expireSeconds;
taskAsync= ApplicationContextUtils.applicationContext.getBean(TaskAsync.class);
}
/**
* 上锁
* @param uuid setNx的值,唯一标识当前线程
*/
public void lock(String uuid) throws InterruptedException {
//获取value
String nx = RedisUtils.getNX(key);
/**
* 如果此时的uuid和redis中的一致,那么就是可重入的
*/
if (StringUtils.equals(nx,uuid)){
return;
}
//如果不一致,需要设置,为了避免死锁,需要设置一个过期时间
Boolean b = RedisUtils.setNx(key, uuid,expireSeconds);
//加锁失败,每隔两秒重试一次
while(!b){
b = RedisUtils.setNx(key, uuid,expireSeconds);
Thread.sleep(2000);
}
//开启异步线程延长过期时间
taskAsync.delayExpireTime(uuid,key,expireSeconds);
}
/**
* 解锁
* @param uuid 为了避免误删,这里的uuid是唯一标识当前方法执行的,如果和当前方法的相同才能删除
*/
public void unlock(String uuid) {
String value= RedisUtils.getNX(key);
//说明当前线程执行的方法所获得的锁已经被释放了
if (!StringUtils.equals(value,uuid)){
return;
}
Boolean b = RedisUtils.deleteKey(key);
while(!b){
b=RedisUtils.deleteKey(key);
}
}
}
@Component
public class TaskAsync {
/**
* 异步开一个线程延长执行的任务,
* @param uuid 当前线程持有锁的唯一标识
* @param key key
* @param expireSeconds 过期时间
*/
@Async
public void delayExpireTime(String uuid,String key,Long expireSeconds) throws InterruptedException {
//无限循环
while(true){
String nx = RedisUtils.getNX(key);
//key对应的值不存在,或者不等于当前方法的唯一id,直接跳出,不需要延长时间了
if (nx==null||!StringUtils.equals(uuid,nx))
break;
//延长时间
RedisUtils.expireKey(key,expireSeconds);
Thread.sleep(3000L);
}
}
}
Redisson
- Redisson和jedis一样同样是redis的客户端,但是其在解决分布式问题上有着很大的优势,对分布式锁的实现更是封装的更加简洁,能够通过简单的api完成。
- Redisson封装了多种锁,包括重入锁,公平锁,红锁......,这里简单的演示一下重入锁的使用方式。
可重入锁
-
RedissonClient
通过getxxLock(name)
获取不同锁的对象,RLock对应的是可重入锁的接口。与SpringBoot整合之后,配置的方式创建RedissonClient,并且注入了一个处理订单业务的锁:
@Configuration
@EnableConfigurationProperties(value = {RedissonProperties.class})
public class RedissonConfig {
/**
* 注入RedissonClient对象
*/
@Bean
public RedissonClient redissonClient(RedissonProperties redissonProperties){
Config config = new Config();
config.useSingleServer().setAddress(redissonProperties.getAddress()).setPassword(redissonProperties.getPassword()).setDatabase(redissonProperties.getDatabase());
return Redisson.create(config);
}
/**
* 注入可重入锁,用于订单业务
*/
@Bean
public RLock orderLock(@Qualifier(value = "redissonClient") RedissonClient redissonClient,RedissonProperties redissonProperties){
return redissonClient.getLock(redissonProperties.getOrderLock());
}
}
- 模拟订单的下单,如下:
-
void lock(long leaseTime, TimeUnit unit)
:获得锁,leaseTime设置的过期时间,unit是时间单位,如果设置了-1,redisson会设置默认的时间30秒,这个时间可以在config配置中修改,具体看文档。锁的值是UUID:线程Id
(作为唯一标识)
-
void unlock()
:解锁
public void add(String goodsId) throws Exception {
try {
//获取锁
rLock.lock(10, TimeUnit.SECONDS);
//检查库存,存储在redis中
Integer stock = Integer.valueOf(stringRedisTemplate.opsForValue().get(goodsId));
if (stock<0)
return;
//减库存
Long increment = stringRedisTemplate.opsForValue().increment(goodsId, -1);
if (increment<0)
return;
//减库存成功,下单
Order order = Order.builder().build();
orderMapper.add(order);
}finally {
//解锁
rLock.unlock();
}
}
文档
网友评论