一、读锁
读写锁的意义:
1, redis分布式锁,主要就是在理解他里面的lua脚本的逻辑,逻辑全部都在lua脚本里,我们只能枚举清楚各种情况下,lua脚本会执行什么逻辑,其实就知道了这个分布式锁的实现原理
- 多个客户端同时加读锁,是不会互斥的,多个客户端可以同时加这个读锁,读锁和读锁是不互斥的
- 如果有人加了读锁,此时就不能加写锁,任何人都不能加写锁了,读锁和写锁是互斥的
- 如果有人加了写锁,此时任何人都不能加写锁了,写锁和写锁也是互斥的
- RedissonReadLock是RedissonLock的子类,所以很多逻辑会直接复用父类RedissonLock中的逻辑
- 这里的核心逻辑主要有三块、第一个是加读锁的lua脚本的逻辑;第二个是读锁的释放的lua脚本的逻辑;第三个是读锁的wathdog刷新锁key的生存时间的逻辑
代码
代码片段一、
public class Application {
public static void main(String[] args) throws Exception {
Config config = new Config();
// 自己本地的Redis集群,直接写死了,主要是研究redisson的源码,配置之类的可以弱化
config.useClusterServers()
.addNodeAddress("redis://192.168.0.107:7001")
.addNodeAddress("redis://192.168.0.107:7002")
.addNodeAddress("redis://192.168.0.110:7003")
.addNodeAddress("redis://192.168.0.110:7004")
.addNodeAddress("redis://192.168.0.111:7005")
.addNodeAddress("redis://192.168.0.111:7006");
RedissonClient redisson = Redisson.create(config);
RReadWriteLock rwLock = redisson.getReadWriteLock("anyRWLock");
// 代码片段
rwLock.readLock().lock();
rwLock.readLock().unlock();
rwLock.writeLock().lock();
rwLock.writeLock().unlock();
}
}
代码片段二、
// RedissonLock类中
@Override
public void lock() {
try {
// 向下看
lockInterruptibly();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
@Override
public void lockInterruptibly() throws InterruptedException {
// 参数-1和null
lockInterruptibly(-1, null);
}
// leaseTime = -1,unit=null
@Override
public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
// 当前加锁客户端的线程ID
long threadId = Thread.currentThread().getId();
// 实际的加锁逻辑
Long ttl = tryAcquire(leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
return;
}
RFuture<RedissonLockEntry> future = subscribe(threadId);
commandExecutor.syncSubscription(future);
try {
while (true) {
ttl = tryAcquire(leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
break;
}
// waiting for message
if (ttl >= 0) {
getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} else {
getEntry(threadId).getLatch().acquire();
}
}
} finally {
unsubscribe(future, threadId);
}
// get(lockAsync(leaseTime, unit));
}
private Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) {
return get(tryAcquireAsync(leaseTime, unit, threadId));
}
private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) {
if (leaseTime != -1) {
return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
}
// tryLockInnerAsync会到RedissonReadLock类中,参数分别是30000毫秒,为超时时间,线程ID,代码片段三、
RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
// 这里其实就是会有一个监听器,watchdog
ttlRemainingFuture.addListener(new FutureListener<Long>() {
@Override
public void operationComplete(Future<Long> future) throws Exception {
// 如果加锁失败,直接返回
if (!future.isSuccess()) {
return;
}
// 读取锁的剩余生存时间
Long ttlRemaining = future.getNow();
// lock acquired
if (ttlRemaining == null) {
// 代码片段四、
scheduleExpirationRenewal(threadId);
}
}
});
return ttlRemainingFuture;
}
代码片段三、
这里其实还是老路子,我们先把lua脚本里面的参数给提取出来,方便在阅读lua脚本的时候,脑海里对redis的命令执行结果有一个快速的认识
KEYS[1] = “anyRWLock”
KEYS[2] = “{anyRWLock}:UUID_01:threadId_01:rwlock_timeout”
ARGV[1]=30000毫秒
ARGV[2] = UUID_01:threadId_01
ARGV[3]=UUID_01:threadId_01:write
@Override
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
internalLockLeaseTime = unit.toMillis(leaseTime);
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
// hget anyRWLock mode 从anyRWLock这个hash里面获取mode作为key的值,刚开始进来肯定是null
"local mode = redis.call('hget', KEYS[1], 'mode'); “ +
// if条件成立
"if (mode == false) then “ +
// hset anyRWLock mode read 此时anyRWLock的hash为 anyRWLock:{“mode”:”read"}
"redis.call('hset', KEYS[1], 'mode', 'read'); “ +
// hset anyRWLock UUID_01:threadId_01 1 此时anyRWLock:{“mode”:”read”,"UUID_01:threadId_01”:1}
"redis.call('hset', KEYS[1], ARGV[2], 1); “ +
// set {anyRWLock}:UUID_01:threadId_01:rwlock_timeout ..:1 3000,这里的..其实是字符串拼接的意思,
// 实际的结果是{anyLock}:UUID_01:threadId_01:rwlock_timeout:1 1
"redis.call('set', KEYS[2] .. ':1', 1); “ +
//设置{anyLock}:UUID_01:threadId_01:rwlock_timeout:1的过期时间是30000毫秒
"redis.call('pexpire', KEYS[2] .. ':1', ARGV[1]); “ +
// 设置anyRWLock的过期时间30000毫秒
"redis.call('pexpire', KEYS[1], ARGV[1]); “ +
// 返回nil,其实这里上面的调用这段逻辑的函数里,就会开启一个watchdog,会每隔10秒钟去执行一段lua脚本,
// 判断一下当前这个线程是否还持有着这个锁,如果还持有锁,更新一下锁key的生存时间为30000毫秒,watchdog主要是保持redis的锁key和java代码中持有的锁是保持同步的
"return nil; " +
"end; " +
"if (mode == 'read') or (mode == 'write' and redis.call('hexists', KEYS[1], ARGV[3]) == 1) then " +
"local ind = redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"local key = KEYS[2] .. ':' .. ind;" +
"redis.call('set', key, 1); " +
"redis.call('pexpire', key, ARGV[1]); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end;" +
"return redis.call('pttl', KEYS[1]);",
Arrays.<Object>asList(getName(), getReadWriteTimeoutNamePrefix(threadId)),
internalLockLeaseTime, getLockName(threadId), getWriteLockName(threadId));
}
代码片段四、
private void scheduleExpirationRenewal(final long threadId) {
if (expirationRenewalMap.containsKey(getEntryName())) {
return;
}
Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
// 代码片段五、这个线程延迟10秒后执行
RFuture<Boolean> future = renewExpirationAsync(threadId);
future.addListener(new FutureListener<Boolean>() {
@Override
public void operationComplete(Future<Boolean> future) throws Exception {
expirationRenewalMap.remove(getEntryName());
if (!future.isSuccess()) {
log.error("Can't update lock " + getName() + " expiration", future.cause());
return;
}
if (future.getNow()) {
// reschedule itself
// 调用自己,不停的延长锁的生存时间
scheduleExpirationRenewal(threadId);
}
}
});
}
}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
if (expirationRenewalMap.putIfAbsent(getEntryName(), new ExpirationEntry(threadId, task)) != null) {
task.cancel();
}
}
代码片段六、
释放锁的逻辑,RedissonReadLock类中
KEYS[1] = anyRWLock
KEYS[2] = {anyRWLock}
ARGV[1] = 30000毫秒
ARGV[2] = UUID_01:threadId_01
@Override
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
// {anyRWLock}:UUID_01:threadId_01:rwlock_timeout
String timeoutPrefix = getReadWriteTimeoutNamePrefix(threadId);
String keyPrefix = getKeyPrefix(threadId, timeoutPrefix);
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
// 这里判断KEYS[1] mode是否存在以及KEYS[1], ARGV[2]是否存在,条件是不成立的
"local mode = redis.call('hget', KEYS[1], 'mode'); " +
"if (mode == false) then " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; " +
"end; " +
"local lockExists = redis.call('hexists', KEYS[1], ARGV[2]); " +
"if (lockExists == 0) then " +
"return nil;" +
"end; " +
// anyRWLock UUID_01:threadId_01减一
"local counter = redis.call('hincrby', KEYS[1], ARGV[2], -1); " +
"if (counter == 0) then “ +
// 如果等于0,等于所有的可重入锁都已经释放,这里释放最后一把锁,那么直接删除anyRWLock UUID_01:threadId_01
"redis.call('hdel', KEYS[1], ARGV[2]); " +
"end;” +
"redis.call('del', KEYS[3] .. ':' .. (counter+1)); " +
// hlen anyLock = 2 > 1,就是说,如果你的读锁,anyLock hash内部的key-value对超过了1个,这里肯定是成立的
"if (redis.call('hlen', KEYS[1]) > 1) then " +
"local maxRemainTime = -3; " +
"local keys = redis.call('hkeys', KEYS[1]); " +
"for n, key in ipairs(keys) do “ +
// 加读锁的时候,其实是每个线程都可以加多次这个读锁,读锁也是可重入的,每次同一个线程加多次读锁的时候,他的加锁次数就会加1,counter = 1 ,也可能是 10 、20
// 就是遍历counter -> 1,每次递减1,假设counter = 10,10,9,8,7,6,5,4,3,2,1,
"counter = tonumber(redis.call('hget', KEYS[1], key)); " +
"if type(counter) == 'number' then " +
"for i=counter, 1, -1 do " +
"local remainTime = redis.call('pttl', KEYS[4] .. ':' .. key .. ':rwlock_timeout:' .. i); " +
"maxRemainTime = math.max(remainTime, maxRemainTime);" +
"end; " +
"end; " +
"end; " +
"if maxRemainTime > 0 then " +
"redis.call('pexpire', KEYS[1], maxRemainTime); " +
"return 0; " +
"end;" +
"if mode == 'write' then " +
"return 0;" +
"end; " +
"end; " +
// 删除anyRWLock
"redis.call('del', KEYS[1]); " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; ",
Arrays.<Object>asList(getName(), getChannelName(), timeoutPrefix, keyPrefix),
LockPubSub.unlockMessage, getLockName(threadId));
}
二、写锁
其实写锁和读锁的Java代码类似,只是lua脚本略有不同,所以,这里直接分析lua脚本了
KEYS[1] = anyRWLock
ARGV[1] = 30000
ARGV[2] = UUID_01:threadId_01:write
代码片段一、RedissonWriteLock
@Override
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
internalLockLeaseTime = unit.toMillis(leaseTime);
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
// hget anyRWLock mode此时肯定获取的是空的
"local mode = redis.call('hget', KEYS[1], 'mode'); " +
"if (mode == false) then “ +
// hset anyRWLock mode write向anyRWLock model 的hash中写入write ,即:anyRWLock:{“mode”:”write"}
"redis.call('hset', KEYS[1], 'mode', 'write'); “ +
// hset anyRWLock UUID_01:threadId_01:write,即:anyRWLock:{“mode”:”write”,“UUID_01:threadId_01”:“write”}
"redis.call('hset', KEYS[1], ARGV[2], 1); " +
// pexpire anyRWLock 30000设置anyRWLock的生存时间
"redis.call('pexpire', KEYS[1], ARGV[1]); “ +
// 返回nil,说明加锁成功了,Java逻辑拿到加锁成功标志后,watchdog其实就是用的RedissonLock中的逻辑
"return nil; " +
"end; " +
"if (mode == 'write') then " +
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"local currentExpire = redis.call('pttl', KEYS[1]); " +
"redis.call('pexpire', KEYS[1], currentExpire + ARGV[1]); " +
"return nil; " +
"end; " +
"end;" +
"return redis.call('pttl', KEYS[1]);",
Arrays.<Object>asList(getName()),
internalLockLeaseTime, getLockName(threadId));
}

网友评论