美文网首页
记录一次分布式锁的学习

记录一次分布式锁的学习

作者: 曹大大 | 来源:发表于2022-04-21 11:19 被阅读0次

    Redis setnx命令

    格式:setnx key value
    作用:将key的值设置成value,当且仅当key不存在,若给定的key已经存在,则setnx不需要任何动作

    //使用演示
    Boolean result = stringRedisTemplate.opsForValue().setIfAbsent("key",value);
    

    案例:修改库存

    @RestController("/test")
    public class test {
    
        @Autowired
        private StringRedisTemplate stringRedisTemplate;
    
        @RequestMapping("/deductStock")
        public String deductStock() {
            String lockKet = "lockKey";
    
            Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKet, "czy");
            if (!result) {
                return "error_code";
            }
    
            int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));
            if (stock > 0) {
                int realStock = stock - 1;
                stringRedisTemplate.opsForValue().set("stock", realStock + "");
                System.out.println("扣减成功,剩余库存:" + realStock);
            }else{
                System.out.println("扣减失败,库存不足");
            }
            stringRedisTemplate.delete(lockKet);
            return "end";
        }
    }
    

    程序问题:如果中间业务出现了问题,则锁就不会被释放,会造成程序阻塞的情况

    上述问题修复:

    @RequestMapping("/deductStock")
        public String deductStock() {
            String lockKet = "lockKey";
            //加try catch finally
            try {
                Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKet, "czy");
                if (!result) {
                    return "error_code";
                }
    
                int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));
                if (stock > 0) {
                    int realStock = stock - 1;
                    stringRedisTemplate.opsForValue().set("stock", realStock + "");
                    System.out.println("扣减成功,剩余库存:" + realStock);
                } else {
                    System.out.println("扣减失败,库存不足");
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                //程序不管执行是否成功到最后释放锁
                stringRedisTemplate.delete(lockKet);
            }
            return "end";
        }
    

    问题:如果在执行过程中运维kill -9(操作系统从内核级别强制杀死一个进程)终止了程序,此时锁也是没有释放的

    上述问题修复:

     @RequestMapping("/deductStock")
        public String deductStock() {
            String lockKet = "lockKey";
    
            try {
                Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKet, "czy");
                //加过期时间
                stringRedisTemplate.expire(lockKet,10, TimeUnit.SECONDS);
                if (!result) {
                    return "error_code";
                }
    
                int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));
                if (stock > 0) {
                    int realStock = stock - 1;
                    stringRedisTemplate.opsForValue().set("stock", realStock + "");
                    System.out.println("扣减成功,剩余库存:" + realStock);
                } else {
                    System.out.println("扣减失败,库存不足");
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                stringRedisTemplate.delete(lockKet);
            }
            return "end";
        }
    

    问题:解决了死锁宕机,但如果在设置超时时间的时候kill -9,设置过期时间没生效,要保证下面两行代码原子性操作

    上述问题修复:

    //加过期时间
    Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKet, "czy");
     stringRedisTemplate.expire(lockKet,10, TimeUnit.SECONDS);
    //变为这种写法
    Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKet, "zhuge",10,TimeUnit.SECONDS);
    

    问题:虽然解决了死锁宕机原子操作,但还有问题,就是线程1业务时间超过了加锁时间,然后锁释放了,线程2获取到这把锁接着执行,此时线程1执行完毕,然后执行解锁操作,此时解的是线程2的锁,然后线程3...线程4...以此类推,相当于无限套娃,没加锁(解锁的必须是当前加锁的人)

    上述问题修复:

    @RequestMapping("/deductStock")
        public String deductStock() {
            String lockKet = "lockKey";
    
            String clientId = UUID.randomUUID().toString();
            try {
                Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKet, clientId,10,TimeUnit.SECONDS);
                if (!result) {
                    return "error_code";
                }
    
                int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));
                if (stock > 0) {
                    int realStock = stock - 1;
                    stringRedisTemplate.opsForValue().set("stock", realStock + "");
                    System.out.println("扣减成功,剩余库存:" + realStock);
                } else {
                    System.out.println("扣减失败,库存不足");
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                //去判断是不是当前线程,如果是则执行删锁操作(解锁的必须是当前加锁的人)
                if(stringRedisTemplate.opsForValue().get(lockKet).equals(clientId)){
                    stringRedisTemplate.delete(lockKet);
                }
            }
            return "end";
        }
    

    问题:还是有问题的,这个锁设置的时间依旧不妥,如果服务器1运行这段代码宕机了,其他的服务器要等这个锁失效了才能继续执行

    解决思路(续命)

    • 当线程加锁成功执行业务逻辑的时候,在后台整分线程,分线程中搞一个定时任务,定时任务每段时间检查主线程持有的这把锁,在redis中存不存在,如果还存在,则把这个锁(key)的时间重新延长30秒。(定时任务的时间不能超过锁设置的过期时间),当线程执行结束了,把锁解开了,定时任务扫描这把锁解开了,此时定时任务结束。
    • 到最后还是要使用redisson开源框架

    Redisson配置:

    pom文件

             <dependency>
                <groupId>org.redisson</groupId>
                <artifactId>redisson</artifactId>
                <version>3.6.5</version>
            </dependency>
    

    config配置

    @Configuration
    public class RedissonConfig {
    
        @Bean
        public Redisson redisson(){
            Config config = new Config();
            //单机
            config.useSingleServer()
                    .setAddress("redis://127.0.0.1:6379")
                    .setDatabase(0);
    
            //通过源码分析我们知道,默认情况下,加锁的时间是30秒.如果加锁的业务没有执行完,
            // 那么到 30-10 = 20秒的时候,就会进行一次续期,把锁重置成30秒.
            //那业务的机器万一宕机了呢?宕机了定时任务跑不了,就续不了期,那自然30秒之后锁就解开了呗
            //该参数仅在没有leaseTimeout 参数定义的情况下获取锁时使用。如果看门狗没有将其延长到下一个 <code>lockWatchdogTimeout</code> 时间间隔,则锁定将在 <code>lockWatchdogTimeout</code> 之后过期。
            //config.setLockWatchdogTimeout(3000L);//设置看门狗机制的默认锁释放时间,默认30秒
            return (Redisson)(Redisson.create(config));
        }
    }
    

    Redisson分布式锁的使用(简单的演示)

    @Autowired
    private Redisson redisson;
    
    @RequestMapping("/deductStock")
        public String deductStock() {
            String lockKet = "lockKey";
    
            RLock redissonLock = redisson.getLock(lockKet);
            try {
                //加锁
                redissonLock.lock();
                int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));
                if (stock > 0) {
                    int realStock = stock - 1;
                    stringRedisTemplate.opsForValue().set("stock", realStock + "");
                    System.out.println("扣减成功,剩余库存:" + realStock);
                } else {
                    System.out.println("扣减失败,库存不足");
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                //解锁
                redissonLock.unLock();
            }
            return "end";
        }
    

    看门狗的失效问题

    //源码显示如果设置了leaseTime,便不会走看门狗机制
    private RFuture<Boolean> tryAcquireOnceAsync(long leaseTime, TimeUnit unit, final long threadId) {
            if (leaseTime != -1) {
                return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
            }
            RFuture<Boolean> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
            ttlRemainingFuture.addListener(new FutureListener<Boolean>() {
                @Override
                public void operationComplete(Future<Boolean> future) throws Exception {
                    if (!future.isSuccess()) {
                        return;
                    }
     
                    Boolean ttlRemaining = future.getNow();
                    // lock acquired
                    if (ttlRemaining) {
                        scheduleExpirationRenewal(threadId);
                    }
                }
            })
    

    // 具有Watch Dog 自动延期机制 默认续30s
    lock.tryLock(10, TimeUnit.SECONDS); // 拿锁失败时会不停的重试

    // 没有Watch Dog ,10s后自动释放
    lock.lock(10, TimeUnit.SECONDS); // 尝试拿锁100s后停止重试,返回false

    // 没有Watch Dog ,10s后自动释放
    lock.tryLock(100, 10, TimeUnit.SECONDS); //2. 公平锁 保证 Redisson 客户端线程将以其请求的顺序获得锁

    从CAP角度剖析redis和zookeeper锁架构异同

    问题:在redis的主从模式或者哨兵模式或者集群下,如果线程1在主节点redis服务上加锁,服务器立刻返回给程序加锁成功,然后线程1开始业务逻辑执行,然后此时主节点Redis挂了,然后开始Redis开始选举主节点,此时线程2执行,然后去被选举的Redis服务器上去加锁,此时Redis服务器上是没有上次加的锁的(因为服务器之间数据同步是异步的,这个场景出现在同步还没有完成然后主节点Redis服务挂了),就再次加锁成功,造成线程安全问题。

    1. CAP理论:C 一致性 A 可用性 P分区容错性
    2. redis满足AP zookeeper满足CP

    解决:

    • zookeeper强一致性,如果你的key写入到主的zookeeper当中,他不会立刻去返回给客户端,而是先同步服务器的数据再返回(至少半数)。
    • zookeeper主节点挂了没关系,它会从从节点中选举一个新的,而且zookeeper的底层集群架构原理有一个ZAB协议(原子广播协议),这个协议会帮你一定会选举某个节点会被选举成功。
    • redis使用Redlock(红锁)即客户端发送加锁请求,超过半数redis节点(对等关系,相互没有依赖和主从关系)加锁成功才算加锁成功。
    • 格外补偿机制。

    红锁实现:

    @RequestMapping("/redLock")
        public String redLock() {
            String lockKey = "product_01";
            //这里需要自己实例化不同的Redis实例的redisson客户端连接,这里只是伪代码用一个redisson客户端简化了
            RLock lock1 = redisson.getLock(lockKey);
            RLock lock2 = redisson.getLock(lockKey);
            RLock lock3 = redisson.getLock(lockKey);
    
            /**
             * 根据多个Rlock对象构建RedissonRedLock(最核心的差别就在这)
             */
            RedissonRedLock redLock = new RedissonRedLock(lock1, lock2, lock3);
            try {
                /**
                 * waitTime:尝试获取锁的最大等待时间,超过这个时间,则认为获取锁失败
                 * leaseTime:锁的持有时间,超过这个时间锁会自动失效(这个值设置为大于业务处理时间,确保锁在有效期内业务能够完成)
                 */
                boolean res = redLock.tryLock(10, 30, TimeUnit.SECONDS);
                if (res) {
                    //执行业务逻辑
                }
            } catch (Exception e) {
                throw new RuntimeException("lock fail");
            } finally {
                //无论如何到最后一定要解锁
                redLock.unlock();
            }
            return "end";
        }
    

    结论:

    分布式锁在设计的语义角度适合并发是相违背的,本来是并行执行的,在底层给你排了个队变成串行执行。(在大促的情况下,并发特别高老板如果让你优化代码怎么办?答:把老板炒了),具体需要去学习Reids优化了。

    相关文章

      网友评论

          本文标题:记录一次分布式锁的学习

          本文链接:https://www.haomeiwen.com/subject/dtngertx.html