主要接触到的Redis分布式锁有两种框架RedisLockRegistry和Redisson,今天来看下两种框架的实现原理;
RedisLockRegistry
Spring-inintegration-redis
中提供的一种实现方式,使用方式如下
@Bean("redisLockRegistry")
public RedisLockRegistry redisLockRegistry(RedisConnectionFactory redisConnectionFactory) {
return new RedisLockRegistry(redisConnectionFactory, "spring-redis-lock");
}
在配置文件中直接注册成一个Bean,下面是一些构造时的源码;
public RedisLockRegistry(RedisConnectionFactory connectionFactory, String registryKey) {
// DEFAULT_EXPIRE_AFTER默认是6秒
this(connectionFactory, registryKey, DEFAULT_EXPIRE_AFTER);
}
public RedisLockRegistry(RedisConnectionFactory connectionFactory, String registryKey, long expireAfter) {
Assert.notNull(connectionFactory, "'connectionFactory' cannot be null");
Assert.notNull(registryKey, "'registryKey' cannot be null");
this.redisTemplate = new StringRedisTemplate(connectionFactory);
this.obtainLockScript = new DefaultRedisScript<>(OBTAIN_LOCK_SCRIPT, Boolean.class);
this.registryKey = registryKey;
this.expireAfter = expireAfter;
}
构造时生成了一个RedisTemplate
用于执行Redis操作,一个ObtainLockScript
用来保存获取锁的脚本;
private final class RedisLock implements Lock {
private final String lockKey;
private final ReentrantLock localLock = new ReentrantLock();
private volatile boolean unlinkAvailable = true;
private volatile long lockedAt;
// 构造的时候只储存一个key
private RedisLock(String path) {
this.lockKey = constructLockKey(path);
}
private String constructLockKey(String path) {
return RedisLockRegistry.this.registryKey + ":" + path;
}
...
}
内部封装了一个Redis锁的实现,主要包括一个锁的标识lockKey
和一个重入锁ReentrantLock
;
Lock lock = redisLockRegistry.obtain(key);
try {
if (lock.tryLock(5, TimeUnit.SECONDS)) {
try {
...
} finally {
lock.unlock();
}
} else {
throw new GetLockTimeoutException();
}
} catch (InterruptedException e) {
throw new GetLockTimeoutException();
}
上面是代码中的基本用法,可以选择封装成一个切面使用起来方便一些,首先来看下获取锁的操作obtain
;
private final Map<String, RedisLock> locks = new ConcurrentHashMap<>();
public Lock obtain(Object lockKey) {
Assert.isInstanceOf(String.class, lockKey);
String path = (String) lockKey;
return this.locks.computeIfAbsent(path, RedisLock::new);
}
从一个ConcurrentHashMap
中根据锁的标识获取一个锁,如果没有则构造一个新的;
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
long now = System.currentTimeMillis();
// 先是本地的重入锁先尝试加锁
if (!this.localLock.tryLock(time, unit)) {
// 如果本地线程加锁失败,直接返回
return false;
}
try {
long expire = now + TimeUnit.MILLISECONDS.convert(time, unit);
boolean acquired;
// 本地加锁成功后执行redis加锁
while (!(acquired = obtainLock()) && System.currentTimeMillis() < expire) {
// 加锁失败且没过超时时间,睡一会再试
Thread.sleep(100);
}
if (!acquired) {
// 如果没有获取到锁,把本地锁释放直接返回
this.localLock.unlock();
}
return acquired;
}
catch (Exception e) {
this.localLock.unlock();
rethrowAsLockException(e);
}
return false;
}
看这段逻辑需要一点ReentrantLock
的知识,可以先了解下Java8 源码阅读 - AbstractQueuedSynchronizer
首先是本地的重入锁先进行加锁,加锁步骤如下:
- 如果锁资源没有被其他线程占用,直接加锁,亦或者是本地线程已经占用锁,再次重入也被视为加锁成功;
- 如果锁资源被其他线程占用,排队等待指定时间,指定时间内获取不到资源则加锁失败;
成功在本地加锁后会执行Redis加锁操作
private boolean obtainLock() {
Boolean success =
RedisLockRegistry.this.redisTemplate.execute(RedisLockRegistry.this.obtainLockScript,
Collections.singletonList(this.lockKey), RedisLockRegistry.this.clientId,
String.valueOf(RedisLockRegistry.this.expireAfter));
boolean result = Boolean.TRUE.equals(success);
if (result) {
this.lockedAt = System.currentTimeMillis();
}
return result;
}
private static final String OBTAIN_LOCK_SCRIPT =
"local lockClientId = redis.call('GET', KEYS[1])\n" +
"if lockClientId == ARGV[1] then\n" +
" redis.call('PEXPIRE', KEYS[1], ARGV[2])\n" +
" return true\n" +
"elseif not lockClientId then\n" +
" redis.call('SET', KEYS[1], ARGV[1], 'PX', ARGV[2])\n" +
" return true\n" +
"end\n" +
"return false";
利用Redis执行Lua脚本是原子性的特性来实现的,首先redis.call('GET', KEYS[1])
来判断是否已经获取到锁,如果发现锁已经被占用,则判断是不是来自同一个客户端的请求,如果是就更新解锁时间(默认6s),如果不是来自同一个客户端的(即多个节点对同一个key操作),只不会允许非第一个客户端的操作;上锁的操作是redis.call('SET', KEYS[1], ARGV[1], 'PX', ARGV[2])
,key是外面指定的key,而value则是客户端的uuid;
解锁的逻辑也是比较简单,直接调用del
或者unlink
删除key即可;
private void removeLockKey() {
if (this.unlinkAvailable) {
try {
RedisLockRegistry.this.redisTemplate.unlink(this.lockKey);
}
catch (Exception ex) {
LOGGER.warn("The UNLINK command has failed (not supported on the Redis server?); " +
"falling back to the regular DELETE command", ex);
this.unlinkAvailable = false;
RedisLockRegistry.this.redisTemplate.delete(this.lockKey);
}
}
else {
RedisLockRegistry.this.redisTemplate.delete(this.lockKey);
}
}
在单节点的Redis架构下,RedisLockRegistry是可以满足一些不复杂的场景的,但是需要考虑锁的续租、Redis集群架构部署等问题的话,这个框架可能就会稍显拉垮,所以大多数情况都是使用Redisson;
Redisson
使用方式大致如下
Config config = new Config();
config.useSentinelServers()
.addSentinelAddress(
"redis://127.0.0.1:6379",
"redis://127.0.0.2:6379",
"redis://127.0.0.3:6379");
RedissonClient redissonClient = Redisson.create(config);
RLock lock = redissonClient.getLock("key");
// 加锁的两种方式
lock.lock();
lock.lock(5, TimeUnit.SECONDS);
lock.unlock();
下面简单分析下Redisson的加锁逻辑
private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
long threadId = Thread.currentThread().getId();
Long ttl = tryAcquire(-1, leaseTime, unit, threadId);
...
}
private Long tryAcquire(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
return get(tryAcquireAsync(waitTime, leaseTime, unit, threadId));
}
private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
// 在获取锁后,如果还没有通过调用unlock释放锁,
// 则保持锁的最大时间。如果leaseTime为-1,则保持锁定直到显式解锁。
if (leaseTime != -1) {
// 指定了锁的释放时间
return tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
}
// 没有指定锁的释放时间
// 尝试加锁
RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,
TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
if (e != null) {
// 发现异常
return;
}
// 加锁成功
if (ttlRemaining == null) {
// 定时续租
scheduleExpirationRenewal(threadId);
}
});
return ttlRemainingFuture;
}
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
internalLockLeaseTime = unit.toMillis(leaseTime);
// 里面处理了包括选取节点、重试等操作
return evalWriteAsync(getName(), LongCodec.INSTANCE, command,
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"return redis.call('pttl', KEYS[1]);",
Collections.singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}
先看下加锁的逻辑,这里就忽略了集群节点选取等其他代码,直接看这段Lua脚本的命令,执行逻辑如下
- 检查key是否存在
- 如果key不存在,直接将field即客户端id+key+线程id组成的uuid对应的value置为1,并更新过期时间,最后返回nil;
- 如果key存在,则判断field(即上面的uuid)是否存在,如果不存在,则说明该锁的已被其他占用,直接返回key剩余的ttl;如果field存在,则说明自己是锁的持有者,将filed对应的value自增1(重入);
如果没有指定锁的释放时间,将会触发锁续租的操作;
protected void scheduleExpirationRenewal(long threadId) {
ExpirationEntry entry = new ExpirationEntry();
ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
if (oldEntry != null) {
// 记录续租的次数
oldEntry.addThreadId(threadId);
} else {
entry.addThreadId(threadId);
// 执行续租操作
renewExpiration();
}
}
// 续租
private void renewExpiration() {
ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ee == null) {
// 已经被停止续租
return;
}
Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ent == null) {
// 停止续租就是将该key从renewal集合中移除
return;
}
Long threadId = ent.getFirstThreadId();
if (threadId == null) {
return;
}
RFuture<Boolean> future = renewExpirationAsync(threadId);
future.onComplete((res, e) -> {
if (e != null) {
// 续租操作有异常,停止续租
EXPIRATION_RENEWAL_MAP.remove(getEntryName());
return;
}
if (res) {
// 重新续租
renewExpiration();
}
});
}
}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
ee.setTimeout(task);
}
protected RFuture<Boolean> renewExpirationAsync(long threadId) {
return evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return 1; " +
"end; " +
"return 0;",
Collections.singletonList(getName()),
internalLockLeaseTime, getLockName(threadId));
}
来看下续租的操作,每个获取锁的线程都会执行续租任务,默认是30 / 3 = 10秒钟重新执行一次续租操作直到下面三个条件其中一个触发
- redis执行续租命令异常
- 手动解锁操作
- 续租操作失败(通常是redis中key不存在了)
来看下续租的Lua命令
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then
redis.call('pexpire', KEYS[1], ARGV[1]);
return 1;
end;
return 0;
如果对应key和field都存在,直接通过pexpire命令更新过期时间;
下面是看看获取锁失败的操作
private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
long threadId = Thread.currentThread().getId();
Long ttl = tryAcquire(-1, leaseTime, unit, threadId);
if (ttl == null) {
// 获取锁成功
return;
}
// 获取锁失败
// 利用redis的pub/sub机制来订阅到锁被释放的消息
RFuture<RedissonLockEntry> future = subscribe(threadId);
// 根据是否支持中断来使用netty不同的同步操作
if (interruptibly) {
commandExecutor.syncSubscriptionInterrupted(future);
} else {
commandExecutor.syncSubscription(future);
}
try {
while (true) {
// 重复尝试加锁
ttl = tryAcquire(-1, leaseTime, unit, threadId);
if (ttl == null) {
// 直到获取锁成功
break;
}
if (ttl >= 0) {
try {
// 利用Semaphore阻塞直到ttl过去
future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
if (interruptibly) {
throw e;
}
future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
}
} else {
if (interruptibly) {
future.getNow().getLatch().acquire();
} else {
future.getNow().getLatch().acquireUninterruptibly();
}
}
}
} finally {
unsubscribe(future, threadId);
}
}
如果tryAcquire返回的值不为null,即表示锁资源被其他线程占用,本地会启动一个死循环重复执行加锁操作,每一次上锁失败拿到ttl后,会利用Semaphore阻塞当前线程直到ttl过去,当然也可能被之前订阅的解锁消息主动唤醒;
protected void onMessage(RedissonLockEntry value, Long message) {
if (message.equals(UNLOCK_MESSAGE)) {
Runnable runnableToExecute = value.getListeners().poll();
if (runnableToExecute != null) {
runnableToExecute.run();
}
// 当收到解锁的通知时会主动把Semaphore阻塞的线程释放
value.getLatch().release();
}
}
这是当Redis收到订阅的解锁事件执行的方法,这里会主动把RedissonLockEntry
持有的所有被阻塞的线程都释放;
// 解锁的实际逻辑
public RFuture<Void> unlockAsync(long threadId) {
RPromise<Void> result = new RedissonPromise<>();
RFuture<Boolean> future = unlockInnerAsync(threadId);
future.onComplete((opStatus, e) -> {
// 取消锁续租操作
cancelExpirationRenewal(threadId);
...
});
return result;
}
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
return evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
"return nil;" +
"end; " +
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
"if (counter > 0) then " +
"redis.call('pexpire', KEYS[1], ARGV[2]); " +
"return 0; " +
"else " +
"redis.call('del', KEYS[1]); " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; " +
"end; " +
"return nil;",
Arrays.asList(getName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
}
解锁的逻辑如下
- 判断这个锁是否被当前线程持有
- 如果当前线程不是锁的持有者,直接返回nil;
- 如果当前线程是锁的持有者,将redis保存的数值减1,如果减1后的结果是大于0的,说明该锁是被重复加锁的,这时候需要对锁进行续租,时间为
internalLockLeaseTime
,并返回为0;- 如果减1后的结果为0,将key删除后并发布解锁的消息;
RedisLockRegistry和Redisson对比
不同点:
- RedisLockRegistry有一个本地加锁的逻辑,只有当本地加锁成功才能继续执行redis加锁逻辑,重入逻辑也是做在本地,所以理论上RedisLockRegistry比Redisson会快那么一点点;
- Redisson有锁续租功能,解决了加锁成功后逻辑执行未完成时锁到期被释放,导致其他资源获取锁的混乱;
RedLock
因为Redis集群主从同步时会有延迟,有可能因为master节点挂掉,master节点的锁还未同步到slave时,slave被选举成master而可能其他线程能在新master上重复获得锁,而导致锁资源加锁混乱的问题;
所以就有了一个RedLock来解决这种问题,大致的思想是在集群上不同节点都尝试加锁,步骤大致如下
- 获取当前的时间戳(毫秒)。
- 依次尝试从 N 个实例,使用相同的 key 和随机值获取锁。在步骤 2,当向 Redis 设置锁时,客户端应该设置一个网络连接和响应超时时间,这个超时时间应该小于锁的失效时间。例如你的锁自动失效时间为 10 秒,则超时时间应该在 5-50 毫秒之间。这样可以避免服务器端 Redis 已经挂掉的情况下,客户端还在死死地等待响应结果。如果服务器端没有在规定时间内响应,客户端应该尽快尝试另外一个 Redis 实例。
- 客户端使用当前时间减去开始获取锁时间(步骤 1 记录的时间)就得到获取锁使用的时间。当且仅当从大多数(这里是 3 个节点)的 Redis 节点都取到锁,并且使用的时间小于锁失效时间时,锁才算获取成功。
如果取到了锁,key 的真正有效时间等于有效时间减去获取锁所使用的时间(步骤 3 计算的结果)。- 如果因为某些原因,获取锁失败(没有在至少 N/2+1 个Redis实例取到锁或者取锁时间已经超过了有效时间),客户端应该在所有的 Redis 实例上进行解锁(即便某些 Redis 实例根本就没有加锁成功)。
通过在集群大多数实例上获取来保证锁的可用性,比较一个集群内节点都宕机的可能性还是很低的,Redisson内置了该算法的实现,实现类是RedissonRedLock;
RedissonRedLock
RLock lock = redissonClient.getLock("key");
RLock lock2 = redissonClient.getLock("key");
RedissonRedLock redLock = new RedissonRedLock(lock, lock2);
redLock.lock();
redLock.unlock();
使用方式如上,下面看下源码具体的实现逻辑
public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
// 每个锁至少等待1500ms,总时间就是1500*数量
long baseWaitTime = locks.size() * 1500;
long waitTime = -1;
if (leaseTime == -1) {
waitTime = baseWaitTime;
} else {
// 如果指定了锁的释放时间
leaseTime = unit.toMillis(leaseTime);
waitTime = leaseTime;
if (waitTime <= 2000) {
waitTime = 2000;
} else if (waitTime <= baseWaitTime) {
// 如果锁的释放时间小于框架内置的等待时间
// 重制等待时间
waitTime = ThreadLocalRandom.current().nextLong(waitTime/2, waitTime);
} else {
// 如果锁的释放时间大于框架内置的等待时间
// 重制等待时间
waitTime = ThreadLocalRandom.current().nextLong(baseWaitTime, waitTime);
}
}
while (true) {
// 重复上锁直到成功或者异常
if (tryLock(waitTime, leaseTime, TimeUnit.MILLISECONDS)) {
return;
}
}
}
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
// 重制锁的释放时间 至少要求是集群获取锁的等待时间以上
long newLeaseTime = ...;
// 获取当前集群的时间戳
long time = System.currentTimeMillis();
long remainTime = -1;
if (waitTime != -1) {
remainTime = unit.toMillis(waitTime);
}
// 每个锁保留的时间是waitTime
long lockWaitTime = calcLockWaitTime(remainTime);
// 加锁失败的限制数
int failedLocksLimit = failedLocksLimit();
List<RLock> acquiredLocks = new ArrayList<>(locks.size());
// 遍历锁
for (ListIterator<RLock> iterator = locks.listIterator(); iterator.hasNext();) {
RLock lock = iterator.next();
boolean lockAcquired;
try {
// 进行加锁
if (waitTime == -1 && leaseTime == -1) {
lockAcquired = lock.tryLock();
} else {
long awaitTime = Math.min(lockWaitTime, remainTime);
lockAcquired = lock.tryLock(awaitTime, newLeaseTime, TimeUnit.MILLISECONDS);
}
} catch (RedisResponseTimeoutException e) {
// redis访问超时 解锁当前的节点
unlockInner(Arrays.asList(lock));
lockAcquired = false;
} catch (Exception e) {
lockAcquired = false;
}
if (lockAcquired) {
acquiredLocks.add(lock);
} else {
// 加锁失败
if (locks.size() - acquiredLocks.size() == failedLocksLimit()) {
// 如果获取到锁的节点数大于预置的节点数,则判断这次加锁已经成功
break;
}
if (failedLocksLimit == 0) {
// 加锁失败次数已经到达了限制
// 将已加锁成功的全部解锁
unlockInner(acquiredLocks);
if (waitTime == -1) {
return false;
}
failedLocksLimit = failedLocksLimit();
acquiredLocks.clear();
// 重制迭代器
while (iterator.hasPrevious()) {
iterator.previous();
}
} else {
// 记录失败的次数
failedLocksLimit--;
}
}
if (remainTime != -1) {
// 计算还剩余的时间
remainTime -= System.currentTimeMillis() - time;
time = System.currentTimeMillis();
// 如果已经没有剩余时间了,将所有获取到锁的节点进行解锁
if (remainTime <= 0) {
unlockInner(acquiredLocks);
return false;
}
}
}
if (leaseTime != -1) {
// 更新每个节点锁的超时时间
List<RFuture<Boolean>> futures = new ArrayList<>(acquiredLocks.size());
for (RLock rLock : acquiredLocks) {
RFuture<Boolean> future = ((RedissonLock) rLock).expireAsync(unit.toMillis(leaseTime), TimeUnit.MILLISECONDS);
futures.add(future);
}
for (RFuture<Boolean> rFuture : futures) {
rFuture.syncUninterruptibly();
}
}
return true;
}
加锁的逻辑基本上按照RedLock算法来实现的,流程如下:
- 每个节点加锁至少等待1500ms,等待总时间就是1500*节点数量;
- 按照RedLock算法,计算每个节点获取锁的等待时间;
- 从第一个节点开始进行加锁操作,加锁的执行逻辑和上面单节点的一样,如果加锁成功继续下一个节点,如果加锁失败,首先判断加锁成功的节点数是否已经满足最低加锁个数限制,比如5个节点中必须大于等于3个节点;
- 如果满足则停止加锁,本次加锁操作执行成功;
- 如果不满足,判断加锁失败次数是否达到上限,如果没有达到上限,记录失败次数并对下一个节点加锁,如果已经达到加锁失败次数的上限,判断本次集群加锁失败,解锁所有成功获取到锁的节点;
- 每个节点加锁无论成功或失败,执行完成后都需要判断剩余时间,如果剩余时间已经小于0且未完成所有节点的加锁,判断为加锁失败并进行解锁;
- 如果加锁成功,更新所有加锁成功节点的过期时间;
后续
在Redisson3.15.0版本中RedissonRedLock已经不被推荐使用了,理由是现在RLock对象加锁时会自动传播到所有的Slaves,具体文档可以在这里查询;https://github.com/redisson/redisson/wiki/8.-distributed-locks-and-synchronizers#84-redlock
虽然不再推荐用了,但是长点见识也是可以的;
网友评论