美文网首页java高级开发
redis分布式锁过期时间到了,但业务没执行完怎么办?

redis分布式锁过期时间到了,但业务没执行完怎么办?

作者: 老鼠AI大米_Java全栈 | 来源:发表于2022-04-28 13:45 被阅读0次

    redisson是如何处理的?

    redisson给的答案是锁获取成功后,注册一个定时任务,每隔一定时间(this.internalLockLeaseTime / 3L)就去续约。internalLockLeaseTime可配置,默认30s。这种方式每次获取一个锁,就会创建一个定时任务,有些浪费。

    我的处理

    这里给出了一种方式,借鉴jvm对自旋锁优化的思想(根据历史耗时动态调整锁的过期时间),将续约的key的耗时保存在redis,再次获取锁时,直接使用上次耗时 + 10s 作为锁的过期时间, 不用每次都去续约。缺点:获取锁后->释放锁前,若每次执行耗时差距过大(业务方法的各条流程耗时差距过大),将得不到预期的效果。

    pom.xml如下:

    <dependencies> 
       <dependency>
          <groupId>org.springframework.boot</groupId>
          <artifactId>spring-boot-starter</artifactId>
       </dependency>
       <dependency>
          <groupId>org.projectlombok</groupId>
          <artifactId>lombok</artifactId>
          <optional>true</optional>
       </dependency>
       <dependency>
          <groupId>org.springframework.data</groupId>
          <artifactId>spring-data-redis</artifactId>
          <version>2.1.10.RELEASE</version>
       </dependency>
       <dependency>
          <groupId>io.lettuce</groupId>
          <artifactId>lettuce-core</artifactId>
          <version>5.1.8.RELEASE</version>
       </dependency>
    <dependencies>
    

    代码如下:

    package com.example.core;
     
    import lombok.Data;
    import lombok.ToString;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.data.redis.core.RedisCallback;
    import org.springframework.data.redis.core.StringRedisTemplate;
    import org.springframework.data.redis.core.script.RedisScript;
    import org.springframework.stereotype.Service;
     
    import java.io.Serializable;
    import java.util.*;
    import java.util.concurrent.*;
     
    /**
     * redis分布式锁
     * redis分布式锁过期时间到了,但业务还没执行完,怎么办?
     *    redisson给的答案是锁获取成功后,注册一个定时任务,每隔一定时间(this.internalLockLeaseTime / 3L)就去续约。
     *    internalLockLeaseTime可配置,默认30s。
     *    这种方式每次获取一个锁,就会创建一个定时任务,有些浪费
     * 这里借鉴jvm对自旋锁优化的思想,将续约的key的耗时保存在redis,再次获取锁时,直接使用上次耗时 + 10s 作为锁的过期时间,
     * 不用每次都去续约。
     * 缺点:获取锁后->释放锁前,若每次执行耗时差距过大(业务方法的各条流程耗时差距过大),将得不到预期的效果
     *
     */
    @Slf4j
    @Service
    public class RedisDistributionLockPlus {
     
        /**
         * 加锁超时时间,单位毫秒, 即:加锁时间内执行完操作,如果未完成会有并发现象
         */
        private static final long DEFAULT_LOCK_TIMEOUT = 30;
     
        private static final long TIME_SECONDS_FIVE = 5 ;
     
        /**
         * 每个key的过期时间 {@link LockContent}
         */
        private Map<String, LockContent> lockContentMap = new ConcurrentHashMap<>(512);
     
        /**
         * redis执行成功的返回
         */
        private static final Long EXEC_SUCCESS = 1L;
     
        /**
         * 获取锁lua脚本, k1:获锁key, k2:续约耗时key, arg1:requestId,arg2:超时时间
         */
        private static final String LOCK_SCRIPT = "if redis.call('exists', KEYS[2]) == 1 then ARGV[2] = math.floor(redis.call('get', KEYS[2]) + 10) end " +
                "if redis.call('exists', KEYS[1]) == 0 then " +
                   "local t = redis.call('set', KEYS[1], ARGV[1], 'EX', ARGV[2]) " +
                   "for k, v in pairs(t) do " +
                     "if v == 'OK' then return tonumber(ARGV[2]) end " +
                   "end " +
                "return 0 end";
     
        /**
         * 释放锁lua脚本, k1:获锁key, k2:续约耗时key, arg1:requestId,arg2:业务耗时 arg3: 业务开始设置的timeout
         */
        private static final String UNLOCK_SCRIPT = "if redis.call('get', KEYS[1]) == ARGV[1] then " +
                "local ctime = tonumber(ARGV[2]) " +
                "local biz_timeout = tonumber(ARGV[3]) " +
                "if ctime > 0 then  " +
                   "if redis.call('exists', KEYS[2]) == 1 then " +
                       "local avg_time = redis.call('get', KEYS[2]) " +
                       "avg_time = (tonumber(avg_time) * 8 + ctime * 2)/10 " +
                       "if avg_time >= biz_timeout - 5 then redis.call('set', KEYS[2], avg_time, 'EX', 24*60*60) " +
                       "else redis.call('del', KEYS[2]) end " +
                   "elseif ctime > biz_timeout -5 then redis.call('set', KEYS[2], ARGV[2], 'EX', 24*60*60) end " +
                "end " +
                "return redis.call('del', KEYS[1]) " +
                "else return 0 end";
        /**
         * 续约lua脚本
         */
        private static final String RENEW_SCRIPT = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('expire', KEYS[1], ARGV[2]) else return 0 end";
     
     
        private final StringRedisTemplate redisTemplate;
     
        public RedisDistributionLockPlus(StringRedisTemplate redisTemplate) {
            this.redisTemplate = redisTemplate;
            ScheduleTask task = new ScheduleTask(this, lockContentMap);
            // 启动定时任务
            ScheduleExecutor.schedule(task, 1, 1, TimeUnit.SECONDS);
        }
     
        /**
         * 加锁
         * 取到锁加锁,取不到锁一直等待知道获得锁
         *
         * @param lockKey
         * @param requestId 全局唯一
         * @param expire   锁过期时间, 单位秒
         * @return
         */
        public boolean lock(String lockKey, String requestId, long expire) {
            log.info("开始执行加锁, lockKey ={}, requestId={}", lockKey, requestId);
            for (; ; ) {
                // 判断是否已经有线程持有锁,减少redis的压力
                LockContent lockContentOld = lockContentMap.get(lockKey);
                boolean unLocked = null == lockContentOld;
                // 如果没有被锁,就获取锁
                if (unLocked) {
                    long startTime = System.currentTimeMillis();
                    // 计算超时时间
                    long bizExpire = expire == 0L ? DEFAULT_LOCK_TIMEOUT : expire;
                    String lockKeyRenew = lockKey + "_renew";
     
                    RedisScript<Long> script = RedisScript.of(LOCK_SCRIPT, Long.class);
                    List<String> keys = new ArrayList<>();
                    keys.add(lockKey);
                    keys.add(lockKeyRenew);
                    Long lockExpire = redisTemplate.execute(script, keys, requestId, Long.toString(bizExpire));
                    if (null != lockExpire && lockExpire > 0) {
                        // 将锁放入map
                        LockContent lockContent = new LockContent();
                        lockContent.setStartTime(startTime);
                        lockContent.setLockExpire(lockExpire);
                        lockContent.setExpireTime(startTime + lockExpire * 1000);
                        lockContent.setRequestId(requestId);
                        lockContent.setThread(Thread.currentThread());
                        lockContent.setBizExpire(bizExpire);
                        lockContent.setLockCount(1);
                        lockContentMap.put(lockKey, lockContent);
                        log.info("加锁成功, lockKey ={}, requestId={}", lockKey, requestId);
                        return true;
                    }
                }
                // 重复获取锁,在线程池中由于线程复用,线程相等并不能确定是该线程的锁
                if (Thread.currentThread() == lockContentOld.getThread()
                          && requestId.equals(lockContentOld.getRequestId())){
                    // 计数 +1
                    lockContentOld.setLockCount(lockContentOld.getLockCount()+1);
                    return true;
                }
     
                // 如果被锁或获取锁失败,则等待100毫秒
                try {
                    TimeUnit.MILLISECONDS.sleep(100);
                } catch (InterruptedException e) {
                    // 这里用lombok 有问题
                    log.error("获取redis 锁失败, lockKey ={}, requestId={}", lockKey, requestId, e);
                    return false;
                }
            }
        }
     
     
        /**
         * 解锁
         *
         * @param lockKey
         * @param lockValue
         */
        public boolean unlock(String lockKey, String lockValue) {
            String lockKeyRenew = lockKey + "_renew";
            LockContent lockContent = lockContentMap.get(lockKey);
     
            long consumeTime;
            if (null == lockContent) {
                consumeTime = 0L;
            } else if (lockValue.equals(lockContent.getRequestId())) {
                int lockCount = lockContent.getLockCount();
                // 每次释放锁, 计数 -1,减到0时删除redis上的key
                if (--lockCount > 0) {
                    lockContent.setLockCount(lockCount);
                    return false;
                }
                consumeTime = (System.currentTimeMillis() - lockContent.getStartTime()) / 1000;
            } else {
                log.info("释放锁失败,不是自己的锁。");
                return false;
            }
     
            // 删除已完成key,先删除本地缓存,减少redis压力, 分布式锁,只有一个,所以这里不加锁
            lockContentMap.remove(lockKey);
     
            RedisScript<Long> script = RedisScript.of(UNLOCK_SCRIPT, Long.class);
            List<String> keys = new ArrayList<>();
            keys.add(lockKey);
            keys.add(lockKeyRenew);
     
            Long result = redisTemplate.execute(script, keys, lockValue, Long.toString(consumeTime),
                    Long.toString(lockContent.getBizExpire()));
            return EXEC_SUCCESS.equals(result);
     
        }
     
        /**
         * 多服务器集群,使用下面的方法,代替System.currentTimeMillis(),获取redis时间,避免多服务的时间不一致问题!!!
         *
         * @return
         */
        public long currtTimeForRedis() {
            return redisTemplate.execute((RedisCallback<Long>) redisConnection -> redisConnection.time());
        }
     
        /**
         * 续约
         *
         * @param lockKey
         * @param lockContent
         * @return true:续约成功,false:续约失败(1、续约期间执行完成,锁被释放 2、不是自己的锁,3、续约期间锁过期了(未解决))
         */
        public boolean renew(String lockKey, LockContent lockContent) {
     
            // 检测执行业务线程的状态
            Thread.State state = lockContent.getThread().getState();
            if (Thread.State.TERMINATED == state) {
                log.info("执行业务的线程已终止,不再续约 lockKey ={}, lockContent={}", lockKey, lockContent);
                return false;
            }
     
            String requestId = lockContent.getRequestId();
            long timeOut = (lockContent.getExpireTime() - lockContent.getStartTime()) / 1000;
     
            RedisScript<Long> script = RedisScript.of(RENEW_SCRIPT, Long.class);
            List<String> keys = new ArrayList<>();
            keys.add(lockKey);
     
            Long result = redisTemplate.execute(script, keys, requestId, Long.toString(timeOut));
            log.info("续约结果,True成功,False失败 lockKey ={}, result={}", lockKey, EXEC_SUCCESS.equals(result));
            return EXEC_SUCCESS.equals(result);
        }
     
     
        static class ScheduleExecutor {
     
            public static void schedule(ScheduleTask task, long initialDelay, long period, TimeUnit unit) {
                long delay = unit.toMillis(initialDelay);
                long period_ = unit.toMillis(period);
                // 定时执行
                new Timer("Lock-Renew-Task").schedule(task, delay, period_);
            }
        }
     
        static class ScheduleTask extends TimerTask {
     
            private final RedisDistributionLockPlus redisDistributionLock;
            private final Map<String, LockContent> lockContentMap;
     
            public ScheduleTask(RedisDistributionLockPlus redisDistributionLock, Map<String, LockContent> lockContentMap) {
                this.redisDistributionLock = redisDistributionLock;
                this.lockContentMap = lockContentMap;
            }
     
            @Override
            public void run() {
                if (lockContentMap.isEmpty()) {
                    return;
                }
                Set<Map.Entry<String, LockContent>> entries = lockContentMap.entrySet();
                for (Map.Entry<String, LockContent> entry : entries) {
                    String lockKey = entry.getKey();
                    LockContent lockContent = entry.getValue();
                    long expireTime = lockContent.getExpireTime();
                    // 减少线程池中任务数量
                    if ((expireTime - System.currentTimeMillis())/ 1000 < TIME_SECONDS_FIVE) {
                        //线程池异步续约
                        ThreadPool.submit(() -> {
                            boolean renew = redisDistributionLock.renew(lockKey, lockContent);
                            if (renew) {
                                long expireTimeNew = lockContent.getStartTime() + (expireTime - lockContent.getStartTime()) * 2 - TIME_SECONDS_FIVE * 1000;
                                lockContent.setExpireTime(expireTimeNew);
                            } else {
                                // 续约失败,说明已经执行完 OR redis 出现问题
                                lockContentMap.remove(lockKey);
                            }
                        });
                    }
                }
            }
        }
     
     
        @Data
        @ToString
        public static class LockContent implements Serializable {
     
            /**
             * 锁过期时间,单位秒
             */
            private volatile long lockExpire;
     
            /**
             * 锁过期时间,单位毫秒
             */
            private volatile long expireTime;
     
            /**
             * 获取锁的开始时间,单位毫秒
             */
            private volatile long startTime;
     
            /**
             * 用于防止锁的误删,全局唯一
             */
            private String requestId;
     
            /**
             * 执行业务的线程
             */
            private volatile Thread thread;
     
            /**
             * 业务调用设置的锁过期时间,单位秒
             */
            private long bizExpire;
     
            /**
             * 重入次数
             */
            private int lockCount = 0;
     
        }
     
     
        static class ThreadPool {
     
            private static final int CORESIZE = Runtime.getRuntime().availableProcessors();
            private static final int MAXSIZE = 200;
            private static final int KEEPALIVETIME = 60;
            private static final int CAPACITY = 2000;
     
            private static ThreadPoolExecutor threadPool;
     
            static {
                init();
            }
     
            private ThreadPool() {}
     
            /**
             * 初始化所有线程池。
             */
            private static void init() {
                threadPool = new ThreadPoolExecutor(CORESIZE, MAXSIZE, KEEPALIVETIME,
                        TimeUnit.SECONDS, new LinkedBlockingQueue<>(CAPACITY));
                log.info("初始化线程池成功。");
            }
     
            public static Future<?> submit(Runnable task) {
                if (null == task) {
                    throw new IllegalArgumentException("任务不能为空");
                }
                return threadPool.submit(task);
            }
        }
    }
    

    相关文章

      网友评论

        本文标题:redis分布式锁过期时间到了,但业务没执行完怎么办?

        本文链接:https://www.haomeiwen.com/subject/paqdyrtx.html