美文网首页高级专题
分布式锁续约问题-续

分布式锁续约问题-续

作者: Wind哥 | 来源:发表于2020-12-25 08:47 被阅读0次

    萌芽

    对于分布式锁,有一个不错的客户端Redisson,是会自动锁续约的,详情请自行搜索
    但,对于这个客户端的使用方式,个人不是很喜欢,还是更倾向Lettuce

    对于锁续约原理 不复杂(参考Redisson),无非就是用个WatchDog线程,时不时对比一下锁的超时时间还剩余多少,如果小于某个值就续约(预设时间30s,已经过去20s了,就刷新超时时间)

    分析源码

    这里使用的是sping integration包里的分布式锁(理解成官方),详情请查阅
    我们的目标是获取存在的分布式锁,通过这些锁的key进行续约
    代码片段如下(精简了部分内容)

    public final class RedisLockRegistry implements ExpirableLockRegistry, DisposableBean {
        private static final String OBTAIN_LOCK_SCRIPT =
                "local lockClientId = redis.call('GET', KEYS[1])\n" +
                        "if lockClientId == ARGV[1] then\n" +
                        "  redis.call('PEXPIRE', KEYS[1], ARGV[2])\n" +
                        "  return true\n" +
                        "elseif not lockClientId then\n" +
                        "  redis.call('SET', KEYS[1], ARGV[1], 'PX', ARGV[2])\n" +
                        "  return true\n" +
                        "end\n" +
                        "return false";
        private final Map<String, RedisLock> locks = new ConcurrentHashMap<>();
    

    这里我截取了两个关键部分

    • 申请锁的指令
    • 锁的存放位置(私有Map)

    那么目标就很明确了

    第一个小目标

    • 续约指令
      分析申请锁指令可得
    1.如果当前发指令的clientId 就是锁的持有者,则续约
    2.如果当前clientId不存在,则锁上
    3.否则锁失败
    

    这也是为啥在分布式锁续约问题,我可以使用重入锁的的方式续约的原因
    那么问题来了,为啥这里不能直接重入锁去刷新呢?
    因为重入锁是需要业务代码上调用的,这样对业务代码侵入性太强了,而WatchDog调用,本身在不同线程上,就连本地锁都没办法重入(为了性能,分布式锁也维持了一把本地锁)

    清楚指令意思之后就好办了,编写续约代码

    private static final String OBTAIN_LOCK_SCRIPT_VERSION = "aa1fc9ae99657e86372b45452e5d6f71";
    
    private static final String RENEW_LOCK_SCRIPT =
                "local lockClientId = redis.call('GET', KEYS[1])\n" +
                        "if lockClientId == ARGV[1] then\n" +
                        "  redis.call('PEXPIRE', KEYS[1], ARGV[2])\n" +
                        "  return true\n" +
                        "end\n" +
                        "return false";
    
    代码意思也是显然而已的
    1.锁是自己的,续约
    2.否则,失败(业务代码完成了,主动释放锁了)
    
    当然,为了保险起见,这边把申请锁的代码md5了,作为分布式锁的版本,用来判断当前的插件代码是否还合适
    
    • 侵入RedisLockRegistry,获取locks
      这里没什么好说,直接反射,强制读取就行了(虽然官方已经不建议这样用,但是现在太多太多框架都这样做了,现在还没强制,不是么?)

    完整代码

    上面的是废话,看代码就好了

    /**
     * (why) 提供【自动续约】功能
     * (what)本类以【暴力】锁进行续约
     * (how)自动初始化,并以插件方式运行
     *
     * @Todo 若 RedisLockRegistry 提供续约功能,应使用官方功能
     * @author Wind
     */
    @Slf4j
    public class LockWatchdog {
    
        /**
         * 这个是RedisLockRegistry的script, 用于确认版本是否正确
         */
        private static final String OBTAIN_LOCK_SCRIPT_VERSION = "aa1fc9ae99657e86372b45452e5d6f71";
    
        /**
         * renew锁使用
         * 和 OBTAIN_LOCK_SCRIPT 最大的区别就是如果lockClientId不存在,不会创建一条
         */
        private static final String RENEW_LOCK_SCRIPT =
                "local lockClientId = redis.call('GET', KEYS[1])\n" +
                        "if lockClientId == ARGV[1] then\n" +
                        "  redis.call('PEXPIRE', KEYS[1], ARGV[2])\n" +
                        "  return true\n" +
                        "end\n" +
                        "return false";
    
        /**
         * RedisLockRegistry(static).FIELDS
         */
        private static final String FIELD_OBTAIN_LOCK_SCRIPT = "OBTAIN_LOCK_SCRIPT";
    
        /**
         * redisLockRegistry(object).FIELDS
         */
        private static final String FIELD_CLIENT_ID = "clientId";
        private static final String FIELD_EXPIRE_AFTER = "expireAfter";
        private static final String FIELD_LOCKS = "locks";
    
        /**
         * redisLock(object).FIELDS
         */
        private static final String FIELD_LOCK_KEY = "lockKey";
        private static final String FIELD_LOCKED_AT = "lockedAt";
    
        private final RedisLockRegistry lockRegistry;
        private final StringRedisTemplate redisTemplate;
        private final RedisScript<Boolean> renewLockScript;
        private final String clientId;
        private final long expireAfter;
    
        public LockWatchdog(RedisLockRegistry lockRegistry, StringRedisTemplate redisTemplate) throws IllegalAccessException, NoSuchMethodException, InvocationTargetException {
            //依赖
            this.lockRegistry = lockRegistry;
            this.redisTemplate = redisTemplate;
    
            //刷新脚本
            this.renewLockScript = new DefaultRedisScript<>(RENEW_LOCK_SCRIPT, Boolean.class);
    
            //代理类的参数
            this.clientId = (String) UnsafeBeanUtils.getProperty(lockRegistry, FIELD_CLIENT_ID);
            Assert.hasText(this.clientId, "client id is required!");
            this.expireAfter = (Long) UnsafeBeanUtils.getProperty(lockRegistry, FIELD_EXPIRE_AFTER);
            Assert.notNull(this.expireAfter, "expire after is required!");
            Assert.isTrue(this.expireAfter > 0, "expire after <= 0");
    
            //check version
            String script = (String) UnsafeBeanUtils.getProperty(RedisLockRegistry.class, FIELD_OBTAIN_LOCK_SCRIPT);
            Assert.isTrue(CipherUtils.md5(script).equalsIgnoreCase(OBTAIN_LOCK_SCRIPT_VERSION),"verion error");
            log.info("init success clientId {}, expireAfter {}", clientId, expireAfter);
        }
    
        /**
         * 续约锁
         * 续约成功后会修改lockedAt字段,避免锁被超时回收了
         *
         * @param redisLock
         * @return
         * @throws IllegalAccessException
         * @throws NoSuchMethodException
         * @throws InvocationTargetException
         */
        private boolean renewLock(Object redisLock) throws IllegalAccessException, NoSuchMethodException, InvocationTargetException {
            // lock key 和 map key不同的,需要重新获取
            String lockKey = (String) UnsafeBeanUtils.getProperty(redisLock, FIELD_LOCK_KEY);
            if(log.isDebugEnabled()) {
                log.debug("LockWatchdog:try to renew {}", lockKey);
            }
            Boolean success =
                    redisTemplate.execute(renewLockScript,
                            Collections.singletonList(lockKey), clientId,
                            String.valueOf(expireAfter));
    
            boolean result = Boolean.TRUE.equals(success);
            if (result) {
                UnsafeBeanUtils.setProperty(redisLock, FIELD_LOCKED_AT, System.currentTimeMillis());
                if(log.isDebugEnabled()) {
                    log.debug("LockWatchdog:{} renew success!", lockKey);
                }
            } else {
                if(log.isDebugEnabled()) {
                    log.debug("LockWatchdog:{} renew fail!", lockKey);
                }
            }
    
            return result;
        }
    
        /**
         * 定时器(10s执行一次续约)
         * 这里直接获取到Map里的内容尝试续约
         * Map里的锁是会自动删除的(ExpirableLockRegistry)
         * 而且使用分布式锁的场景不算多,所以已经解锁的,也去尝试续约也是没问题的
         *
         * @throws NoSuchMethodException
         * @throws InvocationTargetException
         * @throws IllegalAccessException
         */
        @Scheduled(cron = "*/10 * * * * ?")
        private void scheduled() throws NoSuchMethodException, InvocationTargetException, IllegalAccessException {
            if(log.isDebugEnabled()){
                log.debug("LockWatchdog:renew lock");
            }
            Map<String, Lock> locks = (Map<String, Lock>) UnsafeBeanUtils.getProperty(lockRegistry, FIELD_LOCKS);
            if(log.isDebugEnabled()) {
                log.debug("LockWatchdog:locks {}", locks.size());
            }
            if (MapUtils.isNotEmpty(locks)) {
                Iterator<Map.Entry<String, Lock>> iter = locks.entrySet().iterator();
                Map.Entry<String, Lock> entry = null;
                while (iter.hasNext()) {
                    entry = iter.next();
                    renewLock(entry.getValue());
                }
            } else {
                if(log.isDebugEnabled()) {
                    log.debug("LockWatchdog:not need to renew!");
                }
            }
            if(log.isDebugEnabled()) {
                log.debug("LockWatchdog:renew lock finish!");
            }
        }
    
        /**
         * 不安全的类操作
         */
        private final static class UnsafeBeanUtils {
            public static Object getProperty(final Class clazz, final String name) throws IllegalAccessException {
                Field field = FieldUtils.getDeclaredField(clazz, name, true);
                return field.get(clazz);
            }
    
            public static Object getProperty(final Object bean, final String name) throws IllegalAccessException {
                Field field = FieldUtils.getDeclaredField(bean.getClass(), name, true);
                return field.get(bean);
            }
    
            public static void setProperty(final Object bean, final String name, final Object value) throws IllegalAccessException {
                Field field = FieldUtils.getDeclaredField(bean.getClass(), name, true);
                field.set(bean, value);
            }
    
        }
    }
    

    代码说明

    • 使用了Scheduled 做定时任务,并且10s续约一次
      对,没判断当前锁时间,直接10s一次,本身就是临时解决方案,能达到目的就好了,系统设置了30s的锁时间,也就是没10s自动把超时时间重置为30s,注意,是重置,并不是延长,所以多次调用效果是没太大差别的,并且业务完成后主动释放锁是好习惯,再次,一个应用这种排他锁也就10把8把,多调用几次(续约)影响忽略不计
      有兴趣可以优化(按Redisson的判断下锁定时间,超时时间什么的)
    • 对于已经解锁的分布式锁,还会存在map里的,所以代码会出现续约失败的情况
      看代码是会一段时间清理的,目前看也可以在续约之前试试trylock,成功了 说明对方已经解锁了,不用续约了,但是成功了还需要解锁,多次调用redis,还不如直接一开始就尝试续约好了,等spring自己清理(这边就自然不会续了)
    • 临时方案,临时方案,临时方案,如果官方支持自动续约了,就用官方的好了

    相关文章

      网友评论

        本文标题:分布式锁续约问题-续

        本文链接:https://www.haomeiwen.com/subject/lhamgktx.html