前言
在日常的业务开发中,为了保证线上服务的安全性,往往同一个服务会部署多个服务器,以防止其中一个服务器宕机导致的服务不可用。
在这个背景下,有些接口又不允许同一时刻有多个线程同时访问,或者说某两个逻辑不允许同一时刻有多个线程同时访问,比如说两个人pk时,不能同时使用同一种道具。在此时,就需要分布式锁来保证。
实现原理
常见的redis分布式锁实现原理其实非常简单。
利用redis的单线程,具备互斥的特性,通过setnx、exist等来实现简单的分布式锁。
以下会介绍几个自主实现的redis分布式锁。
以下通过spring-data-redis中jedis客户端的方式来进行示例
- 快速失败
尝试获取锁,已经加锁则直接失败
@Resource
private RedisTemplate redisTemplate;
private Boolean tryLock() {
return redisTemplate.opsForValue().setIfAbsent("lock", "true", 10, TimeUnit.SECONDS);
}
类似幂等性的一个功能,当然也可以做如果未获取锁完成其他业务逻辑的功能。
- 等待获取锁
尝试获取锁,如果已经加锁,则一直等待
@Resource
private RedisTemplate redisTemplate;
private Boolean checkLock() {
try {
while (!redisTemplate.opsForValue().setIfAbsent("lock", "true", 10, TimeUnit.SECONDS)) {
Thread.sleep(10);
}
//业务逻辑
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
redisTemplate.delete("lock");
}
return true;
}
在某些场景下,必须获取到锁,然后进行下一步的业务逻辑时需要,但是如果获取锁的线程运行时间过长,会造成该线程一直等待,当并发高时,会造成异常。
- 等待指定时间获取锁
尝试获取锁,如果已经加锁,则等待指定时间,再次尝试获取锁,超过指定次数,则自动认为获取锁
@Resource
private RedisTemplate redisTemplate;
private Boolean checkLockLimitTimes(Integer times) {
int tryTimes = 0;
try {
while (!redisTemplate.opsForValue().setIfAbsent("lock", "true", 10, TimeUnit.SECONDS)) {
Thread.sleep(10);
tryTimes++;
if (tryTimes >= times) {
break;
}
}
//业务逻辑
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
redisTemplate.delete("lock");
}
return true;
}
在这种模式下,等待超时的情况下,业务会自动获取锁,完成业务之后释放锁。
但是也会存在问题,A获取锁,B尝试获取锁超时并获取锁,A完成业务释放锁,此时,会把B的锁同时释放,如果此时有C进来,就会获取锁,而且不等待。
这种模式在一些并发要求并不高的情况下,还是相对适用的,对并发要求高的场景不建议适用。
- 获取锁超时拒绝
尝试获取锁,超过指定次数则自动获取失败
@Resource
private RedisTemplate redisTemplate;
private Boolean checkLockWithTimesReject(Integer times) {
int tryTimes = 0;
try {
while (!redisTemplate.opsForValue().setIfAbsent("lock", "true", 10, TimeUnit.SECONDS)) {
Thread.sleep(10);
tryTimes++;
if (tryTimes >= times) {
return false;
}
}
//业务逻辑
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
redisTemplate.delete("lock");
}
return true;
}
这个模式是在上面未获取锁则失败的一个扩展版本,并不是未获取锁直接拒绝,而是有一个尝试的过程,一定程序下相对友好,不过对项目压力也会有一点。
以上是一些自己实现的简单redis分布式锁,能够达到部分业务需求,不过也同时存在问题:
1.如果不是用spring-data-redis,而是用原生的jedis,因为setnx和expire是分开运行的,容易setnx之后expire失败,则会导致key不会自动失效,如果解锁的代码异常没走到,则所有后续的都获取不到锁。
2.如果程序获取锁之后,运行时间超过锁过期时间,则会导致锁被其他线程获取,在一些要求比较高的场景,就会出现问题。
3.A获取锁,B尝试获取锁超时并获取锁,A完成业务释放锁,此时,会把B的锁同时释放,如果此时有C进来,就会获取锁,而且不等待。
redisson客户端
Redisson使用非阻塞的I/O和基于Netty框架的事件驱动的通信层,其方法调用是异步的。Redisson的API是线程安全的,所以可以操作单个Redisson连接来完成各种操作。
Redisson在2019年1月16日成为GitHub里星星最多的Redis Java客户端。
Jedis仅支持基本的数据类型如:String、Hash、List、Set、Sorted Set。
Redisson不仅提供了一系列的分布式Java常用对象,基本可以与Java的基本数据结构通用,还提供了许多分布式服务,其中包括(BitSet, Set, Multimap, SortedSet, Map, List, Queue, BlockingQueue, Deque, BlockingDeque, Semaphore, Lock, AtomicLong, CountDownLatch, Publish / Subscribe, Bloom filter, Remote service, Spring cache, Executor service, Live Object service, Scheduler service)。还实现了可重入锁(Reentrant Lock)、公平锁(Fair Lock、联锁(MultiLock)、 红锁(RedLock)、 读写锁(ReadWriteLock)等,还提供了许多分布式服务。
对于从jedis迁移到redisson的使用者来说,前期迁移过程会比较痛苦,需要从原生的指令对应到redisson中的方法,具体的对应关系可以查阅以下网页Redis命令和Redisson对象匹配列表
分布式锁
常见的分布式锁实现方式有三种:
- 基于数据库实现分布式锁
- 基于zookeeper实现分布式锁
- 基于Redis缓存实现分布式锁
以上三种方式都可以实现分布式锁,其中,从健壮性考虑, 用 zookeeper 会比用 Redis 实现更好,但从性能角度考虑,基于 Redis 实现性能会更好,而且zookeeper对使用者来说上手较慢,如何选择,还是取决于业务需求。
分布式锁需满足的四个要求
- 互斥性。在任意时刻,只有一个客户端能持有锁。
- 不会发生死锁。即使有一个客户端在持有锁的期间崩溃而没有主动解锁,也能保证后续其他客户端能加锁。
- 解铃还须系铃人。加锁和解锁必须是同一个客户端,客户端自己不能把别人加的锁给解了,即不能误解锁。
- 具有容错性。只要大多数Redis节点正常运行,客户端就能够获取和释放锁。
这里会讲解一下redisson分布式锁的实现
public void tryLock(Long waitTime, Long leaseTime, String lockKey) {
RLock lock = redissonClient.getLock(lockKey);
try {
lock.tryLock(waitTime, leaseTime, TimeUnit.SECONDS);
} catch (InterruptedException e) {
System.out.println("==================");
}
}
tryLock
中尝试获取锁是通过tryAcquire
,tryAcquire
内部通过调用tryLockInnerAsync
实现申请锁的逻辑。申请锁并返回锁有效期还剩余的时间,如果为空说明锁未被其它线程申请则直接获取并返回,如果获取到时间,则进入等待竞争逻辑。
加锁流程图如下:
其中比较重要的lua语言源码如下:
//如果不存在Sting keys 则设置
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
internalLockLeaseTime = unit.toMillis(leaseTime);
//设置过期key
//如果Collections.singletonList(getName())的key不存在
//将Collections.singletonList(getName())为key,getLockName(threadId)为field加1
//将Collections.singletonList(getName())设置过期时间为internalLockLeaseTime
//返回null
//如果Collections.singletonList(getName())为key,getLockName(threadId)为field存在
//将Collections.singletonList(getName())为key,getLockName(threadId)为field加1
//将Collections.singletonList(getName())设置过期时间为internalLockLeaseTime
//返回null
//直接获取将Collections.singletonList(getName())过期时间
return evalWriteAsync(getName(), LongCodec.INSTANCE, command,
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"return redis.call('pttl', KEYS[1]);",
Collections.singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}
同样的,解锁的关键性源码如下:
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
//判断key为getName(),field为getLockName(threadId)是否存在
//不存在则返回null
//使key为getName(),field为getLockName(threadId)减1
//如果大于0
//则更新可以过期时间为internalLockLeaseTime
//返回false
//小于等于0
//移除key
//发布key为getChannelName(),值为LockPubSub.UNLOCK_MESSAGE的数据
//返回true
//否则返回null
return evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
"return nil;" +
"end; " +
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
"if (counter > 0) then " +
"redis.call('pexpire', KEYS[1], ARGV[2]); " +
"return 0; " +
"else " +
"redis.call('del', KEYS[1]); " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; " +
"end; " +
"return nil;",
Arrays.asList(getName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
}
具体流程图如下:
解锁流程图.png
阅读redisson实现分布式锁源码过程中,会发现比较重要的一些代码.
例如:为了防止无用的获取锁,会按照等待时间和过期时间的最小值,通过JDK并发的信号量工具Semaphore
来阻塞进行,以减少无用获取
if (ttl >= 0 && ttl < time) {
subscribeFuture.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} else {
subscribeFuture.getNow().getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
}
同时,通过redis发布订阅功能来实现当获取锁的线程完成时做到信号量的激活。
public RFuture<E> subscribe(String entryName, String channelName) {
//同一时刻只允许一个线程进入
AsyncSemaphore semaphore = service.getSemaphore(new ChannelName(channelName));
RPromise<E> newPromise = new RedissonPromise<>();
semaphore.acquire(() -> {
if (!newPromise.setUncancellable()) {
semaphore.release();
return;
}
E entry = entries.get(entryName);
if (entry != null) {
entry.acquire();
semaphore.release();
entry.getPromise().onComplete(new TransferListener<E>(newPromise));
return;
}
E value = createEntry(newPromise);
value.acquire();
E oldValue = entries.putIfAbsent(entryName, value);
if (oldValue != null) {
oldValue.acquire();
semaphore.release();
oldValue.getPromise().onComplete(new TransferListener<E>(newPromise));
return;
}
RedisPubSubListener<Object> listener = createListener(channelName, value);
service.subscribe(LongCodec.INSTANCE, channelName, semaphore, listener);
});
return newPromise;
}
另外,redisson还提供了,为了获取锁的线程就算超过锁的过期时间,仍然持有锁,提供了过期时间更新的功能。
ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
if (e != null) {
return;
}
// lock acquired
if (ttlRemaining == null) {
scheduleExpirationRenewal(threadId);
}
});
private void scheduleExpirationRenewal(long threadId) {
ExpirationEntry entry = new ExpirationEntry();
ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
if (oldEntry != null) {
oldEntry.addThreadId(threadId);
} else {
entry.addThreadId(threadId);
renewExpiration();
}
}
protected RFuture<Boolean> renewExpirationAsync(long threadId) {
return evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return 1; " +
"end; " +
"return 0;",
Collections.singletonList(getName()),
internalLockLeaseTime, getLockName(threadId));
}
总结
通过 Redisson 实现分布式可重入锁,比纯自己通过set key value px milliseconds nx +lua 实现的效果更好些,虽然基本原理都一样,因为通过分析源码可知,RedissonLock是可重入的,并且考虑了失败重试,可以设置锁的最大等待时间, 在实现上也做了一些优化,减少了无效的锁申请,提升了资源的利用率。(通过future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
实现)
需要特别注意的是,RedissonLock
同样没有解决 节点挂掉的时候,存在丢失锁的风险的问题。而现实情况是有一些场景无法容忍的,所以 Redisson 提供了实现了redlock算法的RedissonRedLock
,RedissonRedLock
真正解决了单点失败的问题,代价是需要额外的为 RedissonRedLock
搭建Redis环境。
通常,出现主从切换
、集群脑裂
时,可能存在有两个线程同时获取锁的情况,具体原因可以自行百度查看。
所以,如果业务场景可以容忍这种小概率的错误,则推荐使用 RedissonLock, 如果无法容忍,则推荐使用 RedissonRedLock
。
RedissonRedLock
具体实现在org.redisson.RedissonMultiLock#tryLock
中,简单的讲就是通过多个RLock
锁,同步都获取到锁来才算真正获取锁,这样能够在一定情况下避免因为节点挂掉或者主从切换时,导致的锁问题。
网友评论