Redis setnx命令
格式:setnx key value
作用:将key的值设置成value,当且仅当key不存在,若给定的key已经存在,则setnx不需要任何动作
//使用演示
Boolean result = stringRedisTemplate.opsForValue().setIfAbsent("key",value);
案例:修改库存
@RestController("/test")
public class test {
@Autowired
private StringRedisTemplate stringRedisTemplate;
@RequestMapping("/deductStock")
public String deductStock() {
String lockKet = "lockKey";
Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKet, "czy");
if (!result) {
return "error_code";
}
int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));
if (stock > 0) {
int realStock = stock - 1;
stringRedisTemplate.opsForValue().set("stock", realStock + "");
System.out.println("扣减成功,剩余库存:" + realStock);
}else{
System.out.println("扣减失败,库存不足");
}
stringRedisTemplate.delete(lockKet);
return "end";
}
}
程序问题:如果中间业务出现了问题,则锁就不会被释放,会造成程序阻塞的情况
上述问题修复:
@RequestMapping("/deductStock")
public String deductStock() {
String lockKet = "lockKey";
//加try catch finally
try {
Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKet, "czy");
if (!result) {
return "error_code";
}
int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));
if (stock > 0) {
int realStock = stock - 1;
stringRedisTemplate.opsForValue().set("stock", realStock + "");
System.out.println("扣减成功,剩余库存:" + realStock);
} else {
System.out.println("扣减失败,库存不足");
}
} catch (Exception e) {
e.printStackTrace();
} finally {
//程序不管执行是否成功到最后释放锁
stringRedisTemplate.delete(lockKet);
}
return "end";
}
问题:如果在执行过程中运维kill -9(操作系统从内核级别强制杀死一个进程)终止了程序,此时锁也是没有释放的
上述问题修复:
@RequestMapping("/deductStock")
public String deductStock() {
String lockKet = "lockKey";
try {
Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKet, "czy");
//加过期时间
stringRedisTemplate.expire(lockKet,10, TimeUnit.SECONDS);
if (!result) {
return "error_code";
}
int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));
if (stock > 0) {
int realStock = stock - 1;
stringRedisTemplate.opsForValue().set("stock", realStock + "");
System.out.println("扣减成功,剩余库存:" + realStock);
} else {
System.out.println("扣减失败,库存不足");
}
} catch (Exception e) {
e.printStackTrace();
} finally {
stringRedisTemplate.delete(lockKet);
}
return "end";
}
问题:解决了死锁宕机,但如果在设置超时时间的时候kill -9,设置过期时间没生效,要保证下面两行代码原子性操作
上述问题修复:
//加过期时间
Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKet, "czy");
stringRedisTemplate.expire(lockKet,10, TimeUnit.SECONDS);
//变为这种写法
Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKet, "zhuge",10,TimeUnit.SECONDS);
问题:虽然解决了死锁宕机原子操作,但还有问题,就是线程1业务时间超过了加锁时间,然后锁释放了,线程2获取到这把锁接着执行,此时线程1执行完毕,然后执行解锁操作,此时解的是线程2的锁,然后线程3...线程4...以此类推,相当于无限套娃,没加锁(解锁的必须是当前加锁的人)
上述问题修复:
@RequestMapping("/deductStock")
public String deductStock() {
String lockKet = "lockKey";
String clientId = UUID.randomUUID().toString();
try {
Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKet, clientId,10,TimeUnit.SECONDS);
if (!result) {
return "error_code";
}
int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));
if (stock > 0) {
int realStock = stock - 1;
stringRedisTemplate.opsForValue().set("stock", realStock + "");
System.out.println("扣减成功,剩余库存:" + realStock);
} else {
System.out.println("扣减失败,库存不足");
}
} catch (Exception e) {
e.printStackTrace();
} finally {
//去判断是不是当前线程,如果是则执行删锁操作(解锁的必须是当前加锁的人)
if(stringRedisTemplate.opsForValue().get(lockKet).equals(clientId)){
stringRedisTemplate.delete(lockKet);
}
}
return "end";
}
问题:还是有问题的,这个锁设置的时间依旧不妥,如果服务器1运行这段代码宕机了,其他的服务器要等这个锁失效了才能继续执行
解决思路(续命)
- 当线程加锁成功执行业务逻辑的时候,在后台整分线程,分线程中搞一个定时任务,定时任务每段时间检查主线程持有的这把锁,在redis中存不存在,如果还存在,则把这个锁(key)的时间重新延长30秒。(定时任务的时间不能超过锁设置的过期时间),当线程执行结束了,把锁解开了,定时任务扫描这把锁解开了,此时定时任务结束。
- 到最后还是要使用redisson开源框架
Redisson配置:
pom文件
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.6.5</version>
</dependency>
config配置
@Configuration
public class RedissonConfig {
@Bean
public Redisson redisson(){
Config config = new Config();
//单机
config.useSingleServer()
.setAddress("redis://127.0.0.1:6379")
.setDatabase(0);
//通过源码分析我们知道,默认情况下,加锁的时间是30秒.如果加锁的业务没有执行完,
// 那么到 30-10 = 20秒的时候,就会进行一次续期,把锁重置成30秒.
//那业务的机器万一宕机了呢?宕机了定时任务跑不了,就续不了期,那自然30秒之后锁就解开了呗
//该参数仅在没有leaseTimeout 参数定义的情况下获取锁时使用。如果看门狗没有将其延长到下一个 <code>lockWatchdogTimeout</code> 时间间隔,则锁定将在 <code>lockWatchdogTimeout</code> 之后过期。
//config.setLockWatchdogTimeout(3000L);//设置看门狗机制的默认锁释放时间,默认30秒
return (Redisson)(Redisson.create(config));
}
}
Redisson分布式锁的使用(简单的演示)
@Autowired
private Redisson redisson;
@RequestMapping("/deductStock")
public String deductStock() {
String lockKet = "lockKey";
RLock redissonLock = redisson.getLock(lockKet);
try {
//加锁
redissonLock.lock();
int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));
if (stock > 0) {
int realStock = stock - 1;
stringRedisTemplate.opsForValue().set("stock", realStock + "");
System.out.println("扣减成功,剩余库存:" + realStock);
} else {
System.out.println("扣减失败,库存不足");
}
} catch (Exception e) {
e.printStackTrace();
} finally {
//解锁
redissonLock.unLock();
}
return "end";
}
看门狗的失效问题
//源码显示如果设置了leaseTime,便不会走看门狗机制
private RFuture<Boolean> tryAcquireOnceAsync(long leaseTime, TimeUnit unit, final long threadId) {
if (leaseTime != -1) {
return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
}
RFuture<Boolean> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
ttlRemainingFuture.addListener(new FutureListener<Boolean>() {
@Override
public void operationComplete(Future<Boolean> future) throws Exception {
if (!future.isSuccess()) {
return;
}
Boolean ttlRemaining = future.getNow();
// lock acquired
if (ttlRemaining) {
scheduleExpirationRenewal(threadId);
}
}
})
// 具有Watch Dog 自动延期机制 默认续30s
lock.tryLock(10, TimeUnit.SECONDS); // 拿锁失败时会不停的重试
// 没有Watch Dog ,10s后自动释放
lock.lock(10, TimeUnit.SECONDS); // 尝试拿锁100s后停止重试,返回false
// 没有Watch Dog ,10s后自动释放
lock.tryLock(100, 10, TimeUnit.SECONDS); //2. 公平锁 保证 Redisson 客户端线程将以其请求的顺序获得锁
从CAP角度剖析redis和zookeeper锁架构异同
问题:在redis的主从模式或者哨兵模式或者集群下,如果线程1在主节点redis服务上加锁,服务器立刻返回给程序加锁成功,然后线程1开始业务逻辑执行,然后此时主节点Redis挂了,然后开始Redis开始选举主节点,此时线程2执行,然后去被选举的Redis服务器上去加锁,此时Redis服务器上是没有上次加的锁的(因为服务器之间数据同步是异步的,这个场景出现在同步还没有完成然后主节点Redis服务挂了),就再次加锁成功,造成线程安全问题。
- CAP理论:C 一致性 A 可用性 P分区容错性
- redis满足AP zookeeper满足CP
解决:
- zookeeper强一致性,如果你的key写入到主的zookeeper当中,他不会立刻去返回给客户端,而是先同步服务器的数据再返回(至少半数)。
- zookeeper主节点挂了没关系,它会从从节点中选举一个新的,而且zookeeper的底层集群架构原理有一个ZAB协议(原子广播协议),这个协议会帮你一定会选举某个节点会被选举成功。
- redis使用Redlock(红锁)即客户端发送加锁请求,超过半数redis节点(对等关系,相互没有依赖和主从关系)加锁成功才算加锁成功。
- 格外补偿机制。
红锁实现:
@RequestMapping("/redLock")
public String redLock() {
String lockKey = "product_01";
//这里需要自己实例化不同的Redis实例的redisson客户端连接,这里只是伪代码用一个redisson客户端简化了
RLock lock1 = redisson.getLock(lockKey);
RLock lock2 = redisson.getLock(lockKey);
RLock lock3 = redisson.getLock(lockKey);
/**
* 根据多个Rlock对象构建RedissonRedLock(最核心的差别就在这)
*/
RedissonRedLock redLock = new RedissonRedLock(lock1, lock2, lock3);
try {
/**
* waitTime:尝试获取锁的最大等待时间,超过这个时间,则认为获取锁失败
* leaseTime:锁的持有时间,超过这个时间锁会自动失效(这个值设置为大于业务处理时间,确保锁在有效期内业务能够完成)
*/
boolean res = redLock.tryLock(10, 30, TimeUnit.SECONDS);
if (res) {
//执行业务逻辑
}
} catch (Exception e) {
throw new RuntimeException("lock fail");
} finally {
//无论如何到最后一定要解锁
redLock.unlock();
}
return "end";
}
结论:
分布式锁在设计的语义角度适合并发是相违背的,本来是并行执行的,在底层给你排了个队变成串行执行。(在大促的情况下,并发特别高老板如果让你优化代码怎么办?答:把老板炒了),具体需要去学习Reids优化了。
网友评论