美文网首页JDK源码系列中间件Java
Redis分布式锁的实现原理 - Redisson和RedisL

Redis分布式锁的实现原理 - Redisson和RedisL

作者: Mhhhhhhy | 来源:发表于2021-02-25 15:00 被阅读0次

    主要接触到的Redis分布式锁有两种框架RedisLockRegistryRedisson,今天来看下两种框架的实现原理;

    RedisLockRegistry

    Spring-inintegration-redis中提供的一种实现方式,使用方式如下

    @Bean("redisLockRegistry")
    public RedisLockRegistry redisLockRegistry(RedisConnectionFactory redisConnectionFactory) {
        return new RedisLockRegistry(redisConnectionFactory, "spring-redis-lock");
    }
    

    在配置文件中直接注册成一个Bean,下面是一些构造时的源码;

    public RedisLockRegistry(RedisConnectionFactory connectionFactory, String registryKey) {
        // DEFAULT_EXPIRE_AFTER默认是6秒
        this(connectionFactory, registryKey, DEFAULT_EXPIRE_AFTER);
    }
    
    public RedisLockRegistry(RedisConnectionFactory connectionFactory, String registryKey, long expireAfter) {
        Assert.notNull(connectionFactory, "'connectionFactory' cannot be null");
        Assert.notNull(registryKey, "'registryKey' cannot be null");
        this.redisTemplate = new StringRedisTemplate(connectionFactory);
        this.obtainLockScript = new DefaultRedisScript<>(OBTAIN_LOCK_SCRIPT, Boolean.class);
        this.registryKey = registryKey;
        this.expireAfter = expireAfter;
    }
    

    构造时生成了一个RedisTemplate用于执行Redis操作,一个ObtainLockScript用来保存获取锁的脚本;

    private final class RedisLock implements Lock {
            
        private final String lockKey;
    
        private final ReentrantLock localLock = new ReentrantLock();
    
        private volatile boolean unlinkAvailable = true;
    
        private volatile long lockedAt;
        // 构造的时候只储存一个key
        private RedisLock(String path) {
            this.lockKey = constructLockKey(path);
        }
            
        private String constructLockKey(String path) {
            return RedisLockRegistry.this.registryKey + ":" + path;
        }
        ...
    }
    

    内部封装了一个Redis锁的实现,主要包括一个锁的标识lockKey和一个重入锁ReentrantLock

    Lock lock = redisLockRegistry.obtain(key);
    try {
        if (lock.tryLock(5, TimeUnit.SECONDS)) {
            try {
                ...
            } finally {
                lock.unlock();
            }
        } else {
            throw new GetLockTimeoutException();
        }
    } catch (InterruptedException e) {
        throw new GetLockTimeoutException();
    }
    

    上面是代码中的基本用法,可以选择封装成一个切面使用起来方便一些,首先来看下获取锁的操作obtain;

    private final Map<String, RedisLock> locks = new ConcurrentHashMap<>();
    public Lock obtain(Object lockKey) {
        Assert.isInstanceOf(String.class, lockKey);
        String path = (String) lockKey;
        return this.locks.computeIfAbsent(path, RedisLock::new);
    }
    

    从一个ConcurrentHashMap中根据锁的标识获取一个锁,如果没有则构造一个新的;

    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        long now = System.currentTimeMillis();
        // 先是本地的重入锁先尝试加锁
        if (!this.localLock.tryLock(time, unit)) {
            // 如果本地线程加锁失败,直接返回
            return false;
        }
        try {
            long expire = now + TimeUnit.MILLISECONDS.convert(time, unit);
            boolean acquired;
            // 本地加锁成功后执行redis加锁
            while (!(acquired = obtainLock()) && System.currentTimeMillis() < expire) { 
                // 加锁失败且没过超时时间,睡一会再试
                Thread.sleep(100); 
            }
            if (!acquired) {
                // 如果没有获取到锁,把本地锁释放直接返回
                this.localLock.unlock();
            }
            return acquired;
        }
        catch (Exception e) {
            this.localLock.unlock();
            rethrowAsLockException(e);
        }
        return false;
    }
    

    看这段逻辑需要一点ReentrantLock的知识,可以先了解下Java8 源码阅读 - AbstractQueuedSynchronizer

    首先是本地的重入锁先进行加锁,加锁步骤如下:

    • 如果锁资源没有被其他线程占用,直接加锁,亦或者是本地线程已经占用锁,再次重入也被视为加锁成功;
    • 如果锁资源被其他线程占用,排队等待指定时间,指定时间内获取不到资源则加锁失败;

    成功在本地加锁后会执行Redis加锁操作

    private boolean obtainLock() {
        Boolean success =
                RedisLockRegistry.this.redisTemplate.execute(RedisLockRegistry.this.obtainLockScript,
                        Collections.singletonList(this.lockKey), RedisLockRegistry.this.clientId,
                        String.valueOf(RedisLockRegistry.this.expireAfter));
    
        boolean result = Boolean.TRUE.equals(success);
    
        if (result) {
            this.lockedAt = System.currentTimeMillis();
        }
        return result;
    }
    
    private static final String OBTAIN_LOCK_SCRIPT =
            "local lockClientId = redis.call('GET', KEYS[1])\n" +
                    "if lockClientId == ARGV[1] then\n" +
                    "  redis.call('PEXPIRE', KEYS[1], ARGV[2])\n" +
                    "  return true\n" +
                    "elseif not lockClientId then\n" +
                    "  redis.call('SET', KEYS[1], ARGV[1], 'PX', ARGV[2])\n" +
                    "  return true\n" +
                    "end\n" +
                    "return false";
    

    利用Redis执行Lua脚本是原子性的特性来实现的,首先redis.call('GET', KEYS[1])来判断是否已经获取到锁,如果发现锁已经被占用,则判断是不是来自同一个客户端的请求,如果是就更新解锁时间(默认6s),如果不是来自同一个客户端的(即多个节点对同一个key操作),只不会允许非第一个客户端的操作;上锁的操作是redis.call('SET', KEYS[1], ARGV[1], 'PX', ARGV[2]),key是外面指定的key,而value则是客户端的uuid;

    解锁的逻辑也是比较简单,直接调用del或者unlink删除key即可;

    private void removeLockKey() {
        if (this.unlinkAvailable) {
            try {
                RedisLockRegistry.this.redisTemplate.unlink(this.lockKey);
            }
            catch (Exception ex) {
                LOGGER.warn("The UNLINK command has failed (not supported on the Redis server?); " +
                        "falling back to the regular DELETE command", ex);
                this.unlinkAvailable = false;
                RedisLockRegistry.this.redisTemplate.delete(this.lockKey);
            }
        }
        else {
            RedisLockRegistry.this.redisTemplate.delete(this.lockKey);
        }
    }
    
    

    在单节点的Redis架构下,RedisLockRegistry是可以满足一些不复杂的场景的,但是需要考虑锁的续租、Redis集群架构部署等问题的话,这个框架可能就会稍显拉垮,所以大多数情况都是使用Redisson

    Redisson

    使用方式大致如下

    Config config = new Config();
    config.useSentinelServers()
            .addSentinelAddress(
                    "redis://127.0.0.1:6379",
                    "redis://127.0.0.2:6379",
                    "redis://127.0.0.3:6379");
    RedissonClient redissonClient = Redisson.create(config);
    RLock lock = redissonClient.getLock("key");
    // 加锁的两种方式
    lock.lock();
    lock.lock(5, TimeUnit.SECONDS);
    lock.unlock();
    

    下面简单分析下Redisson的加锁逻辑

    private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
        long threadId = Thread.currentThread().getId();
        Long ttl = tryAcquire(-1, leaseTime, unit, threadId);
        ...
    }
    
    private Long tryAcquire(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
        return get(tryAcquireAsync(waitTime, leaseTime, unit, threadId));
    }
    
    private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
        // 在获取锁后,如果还没有通过调用unlock释放锁,
        // 则保持锁的最大时间。如果leaseTime为-1,则保持锁定直到显式解锁。
        if (leaseTime != -1) {
            // 指定了锁的释放时间
            return tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
        }
        // 没有指定锁的释放时间
        // 尝试加锁
        RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,
                                                                TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
        ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
            if (e != null) {
                // 发现异常
                return;
            }
    
            // 加锁成功
            if (ttlRemaining == null) {
                // 定时续租
                scheduleExpirationRenewal(threadId);
            }
        });
        return ttlRemainingFuture;
    }
    
    <T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
        internalLockLeaseTime = unit.toMillis(leaseTime);
        // 里面处理了包括选取节点、重试等操作
        return evalWriteAsync(getName(), LongCodec.INSTANCE, command,
                "if (redis.call('exists', KEYS[1]) == 0) then " +
                        "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                        "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                        "return nil; " +
                        "end; " +
                        "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                        "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                        "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                        "return nil; " +
                        "end; " +
                        "return redis.call('pttl', KEYS[1]);",
                Collections.singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
    }
    

    先看下加锁的逻辑,这里就忽略了集群节点选取等其他代码,直接看这段Lua脚本的命令,执行逻辑如下

    • 检查key是否存在
      • 如果key不存在,直接将field即客户端id+key+线程id组成的uuid对应的value置为1,并更新过期时间,最后返回nil;
      • 如果key存在,则判断field(即上面的uuid)是否存在,如果不存在,则说明该锁的已被其他占用,直接返回key剩余的ttl;如果field存在,则说明自己是锁的持有者,将filed对应的value自增1(重入);

    如果没有指定锁的释放时间,将会触发锁续租的操作;

    protected void scheduleExpirationRenewal(long threadId) {
        ExpirationEntry entry = new ExpirationEntry();
        ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
        if (oldEntry != null) {
            // 记录续租的次数
            oldEntry.addThreadId(threadId);
        } else {
            entry.addThreadId(threadId);
            // 执行续租操作
            renewExpiration();
        }
    }
    // 续租
    private void renewExpiration() {
        ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
        if (ee == null) {
            // 已经被停止续租
            return;
        }
        
        Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
            @Override
            public void run(Timeout timeout) throws Exception {
                ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
                if (ent == null) {
                    // 停止续租就是将该key从renewal集合中移除
                    return;
                }
                Long threadId = ent.getFirstThreadId();
                if (threadId == null) {
                    return;
                }
                
                RFuture<Boolean> future = renewExpirationAsync(threadId);
                future.onComplete((res, e) -> {
                    if (e != null) {
                        // 续租操作有异常,停止续租
                        EXPIRATION_RENEWAL_MAP.remove(getEntryName());
                        return;
                    }
                    
                    if (res) {
                        // 重新续租
                        renewExpiration();
                    }
                });
            }
        }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
        
        ee.setTimeout(task);
    }
    
    protected RFuture<Boolean> renewExpirationAsync(long threadId) {
        return evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                        "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                        "return 1; " +
                        "end; " +
                        "return 0;",
                Collections.singletonList(getName()),
                internalLockLeaseTime, getLockName(threadId));
    }
    

    来看下续租的操作,每个获取锁的线程都会执行续租任务,默认是30 / 3 = 10秒钟重新执行一次续租操作直到下面三个条件其中一个触发

    • redis执行续租命令异常
    • 手动解锁操作
    • 续租操作失败(通常是redis中key不存在了)

    来看下续租的Lua命令

    if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then 
        redis.call('pexpire', KEYS[1], ARGV[1]);
        return 1; 
    end;
        return 0;
    

    如果对应key和field都存在,直接通过pexpire命令更新过期时间;

    下面是看看获取锁失败的操作

    private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
            long threadId = Thread.currentThread().getId();
            Long ttl = tryAcquire(-1, leaseTime, unit, threadId);
            if (ttl == null) {
                // 获取锁成功
                return;
            }
            // 获取锁失败
            // 利用redis的pub/sub机制来订阅到锁被释放的消息
            RFuture<RedissonLockEntry> future = subscribe(threadId);
            // 根据是否支持中断来使用netty不同的同步操作
            if (interruptibly) {
                commandExecutor.syncSubscriptionInterrupted(future);
            } else {
                commandExecutor.syncSubscription(future);
            }
    
            try {
                while (true) {
                    // 重复尝试加锁
                    ttl = tryAcquire(-1, leaseTime, unit, threadId);
                    if (ttl == null) {
                        // 直到获取锁成功
                        break;
                    }
    
                    if (ttl >= 0) {
                        try {
                            // 利用Semaphore阻塞直到ttl过去
                            future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                        } catch (InterruptedException e) {
                            if (interruptibly) {
                                throw e;
                            }
                            future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                        }
                    } else {
                        if (interruptibly) {
                            future.getNow().getLatch().acquire();
                        } else {
                            future.getNow().getLatch().acquireUninterruptibly();
                        }
                    }
                }
            } finally {
                unsubscribe(future, threadId);
            }
        }
    

    如果tryAcquire返回的值不为null,即表示锁资源被其他线程占用,本地会启动一个死循环重复执行加锁操作,每一次上锁失败拿到ttl后,会利用Semaphore阻塞当前线程直到ttl过去,当然也可能被之前订阅的解锁消息主动唤醒;

    protected void onMessage(RedissonLockEntry value, Long message) {
        if (message.equals(UNLOCK_MESSAGE)) {
            Runnable runnableToExecute = value.getListeners().poll();
            if (runnableToExecute != null) {
                runnableToExecute.run();
            }
            // 当收到解锁的通知时会主动把Semaphore阻塞的线程释放
            value.getLatch().release();
        }
    }
    

    这是当Redis收到订阅的解锁事件执行的方法,这里会主动把RedissonLockEntry持有的所有被阻塞的线程都释放;

    // 解锁的实际逻辑
    public RFuture<Void> unlockAsync(long threadId) {
        RPromise<Void> result = new RedissonPromise<>();
        RFuture<Boolean> future = unlockInnerAsync(threadId);
    
        future.onComplete((opStatus, e) -> {
            // 取消锁续租操作
            cancelExpirationRenewal(threadId);
            ...
        });
    
        return result;
    }
    
    protected RFuture<Boolean> unlockInnerAsync(long threadId) {
        return evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
                        "return nil;" +
                        "end; " +
                        "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
                        "if (counter > 0) then " +
                        "redis.call('pexpire', KEYS[1], ARGV[2]); " +
                        "return 0; " +
                        "else " +
                        "redis.call('del', KEYS[1]); " +
                        "redis.call('publish', KEYS[2], ARGV[1]); " +
                        "return 1; " +
                        "end; " +
                        "return nil;",
                Arrays.asList(getName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
    }
    

    解锁的逻辑如下

    • 判断这个锁是否被当前线程持有
      • 如果当前线程不是锁的持有者,直接返回nil;
      • 如果当前线程是锁的持有者,将redis保存的数值减1,如果减1后的结果是大于0的,说明该锁是被重复加锁的,这时候需要对锁进行续租,时间为internalLockLeaseTime,并返回为0;
        • 如果减1后的结果为0,将key删除后并发布解锁的消息;
    RedisLockRegistry和Redisson对比

    不同点:

    • RedisLockRegistry有一个本地加锁的逻辑,只有当本地加锁成功才能继续执行redis加锁逻辑,重入逻辑也是做在本地,所以理论上RedisLockRegistryRedisson会快那么一点点;
    • Redisson有锁续租功能,解决了加锁成功后逻辑执行未完成时锁到期被释放,导致其他资源获取锁的混乱;
    RedLock

    因为Redis集群主从同步时会有延迟,有可能因为master节点挂掉,master节点的锁还未同步到slave时,slave被选举成master而可能其他线程能在新master上重复获得锁,而导致锁资源加锁混乱的问题;

    所以就有了一个RedLock来解决这种问题,大致的思想是在集群上不同节点都尝试加锁,步骤大致如下

    1. 获取当前的时间戳(毫秒)。
    2. 依次尝试从 N 个实例,使用相同的 key 和随机值获取锁。在步骤 2,当向 Redis 设置锁时,客户端应该设置一个网络连接和响应超时时间,这个超时时间应该小于锁的失效时间。例如你的锁自动失效时间为 10 秒,则超时时间应该在 5-50 毫秒之间。这样可以避免服务器端 Redis 已经挂掉的情况下,客户端还在死死地等待响应结果。如果服务器端没有在规定时间内响应,客户端应该尽快尝试另外一个 Redis 实例。
    3. 客户端使用当前时间减去开始获取锁时间(步骤 1 记录的时间)就得到获取锁使用的时间。当且仅当从大多数(这里是 3 个节点)的 Redis 节点都取到锁,并且使用的时间小于锁失效时间时,锁才算获取成功。
      如果取到了锁,key 的真正有效时间等于有效时间减去获取锁所使用的时间(步骤 3 计算的结果)。
    4. 如果因为某些原因,获取锁失败(没有在至少 N/2+1 个Redis实例取到锁或者取锁时间已经超过了有效时间),客户端应该在所有的 Redis 实例上进行解锁(即便某些 Redis 实例根本就没有加锁成功)。

    通过在集群大多数实例上获取来保证锁的可用性,比较一个集群内节点都宕机的可能性还是很低的,Redisson内置了该算法的实现,实现类是RedissonRedLock

    RedissonRedLock
    RLock lock = redissonClient.getLock("key");
    RLock lock2 = redissonClient.getLock("key");
    RedissonRedLock redLock = new RedissonRedLock(lock, lock2);
    redLock.lock();
    redLock.unlock();
    

    使用方式如上,下面看下源码具体的实现逻辑

    public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
        // 每个锁至少等待1500ms,总时间就是1500*数量
        long baseWaitTime = locks.size() * 1500;
        long waitTime = -1;
        if (leaseTime == -1) {
            waitTime = baseWaitTime;
        } else {
            // 如果指定了锁的释放时间
            leaseTime = unit.toMillis(leaseTime);
            waitTime = leaseTime;
            if (waitTime <= 2000) {
                waitTime = 2000;
            } else if (waitTime <= baseWaitTime) {
                // 如果锁的释放时间小于框架内置的等待时间
                // 重制等待时间
                waitTime = ThreadLocalRandom.current().nextLong(waitTime/2, waitTime);
            } else {
                // 如果锁的释放时间大于框架内置的等待时间
                // 重制等待时间
                waitTime = ThreadLocalRandom.current().nextLong(baseWaitTime, waitTime);
            }
        }
        
        while (true) {
            // 重复上锁直到成功或者异常
            if (tryLock(waitTime, leaseTime, TimeUnit.MILLISECONDS)) {
                return;
            }
        }
    }
    
    public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
        // 重制锁的释放时间 至少要求是集群获取锁的等待时间以上
        long newLeaseTime = ...;
        // 获取当前集群的时间戳
        long time = System.currentTimeMillis();
        long remainTime = -1;
        if (waitTime != -1) {
            remainTime = unit.toMillis(waitTime);
        }
        // 每个锁保留的时间是waitTime
        long lockWaitTime = calcLockWaitTime(remainTime);
        // 加锁失败的限制数
        int failedLocksLimit = failedLocksLimit();
        List<RLock> acquiredLocks = new ArrayList<>(locks.size());
        // 遍历锁
        for (ListIterator<RLock> iterator = locks.listIterator(); iterator.hasNext();) {
            RLock lock = iterator.next();
            boolean lockAcquired;
            try {
                // 进行加锁
                if (waitTime == -1 && leaseTime == -1) {
                    lockAcquired = lock.tryLock();
                } else {
                    long awaitTime = Math.min(lockWaitTime, remainTime);
                    lockAcquired = lock.tryLock(awaitTime, newLeaseTime, TimeUnit.MILLISECONDS);
                }
            } catch (RedisResponseTimeoutException e) {
                // redis访问超时 解锁当前的节点
                unlockInner(Arrays.asList(lock));
                lockAcquired = false;
            } catch (Exception e) {
                lockAcquired = false;
            }
            
            if (lockAcquired) {
                acquiredLocks.add(lock);
            } else {
                // 加锁失败
                if (locks.size() - acquiredLocks.size() == failedLocksLimit()) {
                    // 如果获取到锁的节点数大于预置的节点数,则判断这次加锁已经成功
                    break;
                }
                
                if (failedLocksLimit == 0) {
                    // 加锁失败次数已经到达了限制
                    // 将已加锁成功的全部解锁
                    unlockInner(acquiredLocks);
                    if (waitTime == -1) {
                        return false;
                    }
                    failedLocksLimit = failedLocksLimit();
                    acquiredLocks.clear();
                    // 重制迭代器
                    while (iterator.hasPrevious()) {
                        iterator.previous();
                    }
                } else {
                    // 记录失败的次数
                    failedLocksLimit--;
                }
            }
            
            if (remainTime != -1) {
                // 计算还剩余的时间
                remainTime -= System.currentTimeMillis() - time;
                time = System.currentTimeMillis();
                // 如果已经没有剩余时间了,将所有获取到锁的节点进行解锁
                if (remainTime <= 0) {
                    unlockInner(acquiredLocks);
                    return false;
                }
            }
        }
    
        if (leaseTime != -1) {
            // 更新每个节点锁的超时时间
            List<RFuture<Boolean>> futures = new ArrayList<>(acquiredLocks.size());
            for (RLock rLock : acquiredLocks) {
                RFuture<Boolean> future = ((RedissonLock) rLock).expireAsync(unit.toMillis(leaseTime), TimeUnit.MILLISECONDS);
                futures.add(future);
            }
            
            for (RFuture<Boolean> rFuture : futures) {
                rFuture.syncUninterruptibly();
            }
        }
        return true;
    }
    

    加锁的逻辑基本上按照RedLock算法来实现的,流程如下:

    1. 每个节点加锁至少等待1500ms,等待总时间就是1500*节点数量;
    2. 按照RedLock算法,计算每个节点获取锁的等待时间;
    3. 从第一个节点开始进行加锁操作,加锁的执行逻辑和上面单节点的一样,如果加锁成功继续下一个节点,如果加锁失败,首先判断加锁成功的节点数是否已经满足最低加锁个数限制,比如5个节点中必须大于等于3个节点;
    • 如果满足则停止加锁,本次加锁操作执行成功;
    • 如果不满足,判断加锁失败次数是否达到上限,如果没有达到上限,记录失败次数并对下一个节点加锁,如果已经达到加锁失败次数的上限,判断本次集群加锁失败,解锁所有成功获取到锁的节点;
    1. 每个节点加锁无论成功或失败,执行完成后都需要判断剩余时间,如果剩余时间已经小于0且未完成所有节点的加锁,判断为加锁失败并进行解锁;
    2. 如果加锁成功,更新所有加锁成功节点的过期时间;
    后续

    在Redisson3.15.0版本中RedissonRedLock已经不被推荐使用了,理由是现在RLock对象加锁时会自动传播到所有的Slaves,具体文档可以在这里查询;https://github.com/redisson/redisson/wiki/8.-distributed-locks-and-synchronizers#84-redlock

    虽然不再推荐用了,但是长点见识也是可以的;

    相关文章

      网友评论

        本文标题:Redis分布式锁的实现原理 - Redisson和RedisL

        本文链接:https://www.haomeiwen.com/subject/pgagxltx.html