分布式锁四个特征
1、互斥性,任务时刻只有一个进程取得锁
2、防死锁,惹加锁进程已经失去联系,则锁需要释放
3、加锁和解锁需要同一个进程,
4、锁需要锁缓期
分布式锁方案
1、基于Redis进行分布式锁
2、基于Zookeeper进行分布式锁
3、基于Mysql进行分布式锁
下面分析基于Redis进行分布式锁,加锁流程
1、基于Redis执行命令是单线程,定义一个唯一Key,value可以是uuid,用来保护加锁和解锁是同一进程
2、利用set Nx 、exits等命令进行加锁,再设置key超时时间,避免死锁
3、这样子可以实现分布式锁,但是它没有锁缓期的功能
官方推荐使用Redsson
1、我们先引用对应的jar包
image.png
2、再配置一下RedissonConfig,我使用了简单的配置,如果我对其他配置进行设置,可以看Config里面有东东。
image.png
image.png
image.png
3、这样子一个分布式锁就完成了
Redsson源码分析
我们进去tryLock方法
image.png
一直从代码找,找到tryAcquireAsync,你发现leaseTime,其实它就是一个Watch Dog机制
image.png
在tryLockInnerAsync方法中,发现加锁是执行一个加锁lua脚本,大概如下
if (redis.call('exists', KEYS[1]) == 0) //检查key是否存在
then redis.call('hincrby', KEYS[1], ARGV[2], 1); //hash 加1
redis.call('pexpire', KEYS[1], ARGV[1]); //设置有效期,耗秒级
return nil;
end;
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) //检查hash 结构,key是否存在
then redis.call('hincrby', KEYS[1], ARGV[2], 1); //hash 加1
redis.call('pexpire', KEYS[1], ARGV[1]); //设置有效期,耗秒级
return nil;
end;
return redis.call('pttl', KEYS[1]); //返回锁过期时间,耗秒级
或加锁成功则返回null,否则返回key的ttl,
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
long time = unit.toMillis(waitTime);
long current = System.currentTimeMillis();
long threadId = Thread.currentThread().getId();
//执行lua脚本,获取锁
Long ttl = this.tryAcquire(waitTime, leaseTime, unit, threadId);
if (ttl == null) {
//获取到锁
return true;
} else {
//判断是否超过获取锁的等待时间
time -= System.currentTimeMillis() - current;
if (time <= 0L) {
//超过等待时间,获取锁失败
this.acquireFailed(waitTime, unit, threadId);
return false;
} else {
current = System.currentTimeMillis();
//订阅当前线程id,释放事件
RFuture<RedissonLockEntry> subscribeFuture = this.subscribe(threadId);
//阻塞等待锁释放,返回false,表示当前等待时间超过锁最大等待时间,取消订阅,返回加锁失败,返回true,则不停等待
if (!subscribeFuture.await(time, TimeUnit.MILLISECONDS)) {
if (!subscribeFuture.cancel(false)) {
subscribeFuture.onComplete((res, e) -> {
if (e == null) {
this.unsubscribe(subscribeFuture, threadId);
}
});
}
this.acquireFailed(waitTime, unit, threadId);
return false;
} else {
try {
//超过等待时间,获取锁失败
time -= System.currentTimeMillis() - current;
if (time <= 0L) {
//超过等待时间,获取锁失败
this.acquireFailed(waitTime, unit, threadId);
boolean var20 = false;
return var20;
} else {
boolean var16;
//do..while不停尝试获取锁
do {
long currentTime = System.currentTimeMillis();
ttl = this.tryAcquire(waitTime, leaseTime, unit, threadId);
if (ttl == null) {
var16 = true;
return var16;
}
time -= System.currentTimeMillis() - currentTime;
if (time <= 0L) {
//超过等待时间,获取锁失败
this.acquireFailed(waitTime, unit, threadId);
var16 = false;
return var16;
}
currentTime = System.currentTimeMillis();
// 利用共享锁来阻塞等待判断是否允许等待共享锁,允许则加入共享锁等待释放信号
// 1、latch其实是个信号量Semaphore,调用其tryAcquire方法会让当前线程阻塞一段时间,避免了在while循环中频繁请求获取锁;
// 2、该Semaphore的release方法,会在订阅解锁消息的监听器消息处理方法org.redisson.pubsub.LockPubSub#onMessage调用;当其他线程释放了占用的锁,会广播解锁消息,监听器接收解锁消息,并释放信号量,最终会唤醒阻塞在这里的线程。
if (ttl >= 0L && ttl < time) {
((RedissonLockEntry)subscribeFuture.getNow()).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} else {
((RedissonLockEntry)subscribeFuture.getNow()).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
}
time -= System.currentTimeMillis() - currentTime;
} while(time > 0L);
this.acquireFailed(waitTime, unit, threadId);
var16 = false;
return var16;
}
} finally {
this.unsubscribe(subscribeFuture, threadId);
}
}
}
}
}
网友评论