美文网首页
Redisson重连后WatchDog失效问题解决

Redisson重连后WatchDog失效问题解决

作者: 猫清扬 | 来源:发表于2020-11-09 15:06 被阅读0次

    Redisson分布式锁提供了WatchDog功能,如果你使用了分布式锁且没有设置超时时间Ression会为你设置一个默认的超时时间,且在你没有主动释放锁之前会不断续期。这样既可以保证在持锁期间的代码不会被其他线程执行,也可以防止死锁的发生。

    不过最近在做项目的时候发现我的Redisson断线重连后WatchDog居然失效了。跟了一下Redisson的代码发现了原因,在这里分享一下。

    问题重现

    String name = "REDIS_LOCK"
    try{
       if(!redissonClient.getLock(name).tryLock()){
         return;
       }
       doSomething();
    }catch(Exception e){
       RLock rLock = redissonClient.getLock(name);
       if (rLock.isLocked() && rLock.isHeldByCurrentThread()) {
           rLock.unlock();
       }
    }
    
    

    项目中用的是tryLock(),线程会不断地尝试拿到锁,拿到锁之后线程就会开始执行业务代码。当一个线程拿到锁之后不主动释放,WatchDog就会生效,也就是说只要客户端不与redis中断连接WatchDog就会一直为这个锁续时,其他线程就永远不会执行到你的业务代码。这个时候我们让网络断开一段时间,Redisson就会报以下这个错,我们使用的REDIS_LOCK这个锁在redis里面过了30秒就会自动失效。

    2020-11-06 14:56:53.682 [redisson-timer-4-1] ERROR org.redisson.RedissonLock - Can't update lock REDIS_LOCK expiration
    org.redisson.client.RedisResponseTimeoutException: Redis server response timeout (3000 ms) occured after 3 retry attempts. Increase nettyThreads and/or timeout settings. Try to define pingConnectionInterval setting. Command: null, params: null, channel: [id: 0x1e676dd8, L:/192.168.20.49:58477 - R:/192.168.2.21:6379]
        at org.redisson.command.RedisExecutor$3.run(RedisExecutor.java:333)
        at io.netty.util.HashedWheelTimer$HashedWheelTimeout.expire(HashedWheelTimer.java:672)
        at io.netty.util.HashedWheelTimer$HashedWheelBucket.expireTimeouts(HashedWheelTimer.java:747)
        at io.netty.util.HashedWheelTimer$Worker.run(HashedWheelTimer.java:472)
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.lang.Thread.run(Thread.java:748)
    

    当我们网络正常后程序再执行上面的代码,某个线程持有的REDIS_LOCK这个锁就会在30秒后自动失效,也就是说WatchDog不再为你续时了。这个可能是Redisson的一个bug,目前用的是最新的redisson 3.13.6 版本,可能未来的新版本不会有这个问题。

    分析原因

    下载redisson源码打开RedissonLock这个类,找到我们用的tryLock方法

        @Override
        public boolean tryLock() {
            return get(tryLockAsync());
        }
    

    发现trylock()lock()最终实现的方法是tryAcquireOnceAsync()这个方法,我们看一下这个方法的逻辑

    private RFuture<Boolean> tryAcquireOnceAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
            //判断有没有设置超时时间(-1表示没有设置)
            if (leaseTime != -1) {
                //异步执行redis加锁脚本
                return tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
            }
            //异步执行redis加锁脚本,且根据异步结果判断是否加锁成功
            RFuture<Boolean> ttlRemainingFuture = tryLockInnerAsync(waitTime,
                                                        commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(),//这里获取watchdog的配置时间来作为锁的超时时间
                                                        TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
            ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
                if (e != null) {
                    return;
                }
       
                // lock acquired
                //redis脚本执行成功就会执行watchdog的需时任务
                if (ttlRemaining) {
                    scheduleExpirationRenewal(threadId);
                }
            });
            return ttlRemainingFuture;
        }
    
    

    当没有设置锁的超时时间且加锁成功的时候就会执行scheduleExpirationRenewal(threadId)这个方法。

      private void scheduleExpirationRenewal(long threadId) {
            ExpirationEntry entry = new ExpirationEntry();
            ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
            
            if (oldEntry != null) {
                oldEntry.addThreadId(threadId);
            } else {
                entry.addThreadId(threadId);
                //重新续时逻辑
                renewExpiration();
            }
        }
    

    WatchDog重新续时逻辑

    private void renewExpiration() {
            ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
            if (ee == null) {
                return;
            }
            
            Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
                @Override
                public void run(Timeout timeout) throws Exception {
                    ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
                    if (ent == null) {
                        return;
                    }
                    Long threadId = ent.getFirstThreadId();
                    if (threadId == null) {
                        return;
                    }
                    
                    RFuture<Boolean> future = renewExpirationAsync(threadId);
                    future.onComplete((res, e) -> {
                        if (e != null) {
                            //报错了timer就不会再执行了
                            log.error("Can't update lock " + getName() + " expiration", e);
                            return;
                        }
                        
                        if (res) {
                            // reschedule itself
                            renewExpiration();
                        }
                    });
                }
            }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
    
            ee.setTimeout(task);
        }
    

    可以看到renewExpiration()方法核心是一个timer定时任务, 每次执行完延迟watchdog配置时间/3的时间再执行一次。也就是说watchdog默认配置是30000毫秒,这里就是就是每十秒执行一次。但要注意是这个定时任务并不会一直执行下去。

           if (e != null) {
              //报错了timer就不会再执行了
               log.error("Can't update lock " + getName() + " expiration", e);
               return;
           }
                        
           if (res) {
                // reschedule itself
               renewExpiration();
           }
    

    当上一次redis续时脚本发生异常的时候就不再执行了,也就是我们在文章开头看到的那个错误ERROR org.redisson.RedissonLock - Can't update lock REDIS_LOCK expiration。这个设计也是合理的,可以防止资源浪费,那么程序重新trylock()成功的时候应该为重新启动这个定时任务才对。但其实不是,scheduleExpirationRenewal方法是有判断的

            ExpirationEntry entry = new ExpirationEntry();
            ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
            //当ExpirationEntry在EXPIRATION_RENEWAL_MAP里存在就只会添加线程ID,不会重新执行续时逻辑
            if (oldEntry != null) {
                oldEntry.addThreadId(threadId);
            } else {
                entry.addThreadId(threadId);
                //重新续时逻辑
                renewExpiration();
            }
    

    可以看到核心判断是getEntryName()这个方法,作为key存放在EXPIRATION_RENEWAL_MAP中,如果getEntryName()一直不变renewExpiration()就永远不会再执行。问题应该就出在这里。

     public RedissonLock(CommandAsyncExecutor commandExecutor, String name) {
            super(commandExecutor, name);
            this.commandExecutor = commandExecutor;
            this.id = commandExecutor.getConnectionManager().getId();
            this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();
            this.entryName = id + ":" + name;
            this.pubSub = commandExecutor.getConnectionManager().getSubscribeService().getLockPubSub();
        }
    
        protected String getEntryName() {
            return entryName;
        }
    

    可以看到this.entryName = id + ":" + name;,其中idRedissonClient创建生成的一个UUID,name就是我们使用锁的名字。我们一般会把RedissonClient的单例对象注入到spring容器里,id在springboot启动后就不会再变了。我们每使用一个分布式锁都会起一个固定的name。也就是说在锁名称不变的情况下entryName也不会变,redisson在重新加锁的时候判断entryName已经存在就不会再续时了。

    总结一下:不管是trylock()还是lock()方法,同一个锁redisson会设置一个watchdog给它续时,并把续时信息缓存起来,正常情况下执行unlock()会清除这个缓存。但当客户端与redis断开连接后报"Can't update lock " + getName() + " expiration"错之后watchdog就会失效,断线重连后再执行trylock()或者lock()方法后会因为这个锁的缓存不再执行watchdog的续时逻辑。

    解决办法

    1.增加watchdog超时时长
       @Bean(destroyMethod = "shutdown")
        public RedissonClient redisson(RedissonProperties properties) throws IOException {
            ObjectMapper mapper = new ObjectMapper();
            String jsonString = mapper.writeValueAsString(properties);
            Config config = Config.fromJSON(jsonString);
            config.setLockWatchdogTimeout(150000);
            return Redisson.create(config);
        }
    

    watchdog默认超时时间是30000毫秒,它的执行逻辑是30000/3毫秒执行一次续时,也就是说断线后在1-10000毫秒期间重连成功watchdog下次执行后就不会再报错。我们可以把默认的30000毫秒改成150000毫秒,可以提供断线重连的容错几率。但这样并不能完全解决这个问题。

    2.修改redisson源码
     private void renewExpiration() {
            ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
            if (ee == null) {
                return;
            }
            
            Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
                @Override
                public void run(Timeout timeout) throws Exception {
                    ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
                    if (ent == null) {
                        return;
                    }
                    Long threadId = ent.getFirstThreadId();
                    if (threadId == null) {
                        return;
                    }
                    
                    RFuture<Boolean> future = renewExpirationAsync(threadId);
                    future.onComplete((res, e) -> {
                        if (e != null) {
                            log.error("Can't update lock " + getName() + " expiration", e);
                            EXPIRATION_RENEWAL_MAP.remove(getEntryName()); //添加异常删除缓存逻辑
                            return;
                        }
                        
                        if (res) {
                            // reschedule itself
                            renewExpiration();
                        }
                    });
                }
            }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
            
            ee.setTimeout(task);
        }
    

    修改RedissonLock类里的renewExpiration()方法,在 if (e != null) {}判断里加上EXPIRATION_RENEWAL_MAP.remove(getEntryName())清除缓存逻辑,这样断线重连后就不会因为缓存问题不再执行renewExpiration()这个方法了。

    相关文章

      网友评论

          本文标题:Redisson重连后WatchDog失效问题解决

          本文链接:https://www.haomeiwen.com/subject/tpfivktx.html