美文网首页
RedissonLock的分布式锁过程解析(源码)

RedissonLock的分布式锁过程解析(源码)

作者: hubo_li | 来源:发表于2020-09-29 14:56 被阅读0次

    笔者最近在面试过程中,发现面试官喜欢面试关于redis的分布式锁的实现。为了更加清晰地了解加锁工程,然后看了下redisson下封装的锁的操作过程。

    redis锁的实现是一个学习redis的难点,那么了解其原理可以让我们更好的使用好lock。
    首先聊聊单体redis下如何实现锁的。

    单体redis模式下的锁实现

    接下来先看下加锁的实现。

    加锁

    // 构造redisson实现分布式锁必要的Config
    Config config = new Config();
    config.useSingleServer().setAddress("redis://172.201.1.180:5379").setPassword("a123456").setDatabase(0);
    // 构造RedissonClient
    RedissonClient redissonClient = Redisson.create(config);
    // 设置锁定资源名称
    RLock disLock = redissonClient.getLock("DISLOCK1");
    boolean isLock;
    try {
        //尝试获取分布式锁
        isLock = disLock.tryLock(500, 5000, TimeUnit.MILLISECONDS);
        if (isLock) {
            //TODO if get lock success, do something;
            Thread.sleep(5000);
        }
    } catch (Exception e) {
    } finally {
        // 无论如何, 最后都要解锁
        disLock.unlock();
    }
    

    首先看到trylock的源码:

        public boolean tryLock(long waitTime, TimeUnit unit) throws InterruptedException {
            return this.tryLock(waitTime, -1L, unit);
        }
    

    具体进入到trylock中。

        public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
            long time = unit.toMillis(waitTime);
            long current = System.currentTimeMillis();
            final long threadId = Thread.currentThread().getId();
            //要注意到从上面得到的leaseTime值为-1L。 
            Long ttl = this.tryAcquire(leaseTime, unit, threadId);
          //如果ttl为空,说明获取到锁,当前没有线程加锁。
            if (ttl == null) {
                return true;
            } else {
                time -= System.currentTimeMillis() - current;
                //规定时间内没有获取到锁,然后加锁失败。
                if (time <= 0L) {
                    this.acquireFailed(threadId);
                    return false;
                } else {
                  //
                    current = System.currentTimeMillis();
                    //对当前线程进行订阅。
                    final RFuture<RedissonLockEntry> subscribeFuture = this.subscribe(threadId);
                    if (!this.await(subscribeFuture, time, TimeUnit.MILLISECONDS)) {
                        if (!subscribeFuture.cancel(false)) {
                            subscribeFuture.addListener(new FutureListener<RedissonLockEntry>() {
                                public void operationComplete(Future<RedissonLockEntry> future) throws Exception {
                                  //对该位置进行监听。
                                    if (subscribeFuture.isSuccess()) {
                                        RedissonLock.this.unsubscribe(subscribeFuture, threadId);
                                    }
    
                                }
                            });
                        }
    
                        this.acquireFailed(threadId);
                        return false;
                    } else {
                        try {
                    //经过上面的处理,看下现在时间是否过期
                            time -= System.currentTimeMillis() - current;
                            if (time <= 0L) {
                        //过期了就获取失败
                                this.acquireFailed(threadId);
                                boolean var20 = false;
                                return var20;
                            } else {
                                boolean var16;
                                do {
                         //然后使用tryacquire去获取
                                    long currentTime = System.currentTimeMillis();
                            //下面对tryAcquire进行获取。
                                    ttl = this.tryAcquire(leaseTime, unit, threadId);
                                //ttl是空,说明获取到锁
                                      if (ttl == null) {
                                        var16 = true;
                                        return var16;
                                    }
    
                                    time -= System.currentTimeMillis() - currentTime;
                          //看时间是否到了,到了就过期了。
                                    if (time <= 0L) {
                                        this.acquireFailed(threadId);
                                        var16 = false;
                                        return var16;
                                    }
    
                                    currentTime = System.currentTimeMillis();
                            //ttl 大于0,并且ttl小于过期时间,那么尝试去获取锁。
                                    if (ttl >= 0L && ttl < time) {
                                        this.getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                                    } else {
                                        this.getEntry(threadId).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
                                    }
    
                                    time -= System.currentTimeMillis() - currentTime;
                                } while(time > 0L);
    
                                this.acquireFailed(threadId);
                                var16 = false;
                                return var16;
                            }
                        } finally {
                         //最终释放掉订阅。
                            this.unsubscribe(subscribeFuture, threadId);
                        }
                    }
                }
            }
        }
    

    从 Long ttl = this.tryAcquire(leaseTime, unit, threadId)这句进去到tryAcuire中

        private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) {
            // 从上面得到leastTime的值为-1L。那么会跳过上面的判断,走下面的路径。
            if (leaseTime != -1L) {
                return this.tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
            } else {
              //先按照30秒的过期时间来执行获取锁的方法
                RFuture<Long> ttlRemainingFuture = this.tryLockInnerAsync(30L, TimeUnit.SECONDS, threadId, RedisCommands.EVAL_LONG);
              //如果还持有这个锁,则开启定时任务不断刷新该锁的过期时间
             // 异步获取结果,如果获取锁成功,则启动定时线程进行锁续约
                ttlRemainingFuture.addListener(new FutureListener<Long>() {
                    public void operationComplete(Future<Long> future) throws Exception {
         
                        if (future.isSuccess()) {
                            Long ttlRemaining = (Long)future.getNow();
                           //如何续约,看下scheduleExpirationRenewal实现方法。
                           //拿到了锁
                            if (ttlRemaining == null) {
                                RedissonLock.this.scheduleExpirationRenewal(threadId);
                            }
    
                        }
                    }
                });
                return ttlRemainingFuture;
            }
        }
    

    先看tryLockInnerAsync尝试去获取到锁,这里里面的实现主要是通过lua脚本来实现的,保证了原子性。

    <T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit,     
                                long threadId, RedisStrictCommand<T> command) {
    
            //过期时间
            internalLockLeaseTime = unit.toMillis(leaseTime);
    
            return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
                      //如果锁不存在,则通过hset设置它的值,并设置过期时间
                      "if (redis.call('exists', KEYS[1]) == 0) then " +
                          "redis.call('hset', KEYS[1], ARGV[2], 1); " +
                          "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                          "return nil; " +
                      "end; " +
                      //如果锁已存在,并且锁的是当前线程,则通过hincrby给数值递增1
                      // 续约internalLockLeaseTime(30s)            
                      "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                          "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                          "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                          "return nil; " +
                      "end; " +
                      //如果锁已存在,但并非本线程,则返回过期时间ttl
                      "return redis.call('pttl', KEYS[1]);",
            Collections.<Object>singletonList(getName()), 
                    internalLockLeaseTime, getLockName(threadId));
        }
    

    然后使用定时任务去给这个锁续约(重点)。
    那接下来我们就看到scheduleExpirationRenewal方法的源码实现。

        private void scheduleExpirationRenewal(final long threadId) {
            if (!expirationRenewalMap.containsKey(this.getEntryName())) {
                Timeout task = this.commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
                    public void run(Timeout timeout) throws Exception {
    //future尝试获取到锁。
                        RFuture<Boolean> future = RedissonLock.this.commandExecutor.evalWriteAsync(RedissonLock.this.getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
    //如果获取到改锁,然后给改锁进行续约。
     "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) 
    then redis.call('pexpire', KEYS[1], ARGV[1]); return 1; 
    end; return 0;",
     Collections.singletonList(RedissonLock.this.getName()), new Object[]{RedissonLock.this.internalLockLeaseTime, RedissonLock.this.getLockName(threadId)});
                        future.addListener(new FutureListener<Boolean>() {
                            public void operationComplete(Future<Boolean> future) throws Exception {
                                RedissonLock.expirationRenewalMap.remove(RedissonLock.this.getEntryName());
                               //  如果future为非succes,那么就没法去更新lock的时间了。
                                if (!future.isSuccess()) {
                                    RedissonLock.log.error("Can't update lock " + RedissonLock.this.getName() + " expiration", future.cause());
                                } else {
                    //如果获取到了锁,然后就循环给锁去做续约,做上面的操作。
                                    if ((Boolean)future.getNow()) {
                                        RedissonLock.this.scheduleExpirationRenewal(threadId);
                                    }
    
                                }
                            }
                        });
                    }
          //每次间隔租期的1/3时间执行这个续约。
    
                }, this.internalLockLeaseTime / 3L, TimeUnit.MILLISECONDS);
                if (expirationRenewalMap.putIfAbsent(this.getEntryName(), task) != null) {
                    task.cancel();
                }
    
            }
        }
    

    上面就是trylock获取锁并进行续约的流程。
    然后看一下tryAcquire的实现。有意思的是这里是用了信号量Semaphore的tryAcqurie方法实现。

        public boolean tryAcquire(long timeout, TimeUnit unit)
            throws InterruptedException {
            return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
        }
    

    另外强调一点的是:如果tryLock的时候没有设置leasetime为-1L,那么直接去使用
    return this.tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG)。
    不会有续约的情况了。同时我们也看见里面也实现了可重入锁。

    接下来看下解锁的实现。

    解锁

        public void unlock() {
          //这里主要是看this.unlockInnerAsync方法的实现。
            Boolean opStatus = (Boolean)this.get(this.unlockInnerAsync(Thread.currentThread().getId()));
    //按照提示,得到如果为null,那么没有得到当前的锁。
            if (opStatus == null) {
                throw new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: " + this.id + " thread-id: " + Thread.currentThread().getId());
            } else {
                if (opStatus) {
                    this.cancelExpirationRenewal();
                }
    
            }
        }
    

    然后我们进入到unlockInnerAsync中去看下实现原理。

    
    protected RFuture<Boolean> unlockInnerAsync(long threadId) {
        // 释放锁时需要在redis实例上执行的lua命令
        return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                // 如果分布式锁KEY不存在,那么向channel发布一条消息
                "if (redis.call('exists', KEYS[1]) == 0) then " +
                    "redis.call('publish', KEYS[2], ARGV[1]); " +
                    "return 1; " +
                "end;" +
                // 如果分布式锁存在,但是value不匹配,表示锁已经被占用,那么直接返回
                "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
                    "return nil;" +
                "end; " +
                // 如果就是当前线程占有分布式锁,那么将重入次数减1
                "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
                // 重入次数减1后的值如果大于0,表示分布式锁有重入过,那么只设置失效时间,还不能删除
                "if (counter > 0) then " +
                    "redis.call('pexpire', KEYS[1], ARGV[2]); " +
                    "return 0; " +
                "else " +
                    // 重入次数减1后的值如果为0,表示分布式锁只获取过1次,那么删除这个KEY,并发布解锁消息
                    "redis.call('del', KEYS[1]); " +
                    "redis.call('publish', KEYS[2], ARGV[1]); " +
                    "return 1; "+
                "end; " +
                "return nil;",
                // 这5个参数分别对应KEYS[1],KEYS[2],ARGV[1],ARGV[2]和ARGV[3]
                Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(threadId));
    
    }
    
    

    同时我们看下unlockAsync的方法。

    
    public RFuture<Void> unlockAsync(final long threadId) {
        final RPromise<Void> result = new RedissonPromise<Void>();
        
        //解锁方法
        RFuture<Boolean> future = unlockInnerAsync(threadId);
    
        future.addListener(new FutureListener<Boolean>() {
            @Override
            public void operationComplete(Future<Boolean> future) throws Exception {
                if (!future.isSuccess()) {
                    cancelExpirationRenewal(threadId);
                    result.tryFailure(future.cause());
                    return;
                }
                //获取返回值
                Boolean opStatus = future.getNow();
                //如果返回空,则证明解锁的线程和当前锁不是同一个线程,抛出异常
                if (opStatus == null) {
                    IllegalMonitorStateException cause = 
                        new IllegalMonitorStateException("
                            attempt to unlock lock, not locked by current thread by node id: "
                            + id + " thread-id: " + threadId);
                    result.tryFailure(cause);
                    return;
                }
                //解锁成功,取消刷新过期时间的那个定时任务
                if (opStatus) {
                    cancelExpirationRenewal(null);
                }
                result.trySuccess(null);
            }
        });
    
        return result;
    }
    

    同时小伙伴可以注意下forceUnlock()方法加锁的方式:

    在源码中并没有找到forceUnlock()被调用的痕迹(也有可能是我没有找对),但是forceUnlockAsync()方法被调用的地方很多,大多都是在清理资源时删除锁。此部分比较简单粗暴,删除锁成功则并发布锁被删除的消息,返回1结束,否则返回0结束。

    
       public void forceUnlock() {
            this.get(this.forceUnlockAsync());
        }
    
        public RFuture<Boolean> forceUnlockAsync() {
            this.cancelExpirationRenewal();
            return this.commandExecutor.evalWriteAsync(this.getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
     "if (redis.call('del', KEYS[1]) == 1) t
    hen redis.call('publish', KEYS[2], ARGV[1]);
     return 1 else return 0 end",
     Arrays.asList(this.getName(), this.getChannelName()), new Object[]{LockPubSub.unlockMessage});
        }
    

    上文就是redisson的分布式锁实现。下次讲讲redlock的实现原理。

    参考资料:

    redissonLock源码
    https://www.jianshu.com/p/47fd7f86c848
    https://www.jianshu.com/p/7e47a4503b87
    https://my.oschina.net/u/2369201/blog/1573730

    相关文章

      网友评论

          本文标题:RedissonLock的分布式锁过程解析(源码)

          本文链接:https://www.haomeiwen.com/subject/rlkeuktx.html