1,Redis使用setnx 实现
2,Redisson 分布式锁;
Redis基于 setnx 实现分布式锁原理:
Redis Setnx 实现分布式锁:
Setnx key value
Redis Setnx(SET if Not eXists) 命令在指定的key 不存在时,为key设置指定的值。
设置成功,返回1, 不成功返回0.
Redis具有先天性,能够保证线程安全问题,多个redis客户端最终只有一个Redis客户端设置成功。
Setnx: key=mayiktRedisLock value=1;该key 如果不存在的时候 执行结果返回1 java层面返回true.
Setnx: key=mayiktRedisLock value=1;该key 如果存在的时候 执行结果返回0 java层面返回false.
set之间的区别:
如果该key 不存在的时候,直接创建,如果存在的时候覆盖。
原理:
获取锁原理:
多个redis客户端执行setnx指令,设置一个相同的Rediskey,谁能创建key成功,谁能获取锁。
如果该key已经存在的情况下,在创建的时候就会返回false。
释放原理:
就是删除key.
Redis实现分布式锁如何避=避免死锁的问题?
如果Redis客户端(获取锁的jvm)宕机的话,如何避免死锁的问题?
zk如何避免该问题?先天性解决了该问题。
可以设置过期时间,过期后该key自动删除。
获取到锁的jvm 业务执行时间>过期key的时间如何处理?
续命:开启一个定时任务实现续命,当我们的业务逻辑没有执行完毕的时候,就会延长过期key的时间。
一直不断续命的情况下,也会发生死锁的问题。
设定续命的次数,续命多次如果还没有执行完业务逻辑的情况下,就应该回滚业务,主动释放锁。
如果当前线程已经获取到锁的情况下,不需要重复获取锁,而是直接复用。
如何考虑避免死锁的问题。
对我们的key 设置 设置锁的过期时间,避免死锁的问题。
如何确保该锁是自己创建,被自己删除。
当我们在执行set的时候value为uuid,如果删除的uuid与该uuid值保持一致,则是自己获取的锁,可以被自己删除。
Redis key 过期了,但是业务还没有执行完毕如何处理;
当redis的过期了,应该采取续命设计,继续延长时间,如果续命多次还是失败的情况下,为了避免死锁的问题,应该主动释放锁和当前的事务操作。
相关核心代码:
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
/**
* @ClassName RedisLockImpl
* @Author 蚂蚁课堂余胜军 QQ644064779 www.mayikt.com
* @Version V1.0
**/
@Component
@Slf4j
public class RedisLockImpl implements RedisLock {
private String redisLockKey = "mayiktLock";
private Long timeout = 3000L;
@Autowired
private StringRedisTemplate stringRedisTemplate;
private static Map<Thread, RedisLockInfo> lockCacheMap = new ConcurrentHashMap<>();
private ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
@Override
public boolean tryLock() {
Thread cuThread = Thread.currentThread();
RedisLockInfo redisLockInfo = lockCacheMap.get(cuThread);
if (redisLockInfo != null && redisLockInfo.isState()) {
log.info("<<重入锁,直接从新获取锁成功>>");
return true;
}
Long startTime = System.currentTimeMillis();
for (; ; ) {
// 1.创建setnx
String lockId = UUID.randomUUID().toString();
Long expire = 30L;
Boolean getLock = stringRedisTemplate.opsForValue().setIfAbsent(redisLockKey, lockId, expire, TimeUnit.SECONDS);
if (getLock) {
log.info("<<获取锁成功>>");
// 将该锁缓存到Map集合中 实现重入锁
lockCacheMap.put(cuThread, new RedisLockInfo(lockId, cuThread, expire));
return true;
}
// 2.继续循环重试获取 判断是否已经超时重试
long endTime = System.currentTimeMillis();
if (endTime - startTime > timeout) {
return false;
}
//3.避免频繁重试 调用阻塞方法等待
try {
Thread.sleep(100);
} catch (Exception e) {
}
}
}
public RedisLockImpl() {
//开始定时任务实现续命
// this.scheduledExecutorService.scheduleAtFixedRate(new LifeExtensionThread(), 0, 5, TimeUnit.SECONDS);
}
@Override
public boolean releaseLock() {
log.info("<<释放锁成功>>");
RedisLockInfo redisLockInfo = lockCacheMap.get(Thread.currentThread());
if (redisLockInfo == null) {
return false;
}
boolean state = redisLockInfo.isState();
if (!state) {
return false;
}
String redisLockId = stringRedisTemplate.opsForValue().get(redisLockKey);
if (StringUtils.isEmpty(redisLockId)) {
return false;
}
if (!redisLockId.equals(redisLockInfo.getLockId())) {
log.info("<<非本线程自己的锁,无法删除>>");
return false;
}
Boolean delete = stringRedisTemplate.delete(redisLockKey);
if (!delete) {
return false;
}
return lockCacheMap.remove(redisLockKey) != null ? true : false;
}
/**
* 续命次数设计
*/
class LifeExtensionThread implements Runnable {
@Override
public void run() {
lockCacheMap.forEach((k, lockInfo) -> {
// 判断线程是否为终止状态,如果是为终止状态 则开始对key实现续命
Thread lockThread = lockInfo.getLockThread();
if (!lockInfo.isState() && lockThread.isInterrupted()) {
log.info("获取锁失败或者当前获取锁线程已经成功执行完方法");
return;
}
Integer lifeCount = lockInfo.getLifeCount();
//开始实现续命 为了避免续命为了避免续命多次还是无法释放锁 则应该回滚业务 主动释放锁
if (lifeCount > 3) {
// 移除不在继续续命
lockCacheMap.remove(lockThread);
// 回滚当前线程事务
// 停止该线程
return;
}
// 开始延长时间
stringRedisTemplate.expire(redisLockKey, lockInfo.getExpire(), TimeUnit.SECONDS);
});
}
}
}
Redis过期了,但是业务还没有执行完毕如何处理:
采用续命设计:
看门狗线程--续命线程。
获取锁成功后,应该提前开启一个续命的线程,
检测如果当前业务逻辑还没有执行完毕的情况下,应该不断的延迟过期key的时间。
续命设计: 死锁问题,限制次数。
如果续命多次的情况下,还没有释放锁,则,
1,主动回滚当前线程对应的事务。
2,主动释放锁,
3,主动将该线程通知。
全局续命,
开启一个全局的线程,续命所有的过期key,不合理。
局部续命(增量续命)
只要获取锁成功之后,就开启一个定时任务线程续命。
定时任务每次续命间隔的时间至少小于Redis过期key的时间。
每隔10s续命一次
Redisson设计:
key过期的时候30s
每隔10s续命一次,
当redis 过期了,应该采用续命设计,继续延长时间,如果续命多次还是失败的情况下,为了避免死锁的问题,应该主动释放锁和当前事务。
续命设计增量续命方式。
集群问题:
Redis集群,主节点宕机后如何处理?
Redis集群数据同步,采用异步的方式。
优点: 效率比较高。
缺点: 写的操作效率比较高,有可能存在数据不同步的问题。
zk集群数据同步,采用异步同步的方式。
优点: 保证每个子节点的数据的同步。
缺点: 每次做些的操作的效率比较低。
产生背景:
jvm01 连接到主的redis 做setnx操作的时候,异步将数据同步给redis,意味着jvm01获取锁成功,正好主redis宕机了,redis集群自动开启哨兵机制,就会选举从节点中某个redis为主redis,就会出现2个jvm获取锁成功,违背了分布式锁原子性特征。
思考如何解决:
1,redis集群数据同步改为同步的形式,效率偏低。
2,Redis红锁。
原理;、
1,构建Redis集群没有主从之分,Redis节点都可能为主节点;
2,获取锁的时候,当客户端(JVM)会向多个不同的redis服务端执行setnx操作,只要有一半的redis服务器执行成功,则表示锁成功,和zk数据同步思想一样。
zk数据同步是在zk领导节点实现
Redis是有客户端实现。
考虑问题:
需要设置连接redis超时时间5-50毫秒,时间越短越好,能够减少每个集群redis节点过期延迟。
考虑:
为了防止写入某个redis一直阻塞,需要考虑设置一个超时时间,5-50毫秒
如果无法写入的情况下,直接切换到下一个redis实例,为了防止客户端一直阻塞,影响获取锁的成本。
注意:redis集群个数最好是基数3.
实际上就是zk集群方式。
Redis集群中数据同步,采用异步的形式,当我们连接的主角redis做写的操作的时候,会异步的形式将数据同步给其他从redis,从而可以提高效率,使用ap模式
zk集群数据同步,采用同步模式,当我们连接到主的zk节点,做写的操作的时候,会同步的形式将数据同步给其他的zk从节点。有可能会阻塞,效率比较低,但是可以严格保证数据一致性的问题,使用cp模式
在使用redis实现分布式锁的时候,如果主的redis宕机后,有可能其他从的redis节点会选举主redis节点,有可能会发生多个jvm都会获取到该分布式锁,产生问题。
image.png如何解决该问题呢?
Redisson 采用红锁解决。
需要考虑的问题:
如何客户端给多个redis服务器设置key,总耗时时间>过期key如何处理?
RedLock(红锁)实现原理》
redis的分布式锁算法采用红锁机制,红锁需要至少三个以上Redis独立节点,这些节点相互之间可以不需要存在主从之分,每个redis保证独立即可。
脑裂:
获取锁:
客户单会在每个redis 实例创建锁,只需要满足一半的redis节点能够获取锁成功,就表示加锁成功。
该方案: 导致获取锁的时间成本可能非常高。
原理:
1.客户端使用相同的key,在从所有的Redis节点获取锁。
2,客户端需要设置超时时间。连接redis设置不成功的情况下立即切换到下一个Redis实例,防止一直阻塞。
3,客户端需要计算获取锁的总耗时,客户端至少需要有N/2+1节点获取锁成功,且总耗时时间小于锁的过期时间才能获取锁成功。
4,如果客户端最终获取锁失败,必须所有节点释放锁。
RedLock(红锁)环境搭建
构建Redis集群环境
不需要设置redis集群的主从关系。
网友评论