美文网首页Redisson源码剖析
7.Redisson源码剖析-MultiLock加锁与释放锁

7.Redisson源码剖析-MultiLock加锁与释放锁

作者: T_log | 来源:发表于2020-10-28 01:05 被阅读0次

    一、说明

    1 .Redisson的官网文档地址:https://github.com/redisson/redisson/wiki/8.-distributed-locks-and-synchronizers#83-multilock 其实也有中文,如果刚开始英文有点困难,可以尝试中英文对比着看,这对于阅读英文文档也许有一定的帮助

    1. redisson分布式锁这块是支持MultiLock这个机制的,可以将多个锁合并为一个大锁,对一个大锁进行统一的申请加锁以及释放锁,一次性锁定多个资源,再去处理一些事情,然后一次性释放所有的资源对应的锁
    2. 在项目里使用的时候,很多时候一次性要锁定多个资源,比如说锁掉一个库存,锁掉一个订单,锁掉一个积分,一次性锁掉多个资源,多个资源都不让别人随意修改,然后你再一次性更新多个资源,释放多个锁

    二、源码

    代码片段一、

    public static void main(String[] args) throws Exception {
            Config config = new Config();
            // 1. 这里的Redis集群是我本地搭建的一套集群,因为是研究源码,所以配置直接硬编码到代码里
            config.useClusterServers()
                    .addNodeAddress("redis://192.168.0.107:7001")
                    .addNodeAddress("redis://192.168.0.107:7002")
                    .addNodeAddress("redis://192.168.0.110:7003")
                    .addNodeAddress("redis://192.168.0.110:7004")
                    .addNodeAddress("redis://192.168.0.111:7005")
                    .addNodeAddress("redis://192.168.0.111:7006");
    
    
            RedissonClient redisson = Redisson.create(config);
       
            RLock lock1 = redisson.getLock("lock1");
            RLock lock2 = redisson.getLock("lock2");
            RLock lock3 = redisson.getLock("lock3");
    
            RedissonMultiLock lock = new RedissonMultiLock(lock1,lock2,lock3);
            // 代码片段二、
            lock.lock();
            // 代码片段六、
            lock.unlock();
    
        }
    

    代码片段二、

    RedissonMultiLock类中

    public void lock(long leaseTime, TimeUnit unit) {
        try {
            // 1. 代码片段三、
            lockInterruptibly(leaseTime, unit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
    

    代码片段三、

    @Override
    public void lockInterruptibly() throws InterruptedException {
        // 这里的-1后面会用到,具体-1代表是什么意思,后面的代码分析,参考代码片段四、
        lockInterruptibly(-1, null);
    }
    
    

    代码片段四、

    public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
        // 1. 通过代码片段三可以知道,leaseTime为-1 unit=null
        // baseWaitTime = 锁的个数(3个) * 1500 = 4500毫秒
        long baseWaitTime = locks.size() * 1500;
        long waitTime = -1;
        // leaseTime肯定是-1,所以这里成立,不走else逻辑了,这里的代码写的就感觉很有意思,上面等于-1,下面等于-1还if判断
        if (leaseTime == -1) {
            // waitTime= 4500毫秒
            waitTime = baseWaitTime;
            unit = TimeUnit.MILLISECONDS;
        } else {
            waitTime = unit.toMillis(leaseTime);
            if (waitTime <= 2000) {
                waitTime = 2000;
            } else if (waitTime <= baseWaitTime) {
                waitTime = ThreadLocalRandom.current().nextLong(waitTime/2, waitTime);
            } else {
                waitTime = ThreadLocalRandom.current().nextLong(baseWaitTime, waitTime);
            }
            waitTime = unit.convert(waitTime, TimeUnit.MILLISECONDS);
        }
        
        
        // 这里有个死循环逻辑,其实就是不停的去获取锁
        while (true) {
            // 1. 代码片段五、waitTime = 4500毫秒,leaseTime = -1
            if (tryLock(waitTime, leaseTime, unit)) {
                return;
            }
        }
    }
    
    

    代码片段五、

    // waitTime = 4500毫秒,leaseTime = -1
    public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
    //        try {
    //            return tryLockAsync(waitTime, leaseTime, unit).get();
    //        } catch (ExecutionException e) {
    //            throw new IllegalStateException(e);
    //        }
            // 1. newLeaseTime = -1,其实这里的参数值,都会影响对程序的逻辑以及加锁释放锁
            // 1.现在是真的想不通这个逻辑,先等于-1,然后在if判断,和下面的remainTime一样
            long newLeaseTime = -1;
            if (leaseTime != -1) {
                newLeaseTime = unit.toMillis(waitTime)*2;
            }
            // 当前时间
            long time = System.currentTimeMillis();
            long remainTime = -1;
            if (waitTime != -1) {
                remainTime = unit.toMillis(waitTime);
            }
            // 这里其实就是返回remainTime,calcLockWaitTime是给他什么参数,返回什么参数,也挺有意思的。
            long lockWaitTime = calcLockWaitTime(remainTime);
            
            // 这里会返回一个固定的值0
            int failedLocksLimit = failedLocksLimit();
            List<RLock> acquiredLocks = new ArrayList<RLock>(locks.size());
            // 1. 拿到锁的迭代器
            for (ListIterator<RLock> iterator = locks.listIterator(); iterator.hasNext();) {
                RLock lock = iterator.next();
                boolean lockAcquired;
                try {
                    // waitTime = 4500毫秒,leaseTime = -1 参数传递进来的,所以会走else逻辑
                    if (waitTime == -1 && leaseTime == -1) {
                        lockAcquired = lock.tryLock();
                    } else {
                        // 这里去lockWaitTime和remainTime中的最小值(lockWaitTime = 0,就是上面那个固定值,remainTime=-1),
                        // 所以awaitTime=-1,这个-1其实很关键,在tryLock中,-1代表了如果获取锁成功了,就会启动一个lock watchDog,不停的刷新锁的生存时间
                        long awaitTime = Math.min(lockWaitTime, remainTime);
                        // 这里就是获取锁,等待awaitTime=4500毫秒,获取锁成功,启动一个watchDog
                        lockAcquired = lock.tryLock(awaitTime, newLeaseTime, TimeUnit.MILLISECONDS);
                    }
                } catch (Exception e) {
                    lockAcquired = false;
                }
                
                if (lockAcquired) {
                    acquiredLocks.add(lock);
                } else {
                    if (locks.size() - acquiredLocks.size() == failedLocksLimit()) {
                        break;
                    }
    
    
                    if (failedLocksLimit == 0) {
                        unlockInner(acquiredLocks);
                        if (waitTime == -1 && leaseTime == -1) {
                            return false;
                        }
                        failedLocksLimit = failedLocksLimit();
                        acquiredLocks.clear();
                        // reset iterator
                        while (iterator.hasPrevious()) {
                            iterator.previous();
                        }
                    } else {
                        failedLocksLimit--;
                    }
                }
                
                if (remainTime != -1) {
                    // 如果获取锁成功,当前时间减去获取锁耗费的时间time
                    remainTime -= (System.currentTimeMillis() - time);
                    time = System.currentTimeMillis();
                    if (remainTime <= 0) {
                        // 如果remainTime <0 说明获取锁超时,那么就释放掉这个锁
                        unlockInner(acquiredLocks);
                        // 返回false,说明加锁失败
                        return false;
                    }
                }
            }
    
    
            if (leaseTime != -1) {
                List<RFuture<Boolean>> futures = new ArrayList<RFuture<Boolean>>(acquiredLocks.size());
                for (RLock rLock : acquiredLocks) {
                    RFuture<Boolean> future = rLock.expireAsync(unit.toMillis(leaseTime), TimeUnit.MILLISECONDS);
                    futures.add(future);
                }
                
                for (RFuture<Boolean> rFuture : futures) {
                    rFuture.syncUninterruptibly();
                }
            }
            
            return true;
        }
    

    代码片段六、

    // 释放锁的话,就是依次调用所有的锁的释放的逻辑,lua脚本,同步等待所有的锁释放完毕,才会返回
    @Override
    public void unlock() {
        List<RFuture<Void>> futures = new ArrayList<RFuture<Void>>(locks.size());
    
    
        for (RLock lock : locks) {
            // 代码片段七、
            futures.add(lock.unlockAsync());
        }
    
    
        for (RFuture<Void> future : futures) {
            future.syncUninterruptibly();
        }
    }
    
    

    代码片段七、

    这里的释放锁的底层lua脚本,和加锁很类似,就不做具体的分析了,一眼看上去,其实还是很简单的

    protected RFuture<Boolean> unlockInnerAsync(long threadId) {
        return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                "if (redis.call('exists', KEYS[1]) == 0) then " +
                    "redis.call('publish', KEYS[2], ARGV[1]); " +
                    "return 1; " +
                "end;" +
                "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
                    "return nil;" +
                "end; " +
                "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
                "if (counter > 0) then " +
                    "redis.call('pexpire', KEYS[1], ARGV[2]); " +
                    "return 0; " +
                "else " +
                    "redis.call('del', KEYS[1]); " +
                    "redis.call('publish', KEYS[2], ARGV[1]); " +
                    "return 1; "+
                "end; " +
                "return nil;",
                Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(threadId));
    
    
    }
    

    总结

    其实Redisson中的MultiLock的加锁与释放锁相对来说还是比较简单的,这也归根于Redisson的源码写的比较优雅又关系
    最后释放锁的Lua 脚本就不一行一行的分析注释了,只要之前跟着之前的文章,这些lua脚本相对来说还是比较简单的

    相关文章

      网友评论

        本文标题:7.Redisson源码剖析-MultiLock加锁与释放锁

        本文链接:https://www.haomeiwen.com/subject/esofvktx.html