美文网首页
Redis实现分布式锁(利用分布式锁,实现分布式定时任务)

Redis实现分布式锁(利用分布式锁,实现分布式定时任务)

作者: Knight_9 | 来源:发表于2020-12-10 16:58 被阅读0次

    简述

    利用Redis的Setnx命令,来实现一个分布式的加锁方案。利用注解,在拥有该注解的方法上,进行切面处理,在方法执行前,进行加锁,执行结束后,根据是否自动释放锁,进行解锁。
    将该注解用在定时任务的方法上,即可实现分布式定时任务,即获取到锁的方法,才会执行。

    1 redis命令

    1.1 setnx命令

    • Redis setnx(SET if Not eXists) 命令在指定的 key 不存在时,为 key 设置指定的值。(该命令无法设置过期时间)
      Redis为单进程单线程模式,采用队列模式将并发访问变成串行访问,且多客户端对Redis的连接并不存在竞争关系。redis的SETNX命令可以方便的实现分布式锁。
      当某一个客户端将key的值设置成功后,其他的客户端再进行设置,将返回失败,保证同一时间,只有一个客户端能够设置成功。
    • Redis事务
      watch key1 key2 ... : 监视一或多个key,如果在事务执行之前,被监视的key被其他命令改动,则事务被打断 ( 类似乐观锁 )
      multi : 标记一个事务块的开始( queued )
      exec : 执行所有事务块的命令 ( 一旦执行exec后,之前加的监控锁都会被取消掉 ) 
      discard : 取消事务,放弃事务块中的所有命令
      unwatch : 取消watch对所有key的监控
    # 事务正常使用
    127.0.0.1:6379> multi
    127.0.0.1:6379> set name jack
    127.0.0.1:6379> exec
    
    # 取消事务
    127.0.0.1:6379> multi
    127.0.0.1:6379> set name jack
    127.0.0.1:6379> discard
    
    # watch使用
    # number初始为10
    127.0.0.1:6379> watch number
    127.0.0.1:6379> multi
    127.0.0.1:6379> set number 11
    127.0.0.1:6379> exec
    # 如果在执行exec时,number没有被其他客户端修改,还是10,则事务执行成功;
    # 如果被其他客户端修改了,number不是10了,则事务执行失败,这时候就需求程序自行处理,进行再次提交或者其他操作
    
    
    • 在spring boot 中,我们用StringRedisTemplate来操作Redis,它的方法:stringRedisTemplate.opsForValue().setIfAbsent()方法即对应setnx命令,这个方法有两个重载的方法:
      1、Boolean setIfAbsent(K key, V value); 设置key value,返回成功/失败
      2、Boolean setIfAbsent(K key, V value, long timeout, TimeUnit unit); 设置key value,返回成功/失败,同时设置过期时间,redisTemplate 会调用 EXPIRE进行过期时间的设定,同时在设置值和过期时间时,会开启事务,保存全部成功。
    // org.springframework.data.redis.core 中实现的方法
    @Override
    public Boolean setIfAbsent(K key, V value) {
    
        byte[] rawKey = rawKey(key);
        byte[] rawValue = rawValue(value);
        return execute(connection -> connection.setNX(rawKey, rawValue), true);
    }
    
    @Override
    public Boolean setIfAbsent(K key, V value, long timeout, TimeUnit unit) {
    
        byte[] rawKey = rawKey(key);
        byte[] rawValue = rawValue(value);
    
        Expiration expiration = Expiration.from(timeout, unit);
        return execute(connection -> connection.set(rawKey, rawValue, expiration, SetOption.ifAbsent()), true);
        }
    

    1.2 DEL命令、lua脚本

    在加锁之后,解锁时,需要判断锁,是否是当前线程所拥有的,如果是当前线程拥有的,则删除该key,删除key,用del命令。

    • del key_name

    我们会先取出key对应的值,然后判断是否和当前线程的定义的值一致。如果一致,则说明是该线程拥有的key。如果我们在代码中取出key的值,然后判断通过后,调用redis del 删除key,这就不是一个原子操作了。如果在我们取出key的值后,然后在删除前,其他线程获取了锁,当前线程删除的动作,就会导致删除其他线程拥有的锁。所以释放锁,需要利用lua脚本进行,将判断和删除,这两个动作,合为一个原子性的操作。
    所以我们会利用代码去执行下面的lua脚本,保证判断和删除的原子性。

    if redis.call("get",KEYS[1]) == ARGV[1] then
        return redis.call("del",KEYS[1])
    else
        return 0
    end
    

    一般教程中,利用RedisTemplate来执行lua脚本时,会将lua脚本放到静态资源目录中。而在下面的代码中,利用ByteArrayResource直接从String字符串中读取了lua脚本内容:

        /*
         * 保存lua脚本
         */
        private DefaultRedisScript<List> getRedisScript;
    
        @PostConstruct
        public void init(){
            // 定义lua脚本资源
            // 也可以放到文件中,加载进来: new ResourceScriptSource(new ClassPathResource("redis/demo.lua"))
            String luaStr = "if redis.call('get',KEYS[1]) == ARGV[1] then return   redis.call('del',KEYS[1])  else return 0 end";
            ByteArrayResource resource = new ByteArrayResource(luaStr.getBytes());
    
            getRedisScript = new DefaultRedisScript<>();
            getRedisScript.setResultType(List.class);
            getRedisScript.setScriptSource(new ResourceScriptSource(resource));
        }
    

    2 分布式锁实现

    下面是实现的核心类:

    • RedisLock: reids分布式锁工具类
    • EmLock: 分布式锁注解
    • LockRangeEnum: 分布式锁的范围枚举
    • EmLockAspect: 分布式锁切面

    2.1 RedisLock,reids分布式锁工具类

    代码如下:

    package com.emdata.lowvis.common.redislock;
    
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.data.redis.core.StringRedisTemplate;
    import org.springframework.stereotype.Component;
    import org.springframework.util.StringUtils;
    
    import java.util.concurrent.TimeUnit;
    
    /**
     * reids分布式锁工具类
     *
     * @version 1.0
     * @date 2020/12/8 14:37
     */
    @Slf4j
    @Component
    public class RedisLock {
    
        private static final String SPLIT = "_";
    
        @Autowired
        private StringRedisTemplate stringRedisTemplate;
    
        /**
         * 加锁解锁工具类
         * @param lockKey 加锁的key
         * @param uuid 线程的标志
         * @param timeout 超时时间
         * @param timeUnit 超时时间粒度
         * @return true:获取成功
         */
        public boolean lock(String lockKey, String uuid, long timeout, TimeUnit timeUnit) {
            // 根据key获取值
            String currentLock = stringRedisTemplate.opsForValue().get(lockKey);
    
            // 值为:uuid_时间
            String value = uuid + SPLIT + (timeUnit.toMillis(timeout) + System.currentTimeMillis());
    
            // 如果为空,则设置值
            if (StringUtils.isEmpty(currentLock)) {
                if (stringRedisTemplate.opsForValue().setIfAbsent(lockKey, value, timeout, timeUnit)) {
                    // 对应setnx命令,可以成功设置,也就是key不存在,获得锁成功
                    return true;
                } else {
                    return false;
                }
            } else {
                // 可重入锁,如果是这个uuid持有的锁,则更新时间
                if (currentLock.startsWith(uuid)) {
                    stringRedisTemplate.opsForValue().set(lockKey, value, timeout, timeUnit);
                    return true;
                } else {
                    return false;
                }
            }
        }
        
        /*
         * 保存lua脚本
         */
        private DefaultRedisScript<List> getRedisScript;
    
        @PostConstruct
        public void init(){
            // 定义lua脚本资源
            // 也可以放到文件中,加载进来: new ResourceScriptSource(new ClassPathResource("redis/demo.lua"))
            String luaStr = "if redis.call('get',KEYS[1]) == ARGV[1] then return   redis.call('del',KEYS[1])  else return 0 end";
            ByteArrayResource resource = new ByteArrayResource(luaStr.getBytes());
    
            getRedisScript = new DefaultRedisScript<>();
            getRedisScript.setResultType(List.class);
            getRedisScript.setScriptSource(new ResourceScriptSource(resource));
        }
    
        /**
         * 释放锁
         *
         * @param lockKey 加锁的key
         * @param uuid 线程的标志
         */
        public void release(String lockKey, String uuid) {
            try {
                List<Integer> execute = stringRedisTemplate.execute(getRedisScript, Collections.singletonList(lockKey), uuid);
                log.debug("解锁结果: {}", execute.get(0) == 0);
            } catch (Exception e) {
                log.error("解锁异常, key: {}, uuid: {}", lockKey, uuid);
                log.error("", e);
            }
        }
    
    }
    

    2.2 EmLock,分布式锁注解

    package com.emdata.lowvis.common.redislock;
    
    import java.lang.annotation.*;
    import java.util.concurrent.TimeUnit;
    
    /**
     * 分布式锁注解
     *
     * @version 1.0
     * @date 2020/12/8 17:59
     */
    @Documented
    @Retention(RetentionPolicy.RUNTIME)
    @Target({ElementType.METHOD})
    public @interface EmLock {
    
        /**
         * 锁的范围,默认应用级别
         * @return 锁的范围
         */
        LockRangeEnum lockRange() default LockRangeEnum.APPLICATION;
    
        /**
         * 锁对应的key
         * @return key
         */
        String key();
    
        /**
         * 锁超时时间
         * @return 时间
         */
        int timeout() default 5;
    
        /**
         * 锁超时时间粒度
         * @return 粒度
         */
        TimeUnit timeUnit() default TimeUnit.SECONDS;
    
        /**
         * 是否自动释放锁
         * @return true: 方法完成后,自动释放
         */
        boolean autoRelease() default true;
    }
    
    

    2.3 LockRangeEnum, 分布式锁的范围枚举

    package com.emdata.lowvis.common.redislock;
    
    /**
     * 分布式锁的范围枚举
     *
     * @author pupengfei
     * @version 1.0
     * @date 2020/12/10 13:46
     */
    public enum LockRangeEnum {
    
        /**
         * 应用级别,锁的级别在整个应用容器内
         */
        APPLICATION,
    
        /**
         * 线程级别,锁的级别在每个线程
         */
        THREAD
    
    }
    
    

    2.4 EmLockAspect,分布式锁切面

    package com.emdata.lowvis.common.redislock;
    
    import com.emdata.lowvis.common.utils.UUIDUtils;
    import lombok.extern.slf4j.Slf4j;
    import org.aspectj.lang.ProceedingJoinPoint;
    import org.aspectj.lang.annotation.Around;
    import org.aspectj.lang.annotation.Aspect;
    import org.aspectj.lang.annotation.Pointcut;
    import org.aspectj.lang.reflect.MethodSignature;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.stereotype.Component;
    
    import java.lang.reflect.Method;
    import java.util.concurrent.TimeUnit;
    
    /**
     * 分布式锁切面
     *
     * @version 1.0
     * @date 2020/12/8 17:59
     */
    @Slf4j
    @Component
    @Aspect
    @Configuration
    public class EmLockAspect {
    
        @Autowired
        private RedisLock redisLock;
    
        /**
         * 应用级别的容器的id
         */
        private final String appUUID = UUIDUtils.get();
    
        /**
         * 线程级别的线程的id
         */
        private final ThreadLocal<String> threadUUID = ThreadLocal.withInitial(UUIDUtils::get);
    
        /**
         * 定义切点
         */
        @Pointcut("@annotation(com.emdata.lowvis.common.redislock.EmLock)")
        public void lockAop() {
    
        }
    
        @Around("lockAop()")
        public Object doAround(ProceedingJoinPoint point) throws Throwable {
            // 获取方法
            MethodSignature signature = (MethodSignature) point.getSignature();
            Method method = signature.getMethod();
    
            // 看有没有日志注解
            EmLock emLock = method.getAnnotation(EmLock.class);
            if (emLock == null) {
                return point.proceed();
            }
    
            // 获取锁的级别
            LockRangeEnum lockRangeEnum = emLock.lockRange();
            String uuid = lockRangeEnum == LockRangeEnum.APPLICATION ? appUUID : threadUUID.get();
    
            // 获取锁的key和超时时间
            String key = emLock.key();
            int timeout = emLock.timeout();
            TimeUnit timeUnit = emLock.timeUnit();
    
            // 加锁
            boolean lock = redisLock.lock(key, uuid, timeout, timeUnit);
    
            Object proceed = null;
    
            try {
                if (lock) {
                    log.info("获取到锁,继续执行...");
                    // 继续执行
                    proceed = point.proceed();
                }
            } finally {
                // 自动释放,则释放锁
                if (emLock.autoRelease()) {
                    redisLock.release(key, uuid);
                }
            }
    
            return proceed;
        }
    
    }
    
    

    3 使用示例

    3.1 使用RedisLock

          @Autowired
        private RedisLock redisLock;
    
        public void useLock() {
            // 定义锁的key
            String lockKey = "camera_update_key";
            String uuid = UUIDUtils.get();
    
            // 定义超时时间
            long timeout = 5;
            TimeUnit timeUnit = TimeUnit.SECONDS;
    
            // 加锁
            boolean lock = redisLock.lock(lockKey, uuid, timeout, timeUnit);
            try {
                if (lock) {
                    log.info("执行...");
                } else {
                    throw new IllegalStateException("未获取到锁,放弃执行");
                }
            } finally {
                // 在finally里面进行解锁
                redisLock.release(lockKey, uuid);
            }
        }
    

    3.2 使用EmLock

    @Component
    @Slf4j
    public class ScheduleTask {
    
        /**
         * 用在定时任务方法上,锁的key为test_lock,指定了超时时间为2秒钟
         * 锁的级别为默认的应用级别(LockRangeEnum.APPLICATION),在这个如果应用启动了多个容器运行,在只会有一个容器获取到锁,
         * 自动释放锁为false,即方法执行完成后,也不会自动释放锁,只有到超时时间了,锁才会释放
         */
        @Scheduled(cron = "0 0/1 * * * ? ")
        @EmLock(key = "test_lock", timeout = 2, timeUnit = TimeUnit.SECONDS, autoRelease = false)
        public void recordUpdateTask() {
            log.info("执行任务.......");
        }
       
       /**
         * 用在普通的方法上,锁的key为method_Lock,指定了超时时间为1分钟,
         * 锁的级别为默认的线程级别,在该应用内多个线程执行该方法,则只会有一个线程获取到锁
         * 如果启动了多个应用容器,同样多个容器内的所有线程,也只会有一个线程获取到锁
         */
        @EmLock(key = "method_Lock", timeout = 1, timeUnit = TimeUnit.MINUTES, lockRange = LockRangeEnum.THREAD)
        public void recordUpdate() {
            log.info("执行任务2.......");
        }
    }
    

    4 使用注意

    • 使用Redis作为分布式锁的实现,依赖于Redis服务,如果Redis服务无法正常访问,则会导致整个方法无法执行。
    • 如果EmLock注解用在定时任务上时,如果应用运行在不同的服务器上,或者不同的docker容器里面时,必须保证运行环境的时间一致。
    • 如果设置了定时任务上面的锁,不是自动释放的,则运行环境的时间,相差不大于锁超时时间的时候,也可以保证定时任务,唯一执行。因为在超时时间范围内,某个应用容器持有该锁,其他应用来获取锁时,同样获取不到,方法不会执行。

    相关文章

      网友评论

          本文标题:Redis实现分布式锁(利用分布式锁,实现分布式定时任务)

          本文链接:https://www.haomeiwen.com/subject/ttalgktx.html