美文网首页
Redsson源码分析

Redsson源码分析

作者: 半吊子a | 来源:发表于2022-01-03 10:52 被阅读0次

分布式锁四个特征
1、互斥性,任务时刻只有一个进程取得锁
2、防死锁,惹加锁进程已经失去联系,则锁需要释放
3、加锁和解锁需要同一个进程,
4、锁需要锁缓期
分布式锁方案
1、基于Redis进行分布式锁
2、基于Zookeeper进行分布式锁
3、基于Mysql进行分布式锁

下面分析基于Redis进行分布式锁,加锁流程
1、基于Redis执行命令是单线程,定义一个唯一Key,value可以是uuid,用来保护加锁和解锁是同一进程
2、利用set Nx 、exits等命令进行加锁,再设置key超时时间,避免死锁
3、这样子可以实现分布式锁,但是它没有锁缓期的功能

官方推荐使用Redsson
1、我们先引用对应的jar包


image.png

2、再配置一下RedissonConfig,我使用了简单的配置,如果我对其他配置进行设置,可以看Config里面有东东。


image.png
image.png
image.png
3、这样子一个分布式锁就完成了

Redsson源码分析

image.png
我们进去tryLock方法
image.png
一直从代码找,找到tryAcquireAsync,你发现leaseTime,其实它就是一个Watch Dog机制
image.png
在tryLockInnerAsync方法中,发现加锁是执行一个加锁lua脚本,大概如下
if (redis.call('exists', KEYS[1]) == 0) //检查key是否存在
then redis.call('hincrby', KEYS[1], ARGV[2], 1); //hash 加1
redis.call('pexpire', KEYS[1], ARGV[1]); //设置有效期,耗秒级
return nil;
end;
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) //检查hash 结构,key是否存在
then redis.call('hincrby', KEYS[1], ARGV[2], 1); //hash 加1
redis.call('pexpire', KEYS[1], ARGV[1]); //设置有效期,耗秒级
return nil;
end;
return redis.call('pttl', KEYS[1]); //返回锁过期时间,耗秒级
或加锁成功则返回null,否则返回key的ttl,
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
long time = unit.toMillis(waitTime);
long current = System.currentTimeMillis();
long threadId = Thread.currentThread().getId();
  //执行lua脚本,获取锁
  Long ttl = this.tryAcquire(waitTime, leaseTime, unit, threadId);
  if (ttl == null) {
     //获取到锁
     return true;
  } else {
     //判断是否超过获取锁的等待时间
     time -= System.currentTimeMillis() - current;
     if (time <= 0L) {
        //超过等待时间,获取锁失败
        this.acquireFailed(waitTime, unit, threadId);
        return false;
     } else {
        current = System.currentTimeMillis();

        //订阅当前线程id,释放事件
        RFuture<RedissonLockEntry> subscribeFuture = this.subscribe(threadId);
        //阻塞等待锁释放,返回false,表示当前等待时间超过锁最大等待时间,取消订阅,返回加锁失败,返回true,则不停等待
        if (!subscribeFuture.await(time, TimeUnit.MILLISECONDS)) {
           if (!subscribeFuture.cancel(false)) {
              subscribeFuture.onComplete((res, e) -> {
                 if (e == null) {
                    this.unsubscribe(subscribeFuture, threadId);
                 }

              });
           }

           this.acquireFailed(waitTime, unit, threadId);
           return false;
        } else {
           try {
            //超过等待时间,获取锁失败
              time -= System.currentTimeMillis() - current;
              if (time <= 0L) {
                //超过等待时间,获取锁失败
                 this.acquireFailed(waitTime, unit, threadId);
                 boolean var20 = false;
                 return var20;
              } else {
                 boolean var16;
                 
                 //do..while不停尝试获取锁
                 do {
                    long currentTime = System.currentTimeMillis();
                    ttl = this.tryAcquire(waitTime, leaseTime, unit, threadId);
                    if (ttl == null) {
                       var16 = true;
                       return var16;
                    }

                    time -= System.currentTimeMillis() - currentTime;
                    if (time <= 0L) {
                     //超过等待时间,获取锁失败
                       this.acquireFailed(waitTime, unit, threadId);
                       var16 = false;
                       return var16;
                    }

                    currentTime = System.currentTimeMillis();
                    // 利用共享锁来阻塞等待判断是否允许等待共享锁,允许则加入共享锁等待释放信号
                    // 1、latch其实是个信号量Semaphore,调用其tryAcquire方法会让当前线程阻塞一段时间,避免了在while循环中频繁请求获取锁;
                    // 2、该Semaphore的release方法,会在订阅解锁消息的监听器消息处理方法org.redisson.pubsub.LockPubSub#onMessage调用;当其他线程释放了占用的锁,会广播解锁消息,监听器接收解锁消息,并释放信号量,最终会唤醒阻塞在这里的线程。

                    if (ttl >= 0L && ttl < time) {

                       ((RedissonLockEntry)subscribeFuture.getNow()).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                    } else {
                       ((RedissonLockEntry)subscribeFuture.getNow()).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
                    }

                    time -= System.currentTimeMillis() - currentTime;
                 } while(time > 0L);

                 this.acquireFailed(waitTime, unit, threadId);
                 var16 = false;
                 return var16;
              }
           } finally {
              this.unsubscribe(subscribeFuture, threadId);
           }
        }
     }
  }

}

相关文章

网友评论

      本文标题:Redsson源码分析

      本文链接:https://www.haomeiwen.com/subject/zcrxcrtx.html