分布式锁
关于什么是分布式锁,举一个很简单的例子:现在大多数人都会使用信用卡,信用卡有账单日和还款日,账单日就是统计你需要在还款日截止之前需要还多少钱。
上述场景意味着银行的服务器需要在账单日有一个定时任务去统计每个人的账单,如果银行的服务器是一台机器的话,按照最简单的计算方式顺序遍历计算就可以了,但是面对庞大的用户群体,一台服务器是远远不够的,除了需要解决计算的问题,也需要考虑单点故障的问题,所以部署多台服务器是显然的。但是部署多台服务器需要面临的就是共享资源的抢占问题,如果有一台服务器正在计算你的账单,另一台服务器正好也在计算你的账单,那是不是意味着你一个月需要还多份账单给银行呢,这样显然是不可能的。这个时候分布式锁就能解决上述统计多份账单的问题,分布式锁会在第一台服务器开始计算之前先锁定资源,其他的服务器就会发现你的账单资源已经被锁定,也就不会重复计算。而是把自己的计算能力放在其他还未被锁定的资源上面。
如何实现分布式锁?
实现分布式锁有很多方式,比如数据库实现,zookeeper实现,还有我们现在要介绍的redis实现等等。其中的优缺点我们就不细说,直接来看redisson实现分布式锁。
maven依赖:
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.10.6</version>
</dependency>
如果需要将redisson集成到spring里面去:
/**
* 创建RedissonClient(这里以单机redis为例子)
*
* @return
*/
@Bean(destroyMethod = "shutdown")
public static RedissonClient singletonRedisson() {
Config config = new Config();
// 可以将配置参数放到配置文件中
config.useSingleServer().setAddress("redis://127.0.0.1:6379");
return Redisson.create(config);
}
使用方式:
@Service
public class XxxService {
/**
* 从Spring中引用
*/
@Autowired
private RedissonClient redisson;
/**
* 需要获取分布式锁的业务方法
*/
public void redisson() {
RLock rLock = redisson.getLock("lockName");
// 如果获取锁成功
if (rLock.tryLock()) {
try {
System.out.println("获取锁成功,执行业务逻辑");
} finally {
// 解锁
rLock.unlock();
System.out.println("解锁成功");
}
}
}
}
下面开始分析redisson的实现方式:
tryLock()
/**
* 尝试获取锁
*
* @return
*/
public boolean tryLock() {
return (Boolean)this.get(this.tryLockAsync());
}
/**
* 一看就是为了获取异步执行的结果,所以重点应该看tryLockAsync()
*
* @return
*/
protected final <V> V get(RFuture<V> future) {
return this.commandExecutor.get(future);
}
/**
* 异步获取锁
*
* @return
*/
public RFuture<Boolean> tryLockAsync() {
return this.tryLockAsync(Thread.currentThread().getId());
}
/**
* 尝试获取锁
*
* @param threadId
* @return
*/
public RFuture<Boolean> tryLockAsync(long threadId) {
return this.tryAcquireOnceAsync(-1L, (TimeUnit) null, threadId);
}
/**
* 尝试获取锁
*
* @param leaseTime
* @param unit
* @param threadId
* @return
*/
private RFuture<Boolean> tryAcquireOnceAsync(long leaseTime, TimeUnit unit, long threadId) {
/**
* 如果自定义过期时间
*/
if (leaseTime != -1L) {
return this.tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
}
/**
* 如果是默认的过期时间
*/
else {
RFuture<Boolean> ttlRemainingFuture = this.tryLockInnerAsync(this.commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
//没有异常
if (e == null) {
//成功获取锁
if (ttlRemaining) {
// 更新过期时间
this.scheduleExpirationRenewal(threadId);
}
}
});
return ttlRemainingFuture;
}
}
/**
* 真正的获取锁的代码
* @param leaseTime
* @param unit
* @param threadId
* @param command
* @param <T>
* @return
*/
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
//这个字段后面用作续过期时间
this.internalLockLeaseTime = unit.toMillis(leaseTime);
/**
* 利用lua脚本执行相关逻辑
*/
return this.commandExecutor.evalWriteAsync(this.getName(),
LongCodec.INSTANCE,
command,
"if (redis.call('exists', KEYS[1]) == 0) then redis.call('hset', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; return redis.call('pttl', KEYS[1]);",
Collections.singletonList(this.getName()),
new Object[]{this.internalLockLeaseTime, this.getLockName(threadId)});
}
//如果key不存在
if (redis.call('exists', KEYS[1]) == 0)
//设置这个key,并且设置超时时间,获取锁成功
then redis.call('hset', KEYS[1], ARGV[2], 1);
redis.call('pexpire', KEYS[1], ARGV[1]);
return nil; end;
//如果key存在
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1)
//就对key自增,并且重置过期时间(重入锁)
then redis.call('hincrby', KEYS[1], ARGV[2], 1);
redis.call('pexpire', KEYS[1], ARGV[1]);
return nil; end;
//获取key剩下的时间
return redis.call('pttl', KEYS[1]);
/**
* 重置过期时间
* @param threadId
*/
private void scheduleExpirationRenewal(long threadId) {
RedissonLock.ExpirationEntry entry = new RedissonLock.ExpirationEntry();
//检查是否存在指定的定时任务
RedissonLock.ExpirationEntry oldEntry = (RedissonLock.ExpirationEntry)EXPIRATION_RENEWAL_MAP.putIfAbsent(this.getEntryName(), entry);
//如果已经存在指定的定时任务
if (oldEntry != null) {
oldEntry.addThreadId(threadId);
}
//如果是第一次创建这个定时任务
else {
entry.addThreadId(threadId);
this.renewExpiration();
}
}
/**
* 定时任务重置过期时间
*/
private void renewExpiration() {
RedissonLock.ExpirationEntry ee = (RedissonLock.ExpirationEntry)EXPIRATION_RENEWAL_MAP.get(this.getEntryName());
if (ee != null) {
//每三分之一的过期时间续一次,直至解锁
Timeout task = this.commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
public void run(Timeout timeout) throws Exception {
RedissonLock.ExpirationEntry ent = (RedissonLock.ExpirationEntry)RedissonLock.EXPIRATION_RENEWAL_MAP.get(RedissonLock.this.getEntryName());
if (ent != null) {
//拿到第一个线程id
Long threadId = ent.getFirstThreadId();
if (threadId != null) {
//续时间
RFuture<Boolean> future = RedissonLock.this.renewExpirationAsync(threadId);
future.onComplete((res, e) -> {
// 如果有异常,打印日志
if (e != null) {
RedissonLock.log.error("Can't update lock " + RedissonLock.this.getName() + " expiration", e);
}
// 没有异常就继续续时间
else {
RedissonLock.this.renewExpiration();
}
});
}
}
}
}, this.internalLockLeaseTime / 3L, TimeUnit.MILLISECONDS);
ee.setTimeout(task);
}
}
/**
* 重置指定线程id的过期时间
* @param threadId
* @return
*/
protected RFuture<Boolean> renewExpirationAsync(long threadId) {
return this.commandExecutor.evalWriteAsync(this.getName(),
LongCodec.INSTANCE,
RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('pexpire', KEYS[1], ARGV[1]); return 1; end; return 0;",
Collections.singletonList(this.getName()),
new Object[]{this.internalLockLeaseTime, this.getLockName(threadId)});
}
//如果已经存在这个key,那就给它续时间
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1)
then redis.call('pexpire', KEYS[1], ARGV[1]);
return 1; end;
return 0;
unlock()
/**
* 解锁
*/
public void unlock() {
try {
this.get(this.unlockAsync(Thread.currentThread().getId()));
} catch (RedisException var2) {
if (var2.getCause() instanceof IllegalMonitorStateException) {
throw (IllegalMonitorStateException)var2.getCause();
} else {
throw var2;
}
}
}
/**
* 解锁(异步)
* @param threadId
* @return
*/
public RFuture<Void> unlockAsync(long threadId) {
RPromise<Void> result = new RedissonPromise();
RFuture<Boolean> future = this.unlockInnerAsync(threadId);
future.onComplete((opStatus, e) -> {
if (e != null) {
//取消续时间的任务
this.cancelExpirationRenewal(threadId);
result.tryFailure(e);
} else if (opStatus == null) {
IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: " + this.id + " thread-id: " + threadId);
result.tryFailure(cause);
} else {
//取消续时间的任务
this.cancelExpirationRenewal(threadId);
result.trySuccess((Object)null);
}
});
return result;
}
/**
* 真正执行解锁的代码
* @param threadId
* @return
*/
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
return this.commandExecutor.evalWriteAsync(this.getName(),
LongCodec.INSTANCE,
RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then return nil;end; local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); if (counter > 0) then redis.call('pexpire', KEYS[1], ARGV[2]); return 0; else redis.call('del', KEYS[1]); redis.call('publish', KEYS[2], ARGV[1]); return 1; end; return nil;",
Arrays.asList(this.getName(), this.getChannelName()),
new Object[]{LockPubSub.UNLOCK_MESSAGE, this.internalLockLeaseTime, this.getLockName(threadId)});
}
//如果不存在key,直接返回
if (redis.call('hexists', KEYS[1], ARGV[3]) == 0)
then return nil;end;
//对key减一后再统计key的重入次数(考虑重入锁)
local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1);
//如果次数大于0,就重置过期时间
if (counter > 0)
then redis.call('pexpire', KEYS[1], ARGV[2]); return 0;
//如果次数小于等于0,就删除key,并且发布消息(考虑到订阅了消息的线程正在等待)
else redis.call('del', KEYS[1]);
redis.call('publish', KEYS[2], ARGV[1]);
return 1; end;
return nil;
/**
* 取消续时间的任务
* @param threadId
*/
void cancelExpirationRenewal(Long threadId) {
//从Map中获取任务
RedissonLock.ExpirationEntry task = (RedissonLock.ExpirationEntry)EXPIRATION_RENEWAL_MAP.get(this.getEntryName());
if (task != null) {
if (threadId != null) {
//移除线程id
task.removeThreadId(threadId);
}
if (threadId == null || task.hasNoThreads()) {
//取消续时间的任务
task.getTimeout().cancel();
//移除任务
EXPIRATION_RENEWAL_MAP.remove(this.getEntryName());
}
}
}
从上述源码分析上看,
1.tryLock()的过期时间默认是30秒,并且会存在一个定时续过期时间的任务,保证业务执行时间不会超过过期时间,抢占失败即返回false
2.tryLock(long waitTime, TimeUnit unit)这个方法我们没有分析,从源码中看,为了实现waitTime,使用了redis的订阅发布功能。也就是没有抢到锁的线程订阅消息,直至waitTime过期返回false或者被通知新一轮的开始抢占锁。当然,它如果抢占到锁,锁的过期时间也是30秒,同样也会存在一个定时任务续过期时间
3.tryLock(long waitTime, long leaseTime, TimeUnit unit)这个方法的参数leaseTime如果不是-1的话,是不会有定时任务续过期时间的,也就存在业务处理时间可能超过过期时间的风险。其他的和tryLock(long waitTime, TimeUnit unit)一致
4.lock()这个方法是要保证一定要抢到锁的,它的默认过期时间也是30秒,和tryLock()不同的是,它如果没抢占到锁,会一直自旋
以上是我对于redisson分布式锁的简单分析,感兴趣的小伙伴也可以自己对源码做一个简单了解。
网友评论