美文网首页
redis 分布式锁

redis 分布式锁

作者: skipper_shou | 来源:发表于2020-12-28 19:44 被阅读0次

    前言

    在日常的业务开发中,为了保证线上服务的安全性,往往同一个服务会部署多个服务器,以防止其中一个服务器宕机导致的服务不可用。
    在这个背景下,有些接口又不允许同一时刻有多个线程同时访问,或者说某两个逻辑不允许同一时刻有多个线程同时访问,比如说两个人pk时,不能同时使用同一种道具。在此时,就需要分布式锁来保证。

    实现原理

    常见的redis分布式锁实现原理其实非常简单。
    利用redis的单线程,具备互斥的特性,通过setnx、exist等来实现简单的分布式锁。
    以下会介绍几个自主实现的redis分布式锁。
    以下通过spring-data-redis中jedis客户端的方式来进行示例

    • 快速失败
      尝试获取锁,已经加锁则直接失败
        @Resource
        private RedisTemplate redisTemplate;
    
        private Boolean tryLock() {
    
            return redisTemplate.opsForValue().setIfAbsent("lock", "true", 10, TimeUnit.SECONDS);
        }
    

    类似幂等性的一个功能,当然也可以做如果未获取锁完成其他业务逻辑的功能。

    • 等待获取锁
      尝试获取锁,如果已经加锁,则一直等待
        @Resource
        private RedisTemplate redisTemplate;
    
        private Boolean checkLock() {
            try {
                while (!redisTemplate.opsForValue().setIfAbsent("lock", "true", 10, TimeUnit.SECONDS)) {
                    Thread.sleep(10);
                }
                //业务逻辑
            } catch (InterruptedException e) {
                e.printStackTrace();
            }finally {
                redisTemplate.delete("lock");
            }
            return true;
        }
    

    在某些场景下,必须获取到锁,然后进行下一步的业务逻辑时需要,但是如果获取锁的线程运行时间过长,会造成该线程一直等待,当并发高时,会造成异常。

    • 等待指定时间获取锁
      尝试获取锁,如果已经加锁,则等待指定时间,再次尝试获取锁,超过指定次数,则自动认为获取锁
        @Resource
        private RedisTemplate redisTemplate;
    
        private Boolean checkLockLimitTimes(Integer times) {
            int tryTimes = 0;
            try {
                while (!redisTemplate.opsForValue().setIfAbsent("lock", "true", 10, TimeUnit.SECONDS)) {
                    Thread.sleep(10);
                    tryTimes++;
                    if (tryTimes >= times) {
                        break;
                    }
                }
                //业务逻辑
            } catch (InterruptedException e) {
                e.printStackTrace();
            }finally {
                redisTemplate.delete("lock");
            }
            return true;
        }
    

    在这种模式下,等待超时的情况下,业务会自动获取锁,完成业务之后释放锁。
    但是也会存在问题,A获取锁,B尝试获取锁超时并获取锁,A完成业务释放锁,此时,会把B的锁同时释放,如果此时有C进来,就会获取锁,而且不等待。
    这种模式在一些并发要求并不高的情况下,还是相对适用的,对并发要求高的场景不建议适用。

    • 获取锁超时拒绝
      尝试获取锁,超过指定次数则自动获取失败
        @Resource
        private RedisTemplate redisTemplate;
    
        private Boolean checkLockWithTimesReject(Integer times) {
            int tryTimes = 0;
            try {
                while (!redisTemplate.opsForValue().setIfAbsent("lock", "true", 10, TimeUnit.SECONDS)) {
                    Thread.sleep(10);
                    tryTimes++;
                    if (tryTimes >= times) {
                        return false;
                    }
                }
                //业务逻辑
            } catch (InterruptedException e) {
                e.printStackTrace();
            }finally {
                redisTemplate.delete("lock");
            }
            return true;
        }
    

    这个模式是在上面未获取锁则失败的一个扩展版本,并不是未获取锁直接拒绝,而是有一个尝试的过程,一定程序下相对友好,不过对项目压力也会有一点。

    以上是一些自己实现的简单redis分布式锁,能够达到部分业务需求,不过也同时存在问题:
    1.如果不是用spring-data-redis,而是用原生的jedis,因为setnx和expire是分开运行的,容易setnx之后expire失败,则会导致key不会自动失效,如果解锁的代码异常没走到,则所有后续的都获取不到锁。
    2.如果程序获取锁之后,运行时间超过锁过期时间,则会导致锁被其他线程获取,在一些要求比较高的场景,就会出现问题。
    3.A获取锁,B尝试获取锁超时并获取锁,A完成业务释放锁,此时,会把B的锁同时释放,如果此时有C进来,就会获取锁,而且不等待。

    redisson客户端

    Redisson使用非阻塞的I/O和基于Netty框架的事件驱动的通信层,其方法调用是异步的。Redisson的API是线程安全的,所以可以操作单个Redisson连接来完成各种操作。
    Redisson在2019年1月16日成为GitHub里星星最多的Redis Java客户端。

    Jedis仅支持基本的数据类型如:String、Hash、List、Set、Sorted Set。
    Redisson不仅提供了一系列的分布式Java常用对象,基本可以与Java的基本数据结构通用,还提供了许多分布式服务,其中包括(BitSet, Set, Multimap, SortedSet, Map, List, Queue, BlockingQueue, Deque, BlockingDeque, Semaphore, Lock, AtomicLong, CountDownLatch, Publish / Subscribe, Bloom filter, Remote service, Spring cache, Executor service, Live Object service, Scheduler service)。还实现了可重入锁(Reentrant Lock)、公平锁(Fair Lock、联锁(MultiLock)、 红锁(RedLock)、 读写锁(ReadWriteLock)等,还提供了许多分布式服务。

    对于从jedis迁移到redisson的使用者来说,前期迁移过程会比较痛苦,需要从原生的指令对应到redisson中的方法,具体的对应关系可以查阅以下网页Redis命令和Redisson对象匹配列表

    分布式锁

    常见的分布式锁实现方式有三种:

    • 基于数据库实现分布式锁
    • 基于zookeeper实现分布式锁
    • 基于Redis缓存实现分布式锁

    以上三种方式都可以实现分布式锁,其中,从健壮性考虑, 用 zookeeper 会比用 Redis 实现更好,但从性能角度考虑,基于 Redis 实现性能会更好,而且zookeeper对使用者来说上手较慢,如何选择,还是取决于业务需求。

    分布式锁需满足的四个要求

    • 互斥性。在任意时刻,只有一个客户端能持有锁。
    • 不会发生死锁。即使有一个客户端在持有锁的期间崩溃而没有主动解锁,也能保证后续其他客户端能加锁。
    • 解铃还须系铃人。加锁和解锁必须是同一个客户端,客户端自己不能把别人加的锁给解了,即不能误解锁。
    • 具有容错性。只要大多数Redis节点正常运行,客户端就能够获取和释放锁。

    这里会讲解一下redisson分布式锁的实现

         public void tryLock(Long waitTime, Long leaseTime, String lockKey) {
            RLock lock = redissonClient.getLock(lockKey);
            try {
                lock.tryLock(waitTime, leaseTime, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                System.out.println("==================");
            }
        }
    

    tryLock中尝试获取锁是通过tryAcquiretryAcquire 内部通过调用tryLockInnerAsync 实现申请锁的逻辑。申请锁并返回锁有效期还剩余的时间,如果为空说明锁未被其它线程申请则直接获取并返回,如果获取到时间,则进入等待竞争逻辑。
    加锁流程图如下:

    加锁流程图.png
    其中比较重要的lua语言源码如下:
        //如果不存在Sting keys 则设置
        <T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
            internalLockLeaseTime = unit.toMillis(leaseTime);
    
            //设置过期key
            //如果Collections.singletonList(getName())的key不存在
            //将Collections.singletonList(getName())为key,getLockName(threadId)为field加1
            //将Collections.singletonList(getName())设置过期时间为internalLockLeaseTime
            //返回null
    
            //如果Collections.singletonList(getName())为key,getLockName(threadId)为field存在
            //将Collections.singletonList(getName())为key,getLockName(threadId)为field加1
            //将Collections.singletonList(getName())设置过期时间为internalLockLeaseTime
            //返回null
    
            //直接获取将Collections.singletonList(getName())过期时间
            return evalWriteAsync(getName(), LongCodec.INSTANCE, command,
                    "if (redis.call('exists', KEYS[1]) == 0) then " +
                            "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                            "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                            "return nil; " +
                            "end; " +
                            "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                            "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                            "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                            "return nil; " +
                            "end; " +
                            "return redis.call('pttl', KEYS[1]);",
                    Collections.singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
        }
    

    同样的,解锁的关键性源码如下:

    protected RFuture<Boolean> unlockInnerAsync(long threadId) {
    
            //判断key为getName(),field为getLockName(threadId)是否存在
            //不存在则返回null
    
            //使key为getName(),field为getLockName(threadId)减1
            //如果大于0
            //则更新可以过期时间为internalLockLeaseTime
            //返回false
    
            //小于等于0
            //移除key
            //发布key为getChannelName(),值为LockPubSub.UNLOCK_MESSAGE的数据
            //返回true
    
            //否则返回null
            return evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                    "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
                            "return nil;" +
                            "end; " +
                            "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
                            "if (counter > 0) then " +
                            "redis.call('pexpire', KEYS[1], ARGV[2]); " +
                            "return 0; " +
                            "else " +
                            "redis.call('del', KEYS[1]); " +
                            "redis.call('publish', KEYS[2], ARGV[1]); " +
                            "return 1; " +
                            "end; " +
                            "return nil;",
                    Arrays.asList(getName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
        }
    

    具体流程图如下:


    解锁流程图.png

    阅读redisson实现分布式锁源码过程中,会发现比较重要的一些代码.
    例如:为了防止无用的获取锁,会按照等待时间和过期时间的最小值,通过JDK并发的信号量工具Semaphore来阻塞进行,以减少无用获取

     if (ttl >= 0 && ttl < time) {
                        subscribeFuture.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                    } else {
                        subscribeFuture.getNow().getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
                    }
    

    同时,通过redis发布订阅功能来实现当获取锁的线程完成时做到信号量的激活。

    public RFuture<E> subscribe(String entryName, String channelName) {
            //同一时刻只允许一个线程进入
            AsyncSemaphore semaphore = service.getSemaphore(new ChannelName(channelName));
            RPromise<E> newPromise = new RedissonPromise<>();
            semaphore.acquire(() -> {
                if (!newPromise.setUncancellable()) {
                    semaphore.release();
                    return;
                }
    
                E entry = entries.get(entryName);
                if (entry != null) {
                    entry.acquire();
                    semaphore.release();
                    entry.getPromise().onComplete(new TransferListener<E>(newPromise));
                    return;
                }
    
                E value = createEntry(newPromise);
                value.acquire();
    
                E oldValue = entries.putIfAbsent(entryName, value);
                if (oldValue != null) {
                    oldValue.acquire();
                    semaphore.release();
                    oldValue.getPromise().onComplete(new TransferListener<E>(newPromise));
                    return;
                }
    
                RedisPubSubListener<Object> listener = createListener(channelName, value);
                service.subscribe(LongCodec.INSTANCE, channelName, semaphore, listener);
            });
    
            return newPromise;
        }
    

    另外,redisson还提供了,为了获取锁的线程就算超过锁的过期时间,仍然持有锁,提供了过期时间更新的功能。

     ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
                if (e != null) {
                    return;
                }
    
                // lock acquired
                if (ttlRemaining == null) {
                    scheduleExpirationRenewal(threadId);
                }
            });
    
     private void scheduleExpirationRenewal(long threadId) {
            ExpirationEntry entry = new ExpirationEntry();
            ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
            if (oldEntry != null) {
                oldEntry.addThreadId(threadId);
            } else {
                entry.addThreadId(threadId);
                renewExpiration();
            }
        }
    
        protected RFuture<Boolean> renewExpirationAsync(long threadId) {
            return evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                    "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                            "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                            "return 1; " +
                            "end; " +
                            "return 0;",
                    Collections.singletonList(getName()),
                    internalLockLeaseTime, getLockName(threadId));
        }
    

    总结

    通过 Redisson 实现分布式可重入锁,比纯自己通过set key value px milliseconds nx +lua 实现的效果更好些,虽然基本原理都一样,因为通过分析源码可知,RedissonLock是可重入的,并且考虑了失败重试,可以设置锁的最大等待时间, 在实现上也做了一些优化,减少了无效的锁申请,提升了资源的利用率。(通过future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);实现)
    需要特别注意的是,RedissonLock同样没有解决 节点挂掉的时候,存在丢失锁的风险的问题。而现实情况是有一些场景无法容忍的,所以 Redisson 提供了实现了redlock算法的RedissonRedLockRedissonRedLock真正解决了单点失败的问题,代价是需要额外的为 RedissonRedLock 搭建Redis环境。

    通常,出现主从切换集群脑裂时,可能存在有两个线程同时获取锁的情况,具体原因可以自行百度查看。

    所以,如果业务场景可以容忍这种小概率的错误,则推荐使用 RedissonLock, 如果无法容忍,则推荐使用 RedissonRedLock
    RedissonRedLock具体实现在org.redisson.RedissonMultiLock#tryLock中,简单的讲就是通过多个RLock锁,同步都获取到锁来才算真正获取锁,这样能够在一定情况下避免因为节点挂掉或者主从切换时,导致的锁问题。

    相关文章

      网友评论

          本文标题:redis 分布式锁

          本文链接:https://www.haomeiwen.com/subject/uiqvnktx.html