美文网首页
RedissonRedLock的分布式锁实现(源码)。

RedissonRedLock的分布式锁实现(源码)。

作者: hubo_li | 来源:发表于2020-09-29 16:02 被阅读0次

上一期聊到了单体redis使用分布式锁的问题,这次聊集群下如何使用redLock分布式锁。

如果对于redissonLock锁不熟悉的小伙伴,可以通过这篇文章扫盲 (https://www.jianshu.com/p/245fdb188e87)

事实上使用RedissonLock琐最大的缺点就是它加锁时只作用在一个Redis节点上,即使Redis通过sentinel保证高可用,如果这个master节点由于某些原因发生了主从切换,那么就会出现锁丢失的情况:

在Redis的master节点上拿到了锁;但是这个加锁的key还没有同步到slave节点;master故障,发生故障转移,slave节点升级为master节点;导致锁丢失。

正因为如此,Redis作者antirez基于分布式环境下提出了一种更高级的分布式锁的实现方式:Redlock。笔者认为,Redlock也是Redis所有分布式锁实现方式中唯一能让面试官高潮的方式。

antirez提出的redlock算法大概是这样的:
在Redis的分布式环境中,我们假设有N个Redis master。这些节点完全互相独立,不存在主从复制或者其他集群协调机制。我们确保将在N个实例上使用与在Redis单实例下相同方法获取和释放锁。现在我们假设有5个Redis master节点,同时我们需要在5台服务器上面运行这些Redis实例,这样保证他们不会同时都宕掉。

为了取到锁,客户端应该执行以下操作:

获取当前Unix时间,以毫秒为单位。
依次尝试从5个实例,使用相同的key和具有唯一性的value(例如UUID)获取锁。当向Redis请求获取锁时,客户端应该设置一个网络连接和响应超时时间,这个超时时间应该小于锁的失效时间。例如你的锁自动失效时间为10秒,则超时时间应该在5-50毫秒之间。这样可以避免服务器端Redis已经挂掉的情况下,客户端还在死死地等待响应结果。如果服务器端没有在规定时间内响应,客户端应该尽快尝试去另外一个Redis实例请求获取锁。
客户端使用当前时间减去开始获取锁时间(步骤1记录的时间)就得到获取锁使用的时间。当且仅当从大多数(N/2+1,这里是3个节点)的Redis节点都取到锁,并且使用的时间小于锁失效时间时,锁才算获取成功。
如果取到了锁,key的真正有效时间等于有效时间减去获取锁所使用的时间(步骤3计算的结果)。
如果因为某些原因,获取锁失败(没有在至少N/2+1个Redis实例取到锁或者取锁时间已经超过了有效时间),客户端应该在所有的Redis实例上进行解锁(即便某些Redis实例根本就没有加锁成功,防止某些节点获取到锁但是客户端没有得到响应而导致接下来的一段时间不能被重新获取锁)(摘自https://www.jianshu.com/p/7e47a4503b87)。


Config config1 = new Config();
config1.useSingleServer().setAddress("redis://100.100.0.1:5378")
        .setPassword("root").setDatabase(0);
RedissonClient redissonClient1 = Redisson.create(config1);

Config config2 = new Config();
config2.useSingleServer().setAddress("redis://100.100.0.1:5379")
        .setPassword("root").setDatabase(0);
RedissonClient redissonClient2 = Redisson.create(config2);

Config config3 = new Config();
config3.useSingleServer().setAddress("redis://100.100.0.1:5380")
        .setPassword("root").setDatabase(0);
RedissonClient redissonClient3 = Redisson.create(config3);

String resourceName = "REDLOCK";

RLock lock1 = redissonClient1.getLock(resourceName);
RLock lock2 = redissonClient2.getLock(resourceName);
RLock lock3 = redissonClient3.getLock(resourceName);
RedissonRedLock redLock = new RedissonRedLock(lock1, lock2, lock3);
boolean isLock;
try {
    // isLock = redLock.tryLock();
    // 500ms拿不到锁, 就认为获取锁失败。10000ms即10s是锁失效时间。
    isLock = redLock.tryLock(500, 10000, TimeUnit.MILLISECONDS);
    System.out.println("isLock = "+isLock);
    if (isLock) {
        //TODO 这里写业务逻辑
    }
} catch (Exception e) {
} finally {
    // 无论如何, 最后都要解锁
    redLock.unlock();
}

首先我们看下RedissonRedLock的实现源码:


public class RedissonRedLock extends RedissonMultiLock {
    public RedissonRedLock(RLock... locks) {
        super(locks);
    }
//      锁可以失败的次数,锁的数量-锁成功客户端最小的数量。
    protected int failedLocksLimit() {
        return this.locks.size() - this.minLocksAmount(this.locks);
    }
     //最少的锁成功数量为锁的数量 / 2 + 1,例如有3个客户端加锁,那么最少需要2个客户端加锁成功
    protected int minLocksAmount(List<RLock> locks) {
        return locks.size() / 2 + 1;
    }

    public void unlock() {
        this.unlockInner(this.locks);
    }

    protected boolean isLockFailed(Future<Boolean> future) {
        return false;
    }

    protected boolean isAllLocksAcquired(AtomicReference<RLock> lockedLockHolder, AtomicReference<Throwable> failed, Queue<RLock> lockedLocks) {
        return lockedLockHolder.get() == null && failed.get() == null || lockedLocks.size() >= this.minLocksAmount(this.locks);
    }
}

看redissson的源码就知道,主要实现是继承于RedissonMultiLock,通过这个类来实现的,接下来我们看下trylock方法。

    public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
        long newLeaseTime = -1;
        if (leaseTime != -1) {
            // 如果等待时间设置了,那么将等待时间 * 2
            newLeaseTime = unit.toMillis(waitTime)*2;
        }
        
        // time为当前时间戳
        long time = System.currentTimeMillis();
        long remainTime = -1;
        if (waitTime != -1) {
            remainTime = unit.toMillis(waitTime);
        }
        // 计算锁的等待时间,RedLock中:如果remainTime=-1,那么lockWaitTime为1
        long lockWaitTime = calcLockWaitTime(remainTime);
        
        // RedLock中failedLocksLimit即为n/2 + 1
        int failedLocksLimit = failedLocksLimit();
        List<RLock> acquiredLocks = new ArrayList<RLock>(locks.size());
        // 循环每个redis客户端,去获取锁
        for (ListIterator<RLock> iterator = locks.listIterator(); iterator.hasNext();) {
            RLock lock = iterator.next();
            boolean lockAcquired;
            try {
                // 调用tryLock方法去获取锁,如果获取锁成功,则lockAcquired=true
                if (waitTime == -1 && leaseTime == -1) {
                    lockAcquired = lock.tryLock();
                } else {
//这里也会有锁续约的问题。
                    long awaitTime = Math.min(lockWaitTime, remainTime);
                    lockAcquired = lock.tryLock(awaitTime, newLeaseTime, TimeUnit.MILLISECONDS);
                }
            } catch (Exception e) {
                lockAcquired = false;
            }
            
            // 如果获取锁成功,将锁加入到list集合中
            if (lockAcquired) {
                acquiredLocks.add(lock);
            } else {
                // 如果获取锁失败,判断失败次数是否等于失败的限制次数
                // 比如,3个redis客户端,最多只能失败1次
                // 这里locks.size = 3, 3-x=1,说明只要成功了2次就可以直接break掉循环
                if (locks.size() - acquiredLocks.size() == failedLocksLimit()) {
                    break;
                }

                // 如果最大失败次数等于0
                if (failedLocksLimit == 0) {
                    // 释放所有的锁,RedLock加锁失败
                    unlockInner(acquiredLocks);
                    if (waitTime == -1 && leaseTime == -1) {
                        return false;
                    }
                    failedLocksLimit = failedLocksLimit();
                    acquiredLocks.clear();
                    // 重置迭代器 重试再次获取锁
                    while (iterator.hasPrevious()) {
                        iterator.previous();
                    }
                } else {
                    // 失败的限制次数减一
                    // 比如3个redis实例,最大的限制次数是1,如果遍历第一个redis实例,失败了,那么failedLocksLimit会减成0
                    // 如果failedLocksLimit就会走上面的if逻辑,释放所有的锁,然后返回false
                    failedLocksLimit--;
                }
            }
            
            if (remainTime != -1) {
                remainTime -= (System.currentTimeMillis() - time);
                time = System.currentTimeMillis();
                if (remainTime <= 0) {
                    unlockInner(acquiredLocks);
                    return false;
                }
            }
        }

        if (leaseTime != -1) {
            List<RFuture<Boolean>> futures = new ArrayList<RFuture<Boolean>>(acquiredLocks.size());
            for (RLock rLock : acquiredLocks) {
                RFuture<Boolean> future = rLock.expireAsync(unit.toMillis(leaseTime), TimeUnit.MILLISECONDS);
                futures.add(future);
            }
            
            for (RFuture<Boolean> rFuture : futures) {
                rFuture.syncUninterruptibly();
            }
        }
        
        return true;
    }

然后看下解锁的过程。


  public void unlock() {
        List<RFuture<Void>> futures = new ArrayList(this.locks.size());
        Iterator var2 = this.locks.iterator();
        // 循环每个redis客户端,去解锁
        while(var2.hasNext()) {
            RLock lock = (RLock)var2.next();
            futures.add(lock.unlockAsync());
        }

        var2 = futures.iterator();
        while(var2.hasNext()) {
            RFuture<Void> future = (RFuture)var2.next();
            future.syncUninterruptibly();
        }

    }

相对来说,redlock的代码分析起来较为简单。当然使用redlock还是有不少特殊情况需要考虑的,以后单独聊下redlock的问题。

锁续的问题:
如果Client进行的工作耗时较短,那么可以默认使用一个较小的锁有效期,然后实现一个锁续约机制。当一个Client在工作计算到一半时发现锁的剩余有效期不足。可以向Redis实例发送续约锁的Lua脚本。如果Client在一定的期限内(耗间与申请锁的耗时接近)成功的续约了半数以上的实例,那么续约锁成功。

参考文章:

https://www.jianshu.com/p/7e47a4503b87
https://www.codercto.com/a/48775.html
https://my.oschina.net/u/1260221/blog/1929901

相关文章

网友评论

      本文标题:RedissonRedLock的分布式锁实现(源码)。

      本文链接:https://www.haomeiwen.com/subject/ocseuktx.html