笔者最近在面试过程中,发现面试官喜欢面试关于redis的分布式锁的实现。为了更加清晰地了解加锁工程,然后看了下redisson下封装的锁的操作过程。
redis锁的实现是一个学习redis的难点,那么了解其原理可以让我们更好的使用好lock。
首先聊聊单体redis下如何实现锁的。
单体redis模式下的锁实现
接下来先看下加锁的实现。
加锁
// 构造redisson实现分布式锁必要的Config
Config config = new Config();
config.useSingleServer().setAddress("redis://172.201.1.180:5379").setPassword("a123456").setDatabase(0);
// 构造RedissonClient
RedissonClient redissonClient = Redisson.create(config);
// 设置锁定资源名称
RLock disLock = redissonClient.getLock("DISLOCK1");
boolean isLock;
try {
//尝试获取分布式锁
isLock = disLock.tryLock(500, 5000, TimeUnit.MILLISECONDS);
if (isLock) {
//TODO if get lock success, do something;
Thread.sleep(5000);
}
} catch (Exception e) {
} finally {
// 无论如何, 最后都要解锁
disLock.unlock();
}
首先看到trylock的源码:
public boolean tryLock(long waitTime, TimeUnit unit) throws InterruptedException {
return this.tryLock(waitTime, -1L, unit);
}
具体进入到trylock中。
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
long time = unit.toMillis(waitTime);
long current = System.currentTimeMillis();
final long threadId = Thread.currentThread().getId();
//要注意到从上面得到的leaseTime值为-1L。
Long ttl = this.tryAcquire(leaseTime, unit, threadId);
//如果ttl为空,说明获取到锁,当前没有线程加锁。
if (ttl == null) {
return true;
} else {
time -= System.currentTimeMillis() - current;
//规定时间内没有获取到锁,然后加锁失败。
if (time <= 0L) {
this.acquireFailed(threadId);
return false;
} else {
//
current = System.currentTimeMillis();
//对当前线程进行订阅。
final RFuture<RedissonLockEntry> subscribeFuture = this.subscribe(threadId);
if (!this.await(subscribeFuture, time, TimeUnit.MILLISECONDS)) {
if (!subscribeFuture.cancel(false)) {
subscribeFuture.addListener(new FutureListener<RedissonLockEntry>() {
public void operationComplete(Future<RedissonLockEntry> future) throws Exception {
//对该位置进行监听。
if (subscribeFuture.isSuccess()) {
RedissonLock.this.unsubscribe(subscribeFuture, threadId);
}
}
});
}
this.acquireFailed(threadId);
return false;
} else {
try {
//经过上面的处理,看下现在时间是否过期
time -= System.currentTimeMillis() - current;
if (time <= 0L) {
//过期了就获取失败
this.acquireFailed(threadId);
boolean var20 = false;
return var20;
} else {
boolean var16;
do {
//然后使用tryacquire去获取
long currentTime = System.currentTimeMillis();
//下面对tryAcquire进行获取。
ttl = this.tryAcquire(leaseTime, unit, threadId);
//ttl是空,说明获取到锁
if (ttl == null) {
var16 = true;
return var16;
}
time -= System.currentTimeMillis() - currentTime;
//看时间是否到了,到了就过期了。
if (time <= 0L) {
this.acquireFailed(threadId);
var16 = false;
return var16;
}
currentTime = System.currentTimeMillis();
//ttl 大于0,并且ttl小于过期时间,那么尝试去获取锁。
if (ttl >= 0L && ttl < time) {
this.getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} else {
this.getEntry(threadId).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
}
time -= System.currentTimeMillis() - currentTime;
} while(time > 0L);
this.acquireFailed(threadId);
var16 = false;
return var16;
}
} finally {
//最终释放掉订阅。
this.unsubscribe(subscribeFuture, threadId);
}
}
}
}
}
从 Long ttl = this.tryAcquire(leaseTime, unit, threadId)这句进去到tryAcuire中
private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) {
// 从上面得到leastTime的值为-1L。那么会跳过上面的判断,走下面的路径。
if (leaseTime != -1L) {
return this.tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
} else {
//先按照30秒的过期时间来执行获取锁的方法
RFuture<Long> ttlRemainingFuture = this.tryLockInnerAsync(30L, TimeUnit.SECONDS, threadId, RedisCommands.EVAL_LONG);
//如果还持有这个锁,则开启定时任务不断刷新该锁的过期时间
// 异步获取结果,如果获取锁成功,则启动定时线程进行锁续约
ttlRemainingFuture.addListener(new FutureListener<Long>() {
public void operationComplete(Future<Long> future) throws Exception {
if (future.isSuccess()) {
Long ttlRemaining = (Long)future.getNow();
//如何续约,看下scheduleExpirationRenewal实现方法。
//拿到了锁
if (ttlRemaining == null) {
RedissonLock.this.scheduleExpirationRenewal(threadId);
}
}
}
});
return ttlRemainingFuture;
}
}
先看tryLockInnerAsync尝试去获取到锁,这里里面的实现主要是通过lua脚本来实现的,保证了原子性。
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit,
long threadId, RedisStrictCommand<T> command) {
//过期时间
internalLockLeaseTime = unit.toMillis(leaseTime);
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
//如果锁不存在,则通过hset设置它的值,并设置过期时间
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('hset', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
//如果锁已存在,并且锁的是当前线程,则通过hincrby给数值递增1
// 续约internalLockLeaseTime(30s)
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
//如果锁已存在,但并非本线程,则返回过期时间ttl
"return redis.call('pttl', KEYS[1]);",
Collections.<Object>singletonList(getName()),
internalLockLeaseTime, getLockName(threadId));
}
然后使用定时任务去给这个锁续约(重点)。
那接下来我们就看到scheduleExpirationRenewal方法的源码实现。
private void scheduleExpirationRenewal(final long threadId) {
if (!expirationRenewalMap.containsKey(this.getEntryName())) {
Timeout task = this.commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
public void run(Timeout timeout) throws Exception {
//future尝试获取到锁。
RFuture<Boolean> future = RedissonLock.this.commandExecutor.evalWriteAsync(RedissonLock.this.getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
//如果获取到改锁,然后给改锁进行续约。
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1)
then redis.call('pexpire', KEYS[1], ARGV[1]); return 1;
end; return 0;",
Collections.singletonList(RedissonLock.this.getName()), new Object[]{RedissonLock.this.internalLockLeaseTime, RedissonLock.this.getLockName(threadId)});
future.addListener(new FutureListener<Boolean>() {
public void operationComplete(Future<Boolean> future) throws Exception {
RedissonLock.expirationRenewalMap.remove(RedissonLock.this.getEntryName());
// 如果future为非succes,那么就没法去更新lock的时间了。
if (!future.isSuccess()) {
RedissonLock.log.error("Can't update lock " + RedissonLock.this.getName() + " expiration", future.cause());
} else {
//如果获取到了锁,然后就循环给锁去做续约,做上面的操作。
if ((Boolean)future.getNow()) {
RedissonLock.this.scheduleExpirationRenewal(threadId);
}
}
}
});
}
//每次间隔租期的1/3时间执行这个续约。
}, this.internalLockLeaseTime / 3L, TimeUnit.MILLISECONDS);
if (expirationRenewalMap.putIfAbsent(this.getEntryName(), task) != null) {
task.cancel();
}
}
}
上面就是trylock获取锁并进行续约的流程。
然后看一下tryAcquire的实现。有意思的是这里是用了信号量Semaphore的tryAcqurie方法实现。
public boolean tryAcquire(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
另外强调一点的是:如果tryLock的时候没有设置leasetime为-1L,那么直接去使用
return this.tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG)。
不会有续约的情况了。同时我们也看见里面也实现了可重入锁。
接下来看下解锁的实现。
解锁
public void unlock() {
//这里主要是看this.unlockInnerAsync方法的实现。
Boolean opStatus = (Boolean)this.get(this.unlockInnerAsync(Thread.currentThread().getId()));
//按照提示,得到如果为null,那么没有得到当前的锁。
if (opStatus == null) {
throw new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: " + this.id + " thread-id: " + Thread.currentThread().getId());
} else {
if (opStatus) {
this.cancelExpirationRenewal();
}
}
}
然后我们进入到unlockInnerAsync中去看下实现原理。
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
// 释放锁时需要在redis实例上执行的lua命令
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
// 如果分布式锁KEY不存在,那么向channel发布一条消息
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; " +
"end;" +
// 如果分布式锁存在,但是value不匹配,表示锁已经被占用,那么直接返回
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
"return nil;" +
"end; " +
// 如果就是当前线程占有分布式锁,那么将重入次数减1
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
// 重入次数减1后的值如果大于0,表示分布式锁有重入过,那么只设置失效时间,还不能删除
"if (counter > 0) then " +
"redis.call('pexpire', KEYS[1], ARGV[2]); " +
"return 0; " +
"else " +
// 重入次数减1后的值如果为0,表示分布式锁只获取过1次,那么删除这个KEY,并发布解锁消息
"redis.call('del', KEYS[1]); " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; "+
"end; " +
"return nil;",
// 这5个参数分别对应KEYS[1],KEYS[2],ARGV[1],ARGV[2]和ARGV[3]
Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(threadId));
}
同时我们看下unlockAsync的方法。
public RFuture<Void> unlockAsync(final long threadId) {
final RPromise<Void> result = new RedissonPromise<Void>();
//解锁方法
RFuture<Boolean> future = unlockInnerAsync(threadId);
future.addListener(new FutureListener<Boolean>() {
@Override
public void operationComplete(Future<Boolean> future) throws Exception {
if (!future.isSuccess()) {
cancelExpirationRenewal(threadId);
result.tryFailure(future.cause());
return;
}
//获取返回值
Boolean opStatus = future.getNow();
//如果返回空,则证明解锁的线程和当前锁不是同一个线程,抛出异常
if (opStatus == null) {
IllegalMonitorStateException cause =
new IllegalMonitorStateException("
attempt to unlock lock, not locked by current thread by node id: "
+ id + " thread-id: " + threadId);
result.tryFailure(cause);
return;
}
//解锁成功,取消刷新过期时间的那个定时任务
if (opStatus) {
cancelExpirationRenewal(null);
}
result.trySuccess(null);
}
});
return result;
}
同时小伙伴可以注意下forceUnlock()方法加锁的方式:
在源码中并没有找到forceUnlock()被调用的痕迹(也有可能是我没有找对),但是forceUnlockAsync()方法被调用的地方很多,大多都是在清理资源时删除锁。此部分比较简单粗暴,删除锁成功则并发布锁被删除的消息,返回1结束,否则返回0结束。
public void forceUnlock() {
this.get(this.forceUnlockAsync());
}
public RFuture<Boolean> forceUnlockAsync() {
this.cancelExpirationRenewal();
return this.commandExecutor.evalWriteAsync(this.getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('del', KEYS[1]) == 1) t
hen redis.call('publish', KEYS[2], ARGV[1]);
return 1 else return 0 end",
Arrays.asList(this.getName(), this.getChannelName()), new Object[]{LockPubSub.unlockMessage});
}
上文就是redisson的分布式锁实现。下次讲讲redlock的实现原理。
参考资料:
redissonLock源码
https://www.jianshu.com/p/47fd7f86c848
https://www.jianshu.com/p/7e47a4503b87
https://my.oschina.net/u/2369201/blog/1573730
网友评论