美文网首页分布式开发Java 开发
Redisson 源码解析,如何利用Redis实现分布式可重入锁

Redisson 源码解析,如何利用Redis实现分布式可重入锁

作者: 殷天文 | 来源:发表于2019-12-04 22:37 被阅读0次

    最开始使用Redisson 的api的时候,我觉得哇,这个api 太牛逼了居然有分布式的可重入锁,正好最近研究了下Redisson的源码,和大家分享一下

    前言

    首先我们先回顾一下 Java 中的 ReentrantLock 是如何实现的?

    这里我先简单介绍一下ReentrantLock 实现的思路

    • 锁标识:通过AQS的state变量作为锁标识,利用Java的CAS保证多线程竞争锁时的线程安全问题

    • 队列:未竞争到锁的线程进入AQS的队列并挂起,等待解锁时被唤醒(或者超时)

    如何设计分布式可重入锁

    首先锁标识,这个在Redis中很容易实现,可以用lock name 作为key,当前线程生成一个uuid,作为value,加上Redis 单线程模型,实现线程安全的锁竞争

    这种方式在之前的博客里也提到过,可以参考下 Redis分布式锁的正确实现方式

    但是如何基于Redis 做一个队列,像Java那样可以挂起唤醒线程呢?这点我在看源码之前一直没有想到...

    那么Redisson 是如何做的呢?

    答案:利用Redis的发布订阅,加上Java的Semaphore(信号量,不了解Semaphore的小伙伴可以Google一下)

    Redisson 分布式锁实现思路

    锁标识:Hash 数据结构,key 为锁的名字,filed 当前竞争锁成功线程的"唯一标识",value 重入次数

    队列:所有竞争锁失败的线程,会订阅当前锁的解锁事件,利用 Semaphore 实现线程的挂起和唤醒

    源码分析

    我们来看一下tryLock方法的源码

        public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
            long time = unit.toMillis(waitTime);
            long current = System.currentTimeMillis();
            long threadId = Thread.currentThread().getId();
            // 尝试获取锁,返回null 代表获取锁成功,当获取锁失败时返回当前锁的释放时间
            Long ttl = tryAcquire(leaseTime, unit, threadId);
            // lock acquired
            if (ttl == null) {
                return true;
            }
            
            // 如果此时已经超过等待时间则获取锁失败
            time -= System.currentTimeMillis() - current;
            if (time <= 0) {
                acquireFailed(threadId);
                return false;
            }
            
            current = System.currentTimeMillis();
            // 订阅解锁事件
            RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
            // 等待订阅成功,成功后唤醒当前线程
            if (!await(subscribeFuture, time, TimeUnit.MILLISECONDS)) {
                if (!subscribeFuture.cancel(false)) {
                    subscribeFuture.onComplete((res, e) -> {
                        if (e == null) {
                            unsubscribe(subscribeFuture, threadId);
                        }
                    });
                }
                acquireFailed(threadId);
                return false;
            }
    
            try {
                // 再次判断一下是否超时
                time -= System.currentTimeMillis() - current;
                if (time <= 0) {
                    acquireFailed(threadId);
                    return false;
                }
            
                while (true) {
                    long currentTime = System.currentTimeMillis();
                    // 尝试获取锁
                    ttl = tryAcquire(leaseTime, unit, threadId);
                    // lock acquired
                    if (ttl == null) {
                        return true;
                    }
    
                    time -= System.currentTimeMillis() - currentTime;
                    if (time <= 0) {
                        acquireFailed(threadId);
                        return false;
                    }
    
                    // waiting for message
                    currentTime = System.currentTimeMillis();
                    if (ttl >= 0 && ttl < time) {
                        // 等待解锁消息,此处利用Semaphore,锁未释放时,permits=0,线程处于挂起状态
                        // 当发布解锁消息时,当前的Semaphore对象的release() permits=1
                        // 所有的客户端都会有一个线程被唤醒,去尝试竞争锁
                        getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                    } else {
                        getEntry(threadId).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
                    }
    
                    time -= System.currentTimeMillis() - currentTime;
                    if (time <= 0) {
                        acquireFailed(threadId);
                        return false;
                    }
                }
            } finally {
                unsubscribe(subscribeFuture, threadId);
            }
    //        return get(tryLockAsync(waitTime, leaseTime, unit));
        }
    

    tryAcquire(leaseTime, unit, threadId); 这个方法我们下面会分析,现在我们只需要知道这个方法是用来获取锁就可以了

    这个时候我们已经可以理清Redisson可重入锁的思路了

    1. 获取锁
    2. 如果获取锁失败,订阅解锁事件
    3. 之后是一个无限循环
    while(true) {
      // 尝试获取锁
    
      // 判断是否超时
    
      // 等待解锁消息释放信号量 
      //(此时每个Java客户端都可能会有多个线程被挂起,但是只有一个线程会被唤醒)
    
      // 判断是否超时
    }
    

    利用信号量,合理控制线程对锁的竞争,合理利用系统资源,可以说做的灰常的奈斯了

    需要注意:
    !await(subscribeFuture, time, TimeUnit.MILLISECONDS) ,这里很多博客都解释错了,这里并不是等待发布解锁消息,只要订阅事件成功后,就会往下执行,真正等待解锁消息的是 getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);

    这里你可能不信,为什么我说的就对啊,debug一下你就知道

    tryLockInnerAsync

    tryAcquire 内部依靠 tryLockInnerAsync 来实现获取锁的逻辑,我们来看下源码

        <T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
            internalLockLeaseTime = unit.toMillis(leaseTime);
    
            return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
                      // 是否存在锁
                      "if (redis.call('exists', KEYS[1]) == 0) then " +
                           // 不存在则创建
                          "redis.call('hset', KEYS[1], ARGV[2], 1); " +
                          // 设置过期时间
                          "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                          // 竞争锁成功 返回null
                          "return nil; " +
                      "end; " +
                       // 如果锁已经被当前线程获取
                      "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                           // 重入次数加1
                          "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                          "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                          "return nil; " +
                      "end; " +
                      // 锁被其他线程获取,返回锁的过期时间
                      "return redis.call('pttl', KEYS[1]);",
    
                        // 下面三个参数分别为 KEYS[1], ARGV[1], ARGV[2]
                        // 即锁的name,锁释放时间,当前线程唯一标识
                        Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
        }
    

    tryLockInnerAsync 中利用lua脚本 和 Redis 单线程的特点来实现锁的竞争

    这里可以看到锁的结构,和我们上文所说的一样,Hash 数据结构,key 为锁的name,filed 当前竞争锁成功线程的"唯一标识",value 重入次数

    unlockInnerAsync

    接下来我们再来看解锁的核心代码

        protected RFuture<Boolean> unlockInnerAsync(long threadId) {
            return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                    // 用锁的name和线程唯一标识去判断是否存在这样的键值对
                    // 解铃还须系铃人,不存在则无权解锁,返回null
                    "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
                        "return nil;" +
                    "end; " +
                    // 解锁逻辑
                    // 冲入次数-1
                    "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
                    // 如果大于0 代表当前线程重入锁多次无法解锁,更新锁的有效时间
                    "if (counter > 0) then " +
                        "redis.call('pexpire', KEYS[1], ARGV[2]); " +
                        "return 0; " +
                    "else " +
                        // 解锁,删除key
                        "redis.call('del', KEYS[1]); " +
                        // 发布解锁消息
                        "redis.call('publish', KEYS[2], ARGV[1]); " +
                        "return 1; "+
                    "end; " +
                    "return nil;",
                    // KEYS[1],KEYS[2]
                    // 锁的name,发布订阅的Channel
                    Arrays.<Object>asList(getName(), getChannelName()), 
                    // ARGV[1] ~ ARGV[3]
                    // 解锁消息,释放时间,当前线程唯一标识
                    LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
    
        }
    

    发布解锁消息后,会调用到LockPubSub 的 onMessage,释放信号量,唤醒等待锁的线程

    public class LockPubSub extends PublishSubscribe<RedissonLockEntry> {
    
        public static final Long UNLOCK_MESSAGE = 0L;
        public static final Long READ_UNLOCK_MESSAGE = 1L;
    
        public LockPubSub(PublishSubscribeService service) {
            super(service);
        }
        
        @Override
        protected RedissonLockEntry createEntry(RPromise<RedissonLockEntry> newPromise) {
            return new RedissonLockEntry(newPromise);
        }
    
        @Override
        protected void onMessage(RedissonLockEntry value, Long message) {
            if (message.equals(UNLOCK_MESSAGE)) {
                Runnable runnableToExecute = value.getListeners().poll();
                if (runnableToExecute != null) {
                    runnableToExecute.run();
                }
    
                // 释放信号量
                value.getLatch().release();
            } else if (message.equals(READ_UNLOCK_MESSAGE)) {
                while (true) {
                    Runnable runnableToExecute = value.getListeners().poll();
                    if (runnableToExecute == null) {
                        break;
                    }
                    runnableToExecute.run();
                }
    
                value.getLatch().release(value.getLatch().getQueueLength());
            }
        }
    
    }
    
    

    参考

    欢迎点赞、转发。你的支持就是对我最大的帮助

    相关文章

      网友评论

        本文标题:Redisson 源码解析,如何利用Redis实现分布式可重入锁

        本文链接:https://www.haomeiwen.com/subject/smohgctx.html