美文网首页Java
Redis的分布式锁

Redis的分布式锁

作者: 呼噜噜睡 | 来源:发表于2020-04-11 12:04 被阅读0次

    经常听到分布式锁这个名词,觉得高大上又离我很遥远,于是也没有进行什么深究。直到面试被反复问道,我觉得有必要了解一下,以跟上这个时代的节奏。
    常见的分布式锁实现,一个是ZooKeeper,另一个就是Redis了。Redis这么熟悉,当然是拿它先开刀了。在进行锁的实现的时候,有几个基本概念,需要先说明一下。
    第一个就是锁的概念,这个锁不像java或者jvm中那种复杂又难懂的锁,你可以把锁理解为一个值,根据这个值的存在与否,来决定是否获取到锁。
    在Redis中,有一种称为乐观锁的东西,什么叫乐观锁,意思就是这种策略是乐观的,总乐观的认为别的客户端连接不会修改该值,因此除了本客户端连接可以读取到某种资源(存储在Redis上的键值对信息),其它客户端连接也可以读取到。那万一其余客户端连接修改了资源呢?这个时候Redis Server会给加了乐观锁的客户端连接发送消息,该诉你该数据已经修改过了,这个时候本客户端连接可以选择循环重试或者直接退出。这个听起来是不是像CAS的操作呢? 我的感觉是像极了。 具体涉及的命令就是watch unwatch multi exec,这些命令什么意思,下文详细说明。
    另一种锁称之为悲观锁。什么是悲观锁呢?就是客户端连接总是认为数据随时都可能被其余客户端连接修改掉,所以加了悲观锁,其余的客户端连接在此刻是无法读取到资源,自然也无法进行操作。当本客户端连接释放了锁,其余的客户端连接才可以获取锁,进而进行操作。涉及的命令不仅包括watch unwatch multi exec,也包括setnx ttl expire等命令,同样后文细说。

    不像java中已经提供了各种锁以及同步容器,并发工具,Redis本身它不提供任何锁的实现,也没有乐观锁、悲观锁,超时锁等东西,这需要我们组合使用Redis的各种原子命令以及不同的命令特性,来自己实现锁,同时在代码层面一致的使用和释放锁。

    再来说一下Redis客户端。Redis是用c语言写的,要么使用自带的redis-cli进行连接和交互,要么使用根据通信协议封装好的不同语言的客户端库。Redis的java客户端库很多,常用的有Jedis、Lettuce 。并且Spring SpringBoot默认的就是支持这两种客户端,所以学习了这两个客户端对以后也很有帮助。Lettuce 的功能更加强大,它基于netty,支持Redis的哨兵模式和集群模式。但是由于我知识浅薄,只能拿Jedis来作为演示啦。

    由于乐观锁会导致无谓的修改循环重试,导致很少能够修改成功,耗费资源。而悲观锁,虽然在获取锁时不断重试,但对于修改资源,却是一次就成功了,在资源竞争严重的时候,悲观锁策略性能更好,因此这里主要选择悲观锁这种思想来进行代码演示。

    首先我们设定一个键为锁(Redis就是Key Value型存储),并使用setnx命令,该命令的特点是,如果键存在,则设置定指定的值,并返回1,如果键存在,则什么都不做,并返回0。对应到代码中来就是,如果执行setnx命令返回了1,则说明获取到了锁,代码可以继续往下执行,如果执行setnx命令返回0,则说明没有获取到锁,当前线程等待或者重试。

    获取到锁,执行代码完毕,需要释放锁。怎么释放呢,就是很简单的执行del命令即可,即把锁的key给删除,这样其余的连接就可以获取锁了。
    伪代码如下:

    while(con.setnx(lockKey,lockValue)==0){
            //休眠  重试获取锁
    }
    // 执行业务代码
    con.del(lockKey);
    

    一切看起来很美好,不是吗?但现实总是千变万化的,一种可能是一段代码的执行,需要在不同地方获取不同的锁,导致死锁的发生,又或者是网络故障或者客户进程崩溃,造成锁永远无法释放。这就会导致其余的Redis连接一直无法获取到锁,因为一台机器的代码问题,网络问题,机器故障等原因,导致所有的服务都变得不可用,这是让人无法接受的,这违背了分布式的初衷。
    怎么解决这个问题呢?我们可以指定一个锁的过期时间,比如10s后这个锁会过期,并且极限条件下业务执行时间也不会超过10s。(在服务有互相依赖,复杂的服务调用中,调用链越长,超时时间越不好预估,但这个也是在解决问题和性能之间做一个平衡,超时时间设置太长,性能会大大降低,超时时间太短,会造成并发问题,因为一个连接中代码还没有执行完,锁已经被删除,同时另一个连接获取到了锁,执行业务代码)。设置了锁的过期时间,解决了锁不释放的问题,但是同时引入的新的问题,那就是可能会删除其余连接的锁。比如A连接获取到锁key1,执行很长时间,此时锁过期,被删除,另一个连接B获取到锁key1,并执行对应的代码,此时A连接执行结束,于是释放锁,但是此时,其实它的锁已经被释放了,在锁过期的时候,现在它释放的是B链接的锁,那是不对的。假如此刻C连接进来,是能够获取到锁的,那么就意味着B C在同时执行业务代码,违背了锁当初设计的本意,因此绝对不能释放其余连接的锁,而只能释放自己的。
    那么如何解决这个问题?其实我们可以在链接获取锁的时候,设置一个只有当前连接知道的唯一值,释放的时候会先取出锁的值,进行比较,只有跟存入的值是一致的时候,才会释放锁,也就是删除键,否则,什么也不做。

    分析了这么多,我们可以看看获取锁的工具类代码:

        /**
         * 在指定的等待时限内获取锁
         * @param jedis 连接
         * @param lockName 锁名称
         * @param timeOutMillionSeconds  获取锁超时时间   -1:一直等待,直到获取到锁
         * @return 如果获取到锁,返回一个锁标识符,否则返回null
         */
        public static String acquireLock(Jedis jedis,String lockName,long timeOutMillionSeconds){
            // 略去各类校验
            String identifier = UUID.randomUUID().toString();
            long timeEnd = timeOutMillionSeconds==-1?Long.MAX_VALUE:System.currentTimeMillis() + timeOutMillionSeconds;
            while(System.currentTimeMillis()<=timeEnd){
                long result = jedis.setnx(lockName,identifier);
                if(result==1){// 等于1  说明没有设置过,获取锁成功
                    return identifier;
                }
                try {
                    TimeUnit.MILLISECONDS.sleep(100);// 线程休眠值  根据业务来定
                }catch (InterruptedException e){//假设在本机没有多线程编程  不会通知另一方线程中断 根据具体业务来
                    throw new RuntimeException(e);
                }
            }
            return null;
        }
    

    注意代码中的jedis没有close,根据需要,也可以在工具类中close掉。

    以及超时锁,为了防止setnx和expire之间,程序奔溃,造成超时时间没有设置上,因此其余的连接在获取不到锁的时候,会先判断锁有没有过期时间,如果没有,给锁加上过期时间:

        /**
         *在指定的等待时限内获取锁,该锁自身带有超时特性
         * @param jedis 连接
         * @param lockName 锁名称
         * @param timeOutMillionSeconds  获取锁超时时间   -1:一直等待,直到获取到锁
         * @param lockTimeOutSeconds 锁的超时时间
         * @return 如果获取到锁,返回一个锁标识符,否则返回null
         */
        public static String acquireLockTimeOut(Jedis jedis,String lockName,long timeOutMillionSeconds,int lockTimeOutSeconds){
            // 略去各类校验
            String identifier = UUID.randomUUID().toString();
            long timeEnd = timeOutMillionSeconds==-1?Long.MAX_VALUE:System.currentTimeMillis() + timeOutMillionSeconds;
            while(System.currentTimeMillis()<=timeEnd){
                long result = jedis.setnx(lockName,identifier);
                if(result==1){// 等于1  说明没有设置过,获取锁成功
                    jedis.expire(lockName,lockTimeOutSeconds);
                    return identifier;
                }else{
                    if(jedis.ttl(lockName)==-1){
                        jedis.expire(lockName,lockTimeOutSeconds);
                    }
                }
                try {
                    TimeUnit.MILLISECONDS.sleep(100);// 线程休眠值  根据业务来定
                }catch (InterruptedException e){//假设在本机没有多线程编程  不会通知另一方线程中断 根据具体业务来
                    throw new RuntimeException(e);
                }
            }
            return null;
        }
    

    释放锁的代码:

        /**
         * 关于watch  multi exec等命令,参考https://redis.io/topics/transactions   才能深刻理解
         * @param jedis  连接
         * @param lockName  锁名称
         * @param identifier  锁标识符
         * @return 是否成功释放锁   仅作为参考
         */
        public static boolean releaseLock(Jedis jedis,String lockName,String identifier){
            // 略去各类校验
            boolean releaseNormal = false;// 锁是否正常释放
            jedis.watch(lockName);
            String identifierInRedis = jedis.get(lockName);
            if(identifier.equalsIgnoreCase(identifierInRedis)){// 如果标识符没有改动,则说明可以解锁
                Transaction transaction = jedis.multi();
                transaction.del(lockName);
                transaction.exec();
                releaseNormal = true;
            }else{
                jedis.unwatch();
                releaseNormal = false;
            }
            return releaseNormal;
        }
    

    这里重点解释一下释放锁的操作:只有现在取出的跟当时存入的值一致,才会进行删除操作。但为了防止get 和del之间的某个时候,另一个连接修改了锁的值,(为什么会修改?是因为当前连接A在执行完get之后,锁过期了,因此另一个连接B可以获取到锁,现在A执行删除操作,就是删除B连接获取到的锁),因此需要watch 操作,如果现在取出的值和当初存入的不一致,那么直接执行unwatch并返回。为什么要执行unwatch呢?因为为了安全,加入不执行unwatch就返回,在后续的代码中执行multi和exec,那就有很大的问题,当锁被删除或者修改,就会打断当前的事务,但是该事物跟锁是没有任何关系的,所以unwatch是一个需要执行的操作。另一个情况是假如当前连接取出的锁的值,跟存入的一直,就需要执行删除锁的操作。可能有同学就会问了,Redis的所有操作都是原子操作,执行del和包裹在multi exec中执行del不是一样的原子操作吗?为何还要多此一举,让代码变得不好理解。在这一点上,它们确实并无任何区别,但是重点是之前有一个watch命令。假如没有执行watch multi del exec这样的顺序,就会有释放掉其余连接的锁的风险,为什么会这样,上文已经做了分析。在get和del之间发生的事情,当前连接是不知道的,get del的执行不是原子性的。有了watch multi del exec这个顺序,当前连接A get执行之后,锁失效,且被另一个连接B获取到锁,也就是修改了锁,因为watch(key),所以当前连接就知道了有人修改了。当执行exec的时候,就会丢弃掉del命令,因为watch的通知使得事务已经失效了,这保证了其余连接的锁不会被删除。同时,当执行exec的时候,不论事务成功与否,都unwatch了。最终呢,释放锁的代码看起来就是这样了。
    写好了工具类,我们应该测试一下,看看是不是真的,我们可以使用线程池,放入200个任务,每个任务都是执行获取Redis的某个键,并加一,再设置回Redis。在执行代码前,进行锁获取,执行完毕,进行锁释放。为了等到所有线程执行完毕,便于获取最终执行结果,使用CountDownLatch进行等待线程池所有任务的执行完毕。另外的一些部分是初始化动作,防止锁已经设置了或者指定的键已经有值了。代码如下:

    public class LockTest {
        private static final Log log = LogFactory.getLog(LockTest.class);
        public static void main(String[] args)throws Exception {
            ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(4,8,10,
                    TimeUnit.SECONDS,new LinkedBlockingDeque<Runnable>(1000),new ThreadPoolExecutor.CallerRunsPolicy());
            JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();
            jedisPoolConfig.setMaxTotal(10);
            final JedisPool jedisPool = new JedisPool(jedisPoolConfig,"192.168.99.100");
            final CountDownLatch countDownLatch = new CountDownLatch(200);
            final String lockName = "lock_a";
            Jedis jedisInit = jedisPool.getResource();
            jedisInit.del(lockName);
            final String testResource = "test_str";
            jedisInit.set(testResource,"0");
            jedisInit.close();
            for(int i=0;i<200;i++){
                threadPoolExecutor.execute(new Runnable() {
                    public void run() {
                        Jedis jedis = jedisPool.getResource();
                        String identifiler = RedisSetnxLock.acquireLock(jedis,lockName,-1);
                        if(identifiler==null) {
                            log.info("获取锁失败");
                            countDownLatch.countDown();
                            jedis.close();
                            return;
                        }
                        try{
                            String value = jedis.get(testResource);
                            Thread.sleep(200);// 故意休眠
                            jedis.set(testResource,(Integer.parseInt(value)+1)+"");
                        }catch (Exception e){
                            e.printStackTrace();
                        }finally {
                            RedisSetnxLock.releaseLock(jedis,lockName,identifiler);
                            jedis.close();
                            countDownLatch.countDown();
                        }
    
                    }
                });
            }
            countDownLatch.await();
            log.info(jedisPool.getResource().get(testResource));
            threadPoolExecutor.shutdown();
        }
    }
    

    最后的执行结果符合预期。
    为了演示连接挂掉或者执行超常任务的情形,可以执行下面的测试:

    public class LockTest {
        private static final Log log = LogFactory.getLog(LockTest.class);
        public static void main(String[] args)throws Exception {
            ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(4,8,10,
                    TimeUnit.SECONDS,new LinkedBlockingDeque<Runnable>(1000),new ThreadPoolExecutor.CallerRunsPolicy());
            JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();
            jedisPoolConfig.setMaxTotal(10);
            final JedisPool jedisPool = new JedisPool(jedisPoolConfig,"192.168.99.100");
            final CountDownLatch countDownLatch = new CountDownLatch(200);
            final String lockName = "lock_a";
            Jedis jedisInit = jedisPool.getResource();
            jedisInit.del(lockName);
            final String testResource = "test_str";
            jedisInit.set(testResource,"0");
            jedisInit.close();
            for(int i=0;i<200;i++){
                threadPoolExecutor.execute(new Runnable() {
                    public void run() {
                        Jedis jedis = jedisPool.getResource();
                        // 锁的超时时间为1s
                        String identifiler = RedisSetnxLock.acquireLockTimeOur(jedis,lockName,-1,1);
                        if(identifiler==null) {
                            log.info("获取锁失败");
                            countDownLatch.countDown();
                            jedis.close();
                            return;
                        }
                        try{
                            String value = jedis.get(testResource);
                            Thread.sleep(200);
                            jedis.set(testResource,(Integer.parseInt(value)+1)+"");
                        }catch (Exception e){
                            e.printStackTrace();
                        }finally {
                            // 每次不释放锁,模拟执行超常任务或者进程挂掉的情形
                            //RedisSetnxLock.releaseLock(jedis,lockName,identifiler);
                            jedis.close();
                            countDownLatch.countDown();
                        }
    
                    }
                });
            }
            countDownLatch.await();
            log.info(jedisPool.getResource().get(testResource));
            threadPoolExecutor.shutdown();
        }
    }
    

    执行结果同样正确,只是执行时间变长了。

    相关maven pom:

    <!--jedis client-->
            <dependency>
                <groupId>redis.clients</groupId>
                <artifactId>jedis</artifactId>
                <version>3.2.0</version>
            </dependency>
            <!--jedis连接池-->
            <dependency>
                <groupId>org.apache.commons</groupId>
                <artifactId>commons-pool2</artifactId>
                <version>2.8.0</version>
            </dependency>
            <dependency>
                <groupId>commons-logging</groupId>
                <artifactId>commons-logging</artifactId>
                <version>1.2</version>
            </dependency>
            <dependency>
                <groupId>org.apache.logging.log4j</groupId>
                <artifactId>log4j-slf4j-impl</artifactId>
                <version>2.13.1</version>
                <scope>test</scope>
            </dependency>
            <dependency>
                <groupId>org.apache.logging.log4j</groupId>
                <artifactId>log4j-core</artifactId>
                <version>2.13.1</version>
            </dependency>
    

    参考书籍:《Redis实战》,一本非常棒的书。

    相关文章

      网友评论

        本文标题:Redis的分布式锁

        本文链接:https://www.haomeiwen.com/subject/mhgemhtx.html