美文网首页javaWeb学习
Redisson分布式锁使用即源码解读

Redisson分布式锁使用即源码解读

作者: jackcooper | 来源:发表于2022-01-07 22:50 被阅读0次

    Redisson 提供的分布式锁

    image.png

    使用实例

    private void redissonDoc() throws InterruptedException {
        //1. 普通的可重入锁
        RLock lock = redissonClient.getLock("generalLock");
    
        // 拿锁失败时会不停的重试
        // 具有Watch Dog 自动延期机制 默认续30s 每隔30/3=10 秒续到30s
        lock.lock();
    
        // 尝试拿锁10s后停止重试,返回false
        // 具有Watch Dog 自动延期机制 默认续30s
        boolean res1 = lock.tryLock(10, TimeUnit.SECONDS);
    
        // 拿锁失败时会不停的重试
        // 没有Watch Dog ,10s后自动释放
        lock.lock(10, TimeUnit.SECONDS);
    
        // 尝试拿锁100s后停止重试,返回false
        // 没有Watch Dog ,10s后自动释放
        boolean res2 = lock.tryLock(100, 10, TimeUnit.SECONDS);
    
        //2. 公平锁 保证 Redisson 客户端线程将以其请求的顺序获得锁
        RLock fairLock = redissonClient.getFairLock("fairLock");
    
        //3. 读写锁 没错与JDK中ReentrantLock的读写锁效果一样
        RReadWriteLock readWriteLock = redissonClient.getReadWriteLock("readWriteLock");
        readWriteLock.readLock().lock();
        readWriteLock.writeLock().lock();
    }
    

    如果拿到分布式锁的节点宕机,且这个锁正好处于锁住的状态时,会出现锁死的状态,为了避免这种情况的发生,锁都会设置一个过期时间。这样也存在一个问题,加入一个线程拿到了锁设置了30s超时,在30s后这个线程还没有执行完毕,锁超时释放了,就会导致问题,Redisson给出了自己的答案,就是 watch dog 自动延期机制。

    Redisson提供了一个监控锁的看门狗,它的作用是在Redisson实例被关闭前,不断的延长锁的有效期,也就是说,如果一个拿到锁的线程一直没有完成逻辑,那么看门狗会帮助线程不断的延长锁超时时间,锁不会因为超时而被释放。

    默认情况下,看门狗的续期时间是30s,也可以通过修改Config.lockWatchdogTimeout来另行指定。

    另外Redisson 还提供了可以指定leaseTime参数的加锁方法来指定加锁的时间。超过这个时间后锁便自动解开了,不会延长锁的有效期

    watch dog 核心源码解读

    // 直接使用lock无参数方法
    public void lock() {
        try {
            lock(-1, null, false);
        } catch (InterruptedException e) {
            throw new IllegalStateException();
        }
    }
    
    // 进入该方法 其中leaseTime = -1
    private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
        long threadId = Thread.currentThread().getId();
        Long ttl = tryAcquire(-1, leaseTime, unit, threadId);
        // lock acquired
        if (ttl == null) {
            return;
        }
    
       //...
    }
    
    // 进入 tryAcquire(-1, leaseTime, unit, threadId)
    private Long tryAcquire(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
        return get(tryAcquireAsync(waitTime, leaseTime, unit, threadId));
    }
    
    // 进入 tryAcquireAsync
    private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
        if (leaseTime != -1) {
            return tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
        }
        //当leaseTime = -1 时 启动 watch dog机制
        RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(waitTime,
                                                commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(),
                                                TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
        //执行完lua脚本后的回调
        ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
            if (e != null) {
                return;
            }
    
            if (ttlRemaining == null) {
                // watch dog 
                scheduleExpirationRenewal(threadId);
            }
        });
        return ttlRemainingFuture;
    }
    

    scheduleExpirationRenewal 方法开启监控

    private void scheduleExpirationRenewal(long threadId) {
        ExpirationEntry entry = new ExpirationEntry();
        //将线程放入缓存中
        ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
        //第二次获得锁后 不会进行延期操作
        if (oldEntry != null) {
            oldEntry.addThreadId(threadId);
        } else {
            entry.addThreadId(threadId);
            
            // 第一次获得锁 延期操作
            renewExpiration();
        }
    }
    
    // 进入 renewExpiration()
    private void renewExpiration() {
        ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
        //如果缓存不存在,那不再锁续期
        if (ee == null) {
            return;
        }
        
        Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
            @Override
            public void run(Timeout timeout) throws Exception {
                ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
                if (ent == null) {
                    return;
                }
                Long threadId = ent.getFirstThreadId();
                if (threadId == null) {
                    return;
                }
                
                //执行lua 进行续期
                RFuture<Boolean> future = renewExpirationAsync(threadId);
                future.onComplete((res, e) -> {
                    if (e != null) {
                        log.error("Can't update lock " + getName() + " expiration", e);
                        return;
                    }
                    
                    if (res) {
                        //延期成功,继续循环操作
                        renewExpiration();
                    }
                });
            }
            //每隔internalLockLeaseTime/3=10秒检查一次
        }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
        
        ee.setTimeout(task);
    }
    
    //lua脚本 执行包装好的lua脚本进行key续期
    protected RFuture<Boolean> renewExpirationAsync(long threadId) {
        return evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                        "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                        "return 1; " +
                        "end; " +
                        "return 0;",
                Collections.singletonList(getName()),
                internalLockLeaseTime, getLockName(threadId));
    }
    

    关键结论

    上述源码读过来我们可以记住几个关键情报:

    • watch dog 在当前节点存活时每10s给分布式锁的key续期 30s;
    • watch dog 机制启动,且代码中没有释放锁操作时,watch dog 会不断的给锁续期;
    • 从可2得出,如果程序释放锁操作时因为异常没有被执行,那么锁无法被释放,所以释放锁操作一定要放到 finally {} 中;
      看到3的时候,可能会有人有疑问,如果释放锁操作本身异常了,watch dog 还会不停的续期吗?下面看一下释放锁的源码,找找答案
    // 锁释放
    public void unlock() {
        try {
            get(unlockAsync(Thread.currentThread().getId()));
        } catch (RedisException e) {
            if (e.getCause() instanceof IllegalMonitorStateException) {
                throw (IllegalMonitorStateException) e.getCause();
            } else {
                throw e;
            }
        }
    }
    
    // 进入 unlockAsync(Thread.currentThread().getId()) 方法 入参是当前线程的id
    public RFuture<Void> unlockAsync(long threadId) {
        RPromise<Void> result = new RedissonPromise<Void>();
        //执行lua脚本 删除key
        RFuture<Boolean> future = unlockInnerAsync(threadId);
    
        future.onComplete((opStatus, e) -> {
            // 无论执行lua脚本是否成功 执行cancelExpirationRenewal(threadId) 方法来删除EXPIRATION_RENEWAL_MAP中的缓存
            cancelExpirationRenewal(threadId);
    
            if (e != null) {
                result.tryFailure(e);
                return;
            }
    
            if (opStatus == null) {
                IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
                        + id + " thread-id: " + threadId);
                result.tryFailure(cause);
                return;
            }
    
            result.trySuccess(null);
        });
    
        return result;
    }
    
    // 此方法会停止 watch dog 机制
    void cancelExpirationRenewal(Long threadId) {
        ExpirationEntry task = EXPIRATION_RENEWAL_MAP.get(getEntryName());
        if (task == null) {
            return;
        }
        
        if (threadId != null) {
            task.removeThreadId(threadId);
        }
    
        if (threadId == null || task.hasNoThreads()) {
            Timeout timeout = task.getTimeout();
            if (timeout != null) {
                timeout.cancel();
            }
            EXPIRATION_RENEWAL_MAP.remove(getEntryName());
        }
    }
    

    释放锁的操作中 有一步操作是从 EXPIRATION_RENEWAL_MAP 中获取 ExpirationEntry 对象,然后将其remove,结合watch dog中的续期前的判断:

    EXPIRATION_RENEWAL_MAP.get(getEntryName());
    if (ent == null) {
        return;
    }
    

    如果释放锁操作本身异常了,watch dog 还会不停的续期吗?不会,因为无论释放锁操作是否成功,EXPIRATION_RENEWAL_MAP中的目标 ExpirationEntry 对象已经被移除了,watch dog 通过判断后就不会继续给锁续期了。


    Redisson实现分布式锁(1)---原理

    Redisson 官方文档

    谈谈基于Redis分布式锁(下)- Redisson源码解析

    https://www.cnblogs.com/keeya/p/14332131.html

    相关文章

      网友评论

        本文标题:Redisson分布式锁使用即源码解读

        本文链接:https://www.haomeiwen.com/subject/srmscrtx.html