萌芽
对于分布式锁,有一个不错的客户端Redisson,是会自动锁续约的,详情请自行搜索
但,对于这个客户端的使用方式,个人不是很喜欢,还是更倾向Lettuce
对于锁续约原理 不复杂(参考Redisson),无非就是用个WatchDog线程,时不时对比一下锁的超时时间还剩余多少,如果小于某个值就续约(预设时间30s,已经过去20s了,就刷新超时时间)
分析源码
这里使用的是sping integration包里的分布式锁(理解成官方),详情请查阅
我们的目标是获取存在的分布式锁,通过这些锁的key进行续约
代码片段如下(精简了部分内容)
public final class RedisLockRegistry implements ExpirableLockRegistry, DisposableBean {
private static final String OBTAIN_LOCK_SCRIPT =
"local lockClientId = redis.call('GET', KEYS[1])\n" +
"if lockClientId == ARGV[1] then\n" +
" redis.call('PEXPIRE', KEYS[1], ARGV[2])\n" +
" return true\n" +
"elseif not lockClientId then\n" +
" redis.call('SET', KEYS[1], ARGV[1], 'PX', ARGV[2])\n" +
" return true\n" +
"end\n" +
"return false";
private final Map<String, RedisLock> locks = new ConcurrentHashMap<>();
这里我截取了两个关键部分
- 申请锁的指令
- 锁的存放位置(私有Map)
那么目标就很明确了
第一个小目标
- 续约指令
分析申请锁指令可得
1.如果当前发指令的clientId 就是锁的持有者,则续约
2.如果当前clientId不存在,则锁上
3.否则锁失败
这也是为啥在分布式锁续约问题,我可以使用重入锁的的方式续约的原因
那么问题来了,为啥这里不能直接重入锁去刷新呢?
因为重入锁是需要业务代码上调用的,这样对业务代码侵入性太强了,而WatchDog调用,本身在不同线程上,就连本地锁都没办法重入(为了性能,分布式锁也维持了一把本地锁)
清楚指令意思之后就好办了,编写续约代码
private static final String OBTAIN_LOCK_SCRIPT_VERSION = "aa1fc9ae99657e86372b45452e5d6f71";
private static final String RENEW_LOCK_SCRIPT =
"local lockClientId = redis.call('GET', KEYS[1])\n" +
"if lockClientId == ARGV[1] then\n" +
" redis.call('PEXPIRE', KEYS[1], ARGV[2])\n" +
" return true\n" +
"end\n" +
"return false";
代码意思也是显然而已的
1.锁是自己的,续约
2.否则,失败(业务代码完成了,主动释放锁了)
当然,为了保险起见,这边把申请锁的代码md5了,作为分布式锁的版本,用来判断当前的插件代码是否还合适
- 侵入RedisLockRegistry,获取locks
这里没什么好说,直接反射,强制读取就行了(虽然官方已经不建议这样用,但是现在太多太多框架都这样做了,现在还没强制,不是么?)
完整代码
上面的是废话,看代码就好了
/**
* (why) 提供【自动续约】功能
* (what)本类以【暴力】锁进行续约
* (how)自动初始化,并以插件方式运行
*
* @Todo 若 RedisLockRegistry 提供续约功能,应使用官方功能
* @author Wind
*/
@Slf4j
public class LockWatchdog {
/**
* 这个是RedisLockRegistry的script, 用于确认版本是否正确
*/
private static final String OBTAIN_LOCK_SCRIPT_VERSION = "aa1fc9ae99657e86372b45452e5d6f71";
/**
* renew锁使用
* 和 OBTAIN_LOCK_SCRIPT 最大的区别就是如果lockClientId不存在,不会创建一条
*/
private static final String RENEW_LOCK_SCRIPT =
"local lockClientId = redis.call('GET', KEYS[1])\n" +
"if lockClientId == ARGV[1] then\n" +
" redis.call('PEXPIRE', KEYS[1], ARGV[2])\n" +
" return true\n" +
"end\n" +
"return false";
/**
* RedisLockRegistry(static).FIELDS
*/
private static final String FIELD_OBTAIN_LOCK_SCRIPT = "OBTAIN_LOCK_SCRIPT";
/**
* redisLockRegistry(object).FIELDS
*/
private static final String FIELD_CLIENT_ID = "clientId";
private static final String FIELD_EXPIRE_AFTER = "expireAfter";
private static final String FIELD_LOCKS = "locks";
/**
* redisLock(object).FIELDS
*/
private static final String FIELD_LOCK_KEY = "lockKey";
private static final String FIELD_LOCKED_AT = "lockedAt";
private final RedisLockRegistry lockRegistry;
private final StringRedisTemplate redisTemplate;
private final RedisScript<Boolean> renewLockScript;
private final String clientId;
private final long expireAfter;
public LockWatchdog(RedisLockRegistry lockRegistry, StringRedisTemplate redisTemplate) throws IllegalAccessException, NoSuchMethodException, InvocationTargetException {
//依赖
this.lockRegistry = lockRegistry;
this.redisTemplate = redisTemplate;
//刷新脚本
this.renewLockScript = new DefaultRedisScript<>(RENEW_LOCK_SCRIPT, Boolean.class);
//代理类的参数
this.clientId = (String) UnsafeBeanUtils.getProperty(lockRegistry, FIELD_CLIENT_ID);
Assert.hasText(this.clientId, "client id is required!");
this.expireAfter = (Long) UnsafeBeanUtils.getProperty(lockRegistry, FIELD_EXPIRE_AFTER);
Assert.notNull(this.expireAfter, "expire after is required!");
Assert.isTrue(this.expireAfter > 0, "expire after <= 0");
//check version
String script = (String) UnsafeBeanUtils.getProperty(RedisLockRegistry.class, FIELD_OBTAIN_LOCK_SCRIPT);
Assert.isTrue(CipherUtils.md5(script).equalsIgnoreCase(OBTAIN_LOCK_SCRIPT_VERSION),"verion error");
log.info("init success clientId {}, expireAfter {}", clientId, expireAfter);
}
/**
* 续约锁
* 续约成功后会修改lockedAt字段,避免锁被超时回收了
*
* @param redisLock
* @return
* @throws IllegalAccessException
* @throws NoSuchMethodException
* @throws InvocationTargetException
*/
private boolean renewLock(Object redisLock) throws IllegalAccessException, NoSuchMethodException, InvocationTargetException {
// lock key 和 map key不同的,需要重新获取
String lockKey = (String) UnsafeBeanUtils.getProperty(redisLock, FIELD_LOCK_KEY);
if(log.isDebugEnabled()) {
log.debug("LockWatchdog:try to renew {}", lockKey);
}
Boolean success =
redisTemplate.execute(renewLockScript,
Collections.singletonList(lockKey), clientId,
String.valueOf(expireAfter));
boolean result = Boolean.TRUE.equals(success);
if (result) {
UnsafeBeanUtils.setProperty(redisLock, FIELD_LOCKED_AT, System.currentTimeMillis());
if(log.isDebugEnabled()) {
log.debug("LockWatchdog:{} renew success!", lockKey);
}
} else {
if(log.isDebugEnabled()) {
log.debug("LockWatchdog:{} renew fail!", lockKey);
}
}
return result;
}
/**
* 定时器(10s执行一次续约)
* 这里直接获取到Map里的内容尝试续约
* Map里的锁是会自动删除的(ExpirableLockRegistry)
* 而且使用分布式锁的场景不算多,所以已经解锁的,也去尝试续约也是没问题的
*
* @throws NoSuchMethodException
* @throws InvocationTargetException
* @throws IllegalAccessException
*/
@Scheduled(cron = "*/10 * * * * ?")
private void scheduled() throws NoSuchMethodException, InvocationTargetException, IllegalAccessException {
if(log.isDebugEnabled()){
log.debug("LockWatchdog:renew lock");
}
Map<String, Lock> locks = (Map<String, Lock>) UnsafeBeanUtils.getProperty(lockRegistry, FIELD_LOCKS);
if(log.isDebugEnabled()) {
log.debug("LockWatchdog:locks {}", locks.size());
}
if (MapUtils.isNotEmpty(locks)) {
Iterator<Map.Entry<String, Lock>> iter = locks.entrySet().iterator();
Map.Entry<String, Lock> entry = null;
while (iter.hasNext()) {
entry = iter.next();
renewLock(entry.getValue());
}
} else {
if(log.isDebugEnabled()) {
log.debug("LockWatchdog:not need to renew!");
}
}
if(log.isDebugEnabled()) {
log.debug("LockWatchdog:renew lock finish!");
}
}
/**
* 不安全的类操作
*/
private final static class UnsafeBeanUtils {
public static Object getProperty(final Class clazz, final String name) throws IllegalAccessException {
Field field = FieldUtils.getDeclaredField(clazz, name, true);
return field.get(clazz);
}
public static Object getProperty(final Object bean, final String name) throws IllegalAccessException {
Field field = FieldUtils.getDeclaredField(bean.getClass(), name, true);
return field.get(bean);
}
public static void setProperty(final Object bean, final String name, final Object value) throws IllegalAccessException {
Field field = FieldUtils.getDeclaredField(bean.getClass(), name, true);
field.set(bean, value);
}
}
}
代码说明
- 使用了Scheduled 做定时任务,并且10s续约一次
对,没判断当前锁时间,直接10s一次,本身就是临时解决方案,能达到目的就好了,系统设置了30s的锁时间,也就是没10s自动把超时时间重置为30s,注意,是重置,并不是延长,所以多次调用效果是没太大差别的,并且业务完成后主动释放锁是好习惯,再次,一个应用这种排他锁也就10把8把,多调用几次(续约)影响忽略不计
有兴趣可以优化(按Redisson的判断下锁定时间,超时时间什么的) - 对于已经解锁的分布式锁,还会存在map里的,所以代码会出现续约失败的情况
看代码是会一段时间清理的,目前看也可以在续约之前试试trylock,成功了 说明对方已经解锁了,不用续约了,但是成功了还需要解锁,多次调用redis,还不如直接一开始就尝试续约好了,等spring自己清理(这边就自然不会续了) - 临时方案,临时方案,临时方案,如果官方支持自动续约了,就用官方的好了
网友评论