美文网首页
Redis高并发架构实战

Redis高并发架构实战

作者: YonchanLew | 来源:发表于2021-05-05 17:57 被阅读0次

    (1)先来一个小案例作为切入点

    /*
    这里记为代码一
    */
    @RestController
    public class IndexController {
    
        @Autowired
        private Redisson redisson;
        @Autowired
        private StringRedisTemplate stringRedisTemplate;    //组件spring-boot-starter-data-redis
    
        @RequestMapping("/deduct_stock")
        public String deductStock(){
            int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));   //可以理解为jedis.get("stock")
            if(stock > 0){
                int realStock = stock - 1;
                stringRedisTemplate.opsForValue().set("Stock", realStock + "");     //可以理解为jedis.set(key,value)
                System.out.println("扣减成功,剩余库存:" + realStock);
            }else{
                System.out.println("扣减失败,库存不足");
            }
    
            return "end";
        }
    }
    

    然后在redis中搞一个库存为200


    现在很明显,代码一 存在线程安全问题,会有可能读到都是200,然后都减1后设置为199,就不对了。
    很多同学都会想到加一把锁

    (2)synchronized

    /*
    代码二
    */
    public String deductStock(){
        synchronized (this){
            int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));   //可以理解为jedis.get("stock")
            if(stock > 0){
                int realStock = stock - 1;
                stringRedisTemplate.opsForValue().set("Stock", realStock + "");     //可以理解为jedis.set(key,value)
                System.out.println("扣减成功,剩余库存:" + realStock);
            }else{
                System.out.println("扣减失败,库存不足");
            }
        }
    
        return "end";
    }
    

    这样的确是只能有一个线程执行操作,确实是线程安全了。但是它只能在单机环境下运行,只能锁住一个tomcat,分布式的时候就不行了。


    (3)分布式锁

    这时,应该考虑分布式锁。SETNX(SET if Not eXists)。和set的区别是:
    set tuling A
    set tuling B
    结果会是B
    setnx tuling A
    setnx tuling B
    结果会是A

    /*
    代码三
    */
    public String deductStock(){
    
        String lockKey = "product_101";
        //如果返回false,说明redis中有这个key了,不做任何操作。如果返回true说明执行这个命令之前没有这个key,并设置成功了
        Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, "tuling");     //就理解为jedis.setnx(key,value)
        if(!result){
            return "error_code";    //给前端错误码,当前系统繁忙,请稍后再试
        }
    
        int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));   //可以理解为jedis.get("stock")
        if(stock > 0){
            int realStock = stock - 1;
            stringRedisTemplate.opsForValue().set("Stock", realStock + "");     //可以理解为jedis.set(key,value)
            System.out.println("扣减成功,剩余库存:" + realStock);
        }else{
            System.out.println("扣减失败,库存不足");
        }
    
        stringRedisTemplate.delete(lockKey);
    
        return "end";
    }
    

    redis那边是单线程操作的,会排队,只有排队头的可以设置成功,后面的设置不成功,这样入门级的分布式锁设计完了。大家想想还有没有问题?
    这个时候还是存在问题,当获取到锁的线程有异常,导致没法删除key,就会导致其他线程获取不到锁,就算能捕获异常,但如果是系统挂了呢,运维重启呢

    /*
    代码四
    */
    public String deductStock(){
    
        String lockKey = "product_101";
        //如果返回false,说明redis中有这个key了,不做任何操作。如果返回true说明执行这个命令之前没有这个key,并设置成功了
        Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, "tuling");     //就理解为jedis.setnx(key,value)
        if(!result){
            return "error_code";    //给前端错误码,当前系统繁忙,请稍后再试
        }
    
        try{
            int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));   //可以理解为jedis.get("stock")
            if(stock > 0){
                int realStock = stock - 1;
                stringRedisTemplate.opsForValue().set("Stock", realStock + "");     //可以理解为jedis.set(key,value)
                System.out.println("扣减成功,剩余库存:" + realStock);
            }else{
                System.out.println("扣减失败,库存不足");
            }
        }finally {
            stringRedisTemplate.delete(lockKey);
        }
    
        return "end";
    }
    

    示例 代码四 还是存在问题,大家先想想解决方法。

    (4)锁超时

    这样的话,可以加一个超时时间来解决,给key一个超时时间,即使系统挂了,一段时间之后,其他机器还是能正常访问

    /*
    代码五
    */
    public String deductStock(){
    
        String lockKey = "product_101";
        //如果返回false,说明redis中有这个key了,不做任何操作。如果返回true说明执行这个命令之前没有这个key,并设置成功了
        Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, "tuling");     //就理解为jedis.setnx(key,value)
        stringRedisTemplate.expire(lockKey, 10, TimeUnit.SECONDS);
    
        if(!result){
            return "error_code";    //给前端错误码,当前系统繁忙,请稍后再试
        }
    
        try{
            int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));   //可以理解为jedis.get("stock")
            if(stock > 0){
                int realStock = stock - 1;
                stringRedisTemplate.opsForValue().set("Stock", realStock + "");     //可以理解为jedis.set(key,value)
                System.out.println("扣减成功,剩余库存:" + realStock);
            }else{
                System.out.println("扣减失败,库存不足");
            }
        }finally {
            stringRedisTemplate.delete(lockKey);
        }
    
        return "end";
    }
    

    大家想想 代码五 还有问题吗?

    (5)加锁操作原子性

    假设设置了key之后,正准备设置超时时间,但系统挂了,那还是回到之前的问题了,得保证原子性。应该使用setIfAbsent的其他重载方法,有一个是可以同时设置超时时间的

    /*
    代码六
    */
    public String deductStock(){
    
        String lockKey = "product_101";
        //如果返回false,说明redis中有这个key了,不做任何操作。如果返回true说明执行这个命令之前没有这个key,并设置成功了
    //        Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, "tuling");     //就理解为jedis.setnx(key,value)
    //        stringRedisTemplate.expire(lockKey, 10, TimeUnit.SECONDS);
    
        Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, "tuling", 10, TimeUnit.SECONDS);
    
        if(!result){
            return "error_code";    //给前端错误码,当前系统繁忙,请稍后再试
        }
    
        try{
            int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));   //可以理解为jedis.get("stock")
            if(stock > 0){
                int realStock = stock - 1;
                stringRedisTemplate.opsForValue().set("Stock", realStock + "");     //可以理解为jedis.set(key,value)
                System.out.println("扣减成功,剩余库存:" + realStock);
            }else{
                System.out.println("扣减失败,库存不足");
            }
        }finally {
            stringRedisTemplate.delete(lockKey);
        }
    
        return "end";
    }
    

    大家思考一下,代码六 还有没有问题?
    遇到高并发的时候,通常执行会比较慢,慢执行啊,中间sql语句执行很慢这样,假设执行完这个方法需要15秒,当线程执行了10秒的时候,由于设置了超时时间是10秒,并且是高并发场景,这个时候key就删除了,另外的线程就获取了锁

    这样就相当于锁永久失效。虽然把过期时间放大是可以避免,但还是无法彻底解决问题。
    本质是自己加的锁被别人解掉了,所以解决就是锁只能自己解锁

    /*
    代码七
    */
    public String deductStock(){
    
        String lockKey = "product_101";
        //如果返回false,说明redis中有这个key了,不做任何操作。如果返回true说明执行这个命令之前没有这个key,并设置成功了
    //        Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, "tuling");     //就理解为jedis.setnx(key,value)
    //        stringRedisTemplate.expire(lockKey, 10, TimeUnit.SECONDS);
    
        String clientId = UUID.randomUUID().toString();
    
        Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, clientId, 10, TimeUnit.SECONDS);
    
        if(!result){
            return "error_code";    //给前端错误码,当前系统繁忙,请稍后再试
        }
    
        try{
            int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));   //可以理解为jedis.get("stock")
            if(stock > 0){
                int realStock = stock - 1;
                stringRedisTemplate.opsForValue().set("Stock", realStock + "");     //可以理解为jedis.set(key,value)
                System.out.println("扣减成功,剩余库存:" + realStock);
            }else{
                System.out.println("扣减失败,库存不足");
            }
        }finally {
            if(clientId.equals(stringRedisTemplate.opsForValue().get(lockKey))){
                stringRedisTemplate.delete(lockKey);
            }
        }
    
        return "end";
    }
    

    代码七 按上面的例子,锁是自己过期的,这代码只是能保证线程1无法删除线程2的锁,但线程1和线程2还是同时在跑啊。这个时间还有问题,但是先不管,先放放,因为不是想要引申的内容,要继续思考这个代码还有除时间外的什么其他问题?
    就是finally中的两行代码非原子,写并发代码和写高并发代码时的区别,应该要习惯性的在代码之间空几行,表明这里执行有时间差,非原子。
    假设执行判断完clientId确实是等于当前线程的value,假设这时刚好是9.9秒,突然发生卡顿,但这个if判断已经是true了,正准备delete的时候,卡顿了,这时已经过了10秒,线程2已经获取了锁,然后线程1执行delete,又出问题了,仍然是线程1删除了线程2的锁。
    怎么处理?

    (6)锁续命

    锁续命:通常是这样处理的,有一个分线程定时任务,用来监测线程还是否持有锁,还持有的就延长锁的过期时间,例如锁超时是30秒,那么分线程每10秒判断一下,线程还是否持有锁,还持有就更新过期时间,不能说是延长,是按当前时间又重新设置30秒过期,当不持有了,定时任务就结束,分线程也结束。
    redisson:操作redis的客户端,有很多分布式功能,其中就有分布式锁。想起了吧?代码一 中就已经引入了redisson

    /*
    代码八
    */
    public String deductStock(){
    
        String lockKey = "product_101";
        //如果返回false,说明redis中有这个key了,不做任何操作。如果返回true说明执行这个命令之前没有这个key,并设置成功了
    //        Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, "tuling");     //就理解为jedis.setnx(key,value)
    //        stringRedisTemplate.expire(lockKey, 10, TimeUnit.SECONDS);
    
        /*String clientId = UUID.randomUUID().toString();
        Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, clientId, 10, TimeUnit.SECONDS);
        if(!result){
            return "error_code";    //给前端错误码,当前系统繁忙,请稍后再试
        }*/
    
        RLock redissonLock = redisson.getLock(lockKey);
    
        try{
            //加锁
            redissonLock.lock();    //理解为执行了setIfAbsent(lockKey, clientId, 30, TimeUnit.SECONDS)
            int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));   //可以理解为jedis.get("stock")
            if(stock > 0){
                int realStock = stock - 1;
                stringRedisTemplate.opsForValue().set("Stock", realStock + "");     //可以理解为jedis.set(key,value)
                System.out.println("扣减成功,剩余库存:" + realStock);
            }else{
                System.out.println("扣减失败,库存不足");
            }
        }finally {
            redissonLock.unlock();
            /*if(clientId.equals(stringRedisTemplate.opsForValue().get(lockKey))){
                stringRedisTemplate.delete(lockKey);
            }*/
        }
    
        return "end";
    }
    

    redisson加锁核心lua脚本


    KEYS[1]:product_101
    ARGV[2]:getLockName(threadId)
    ARGV[1]:internalLockLeaseTime(初始化是30秒)
    可以看到第250行和251行,就相当于 代码五 中的

    Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, "tuling");     //就理解为jedis.setnx(key,value)
    stringRedisTemplate.expire(lockKey, 10, TimeUnit.SECONDS);
    

    而这两行代码是不具有原子性的,线程不安全。Lua脚本可以保证原子性

    锁续命:

    /*
    https://github.com/redisson/redisson/blob/redisson-3.6.5/redisson/src/main/java/org/redisson/RedissonLock.java
    redisson-3.6.5 RedissonLock.java,其他版本会不太不一样,但原理应该不变吧
    */
        private void scheduleExpirationRenewal(final long threadId) {
            if (expirationRenewalMap.containsKey(getEntryName())) {
                return;
            }
    
            Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
                @Override
                public void run(Timeout timeout) throws Exception {
    
                    RFuture<Boolean> future = commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                            "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                                "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                                "return 1; " +
                            "end; " +
                            "return 0;",
                              Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
    
                    future.addListener(new FutureListener<Boolean>() {
                        @Override
                        public void operationComplete(Future<Boolean> future) throws Exception {
                            expirationRenewalMap.remove(getEntryName());
                            if (!future.isSuccess()) {
                                log.error("Can't update lock " + getName() + " expiration", future.cause());
                                return;
                            }
    
                            if (future.getNow()) {
                                // reschedule itself
                                scheduleExpirationRenewal(threadId);
                            }
                        }
                    });
                }
            }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
    
            if (expirationRenewalMap.putIfAbsent(getEntryName(), task) != null) {
                task.cancel();
            }
        }
    

    它延迟 internalLockLeaseTime / 3 秒执行run方法,为它重新设置expire为 internalLockLeaseTime
    commandExecutor.evalWriteAsync返回了一个future,然后future又添加监听器,最后执行当前方法scheduleExpirationRenewal(threadId);,就是一直重复续命,又再延迟调用,相当于定时任务。

    到目前为止,基本就没有什么坑了,redisson已经是填了很多坑,可以放心使用 代码八 进行实现。
    但是,还有点问题,假设有多个请求在执行redissonLock.lock()加锁,只能有一个线程在处理,其他都得等着,系统就会很慢,存在性能问题,该怎么优化能做到双十一能用的级别?

    (7)zookeeper

    redis一般都是有主从架构的,基本不会是单机使用


    redis主节点马上告诉客户端加锁成功,线程1就执行业务代码逻辑,然后redis准备把key同步给从节点时候,结果主节点挂了,某个从节点选举成为新的Master主节点,来了个线程3访问新的主节点加锁,线程3就发现没有product_101这个key,又可以加锁成功了,线程1业务逻辑还没执行完毕,线程3就开始执行,就又出现了问题


    主从架构锁失效的问题,可以用zookeeper来实现分布式锁,和redis类似,是树形结构。redis更多的实现是AP架构,zookeeper更多的实现是CAP架构。
    zookeeper的话,当要写一个key,不是就立即返回成功的,会先把key同步给集群的其他节点,子节点会返回同步成功的信息,主节点会判断是否已经有超过半数的子节点都同步成功,这时才告诉客户端成功了,是为了保证一致性,牺牲了及时响应,但它能保证那些已经同步了子节点才能成功leader,redis就没有这个机制,也就是线程3再来请求leader的时候,必然会有key,加锁就不成功,解决了上述问题。
    但如果不使用zookeeper,就是要使用redis来解决呢?(因为redis的并发比zookeeper高不少)如果要高并发,就用redis,就有上述主从锁问题,如果要保证健壮性就用zookeeper,但牺牲了并发数。

    (8)Redlock

    硬是要使用redis的话,看看Redlock


    redis没有主从关系,是对等的,往每个节点发送加锁命令,只有超过半数的节点返回成功才认为客户端加锁成功,和zookeeper原理类似。但这种方式不推荐,原来是一个redis节点,现在搞多个,要半数加锁成功,对我们加锁性能受一定影响,这样的话,还不如用zookeeper,因为redlock还有不少问题。

    @RequestMapping("/redlock")
    public String redlock(){
    
        String lockKey = "product_101";
        RLock lock1 = redisson.getLock(lockKey);
        RLock lock2 = redisson.getLock(lockKey);
        RLock lock3 = redisson.getLock(lockKey);
    
        //根据多个RLock对象构件RedissonRedLock
        RedissonRedLock redLock = new RedissonRedLock(lock1, lock2, lock3);
    
        try{
            /*
            * waitTimeout 尝试获取锁的最大等待时间,超过这个数,则认为获取锁失败
            * leaseTime   锁的持有时间,超过这个时间锁会自动失败(值应设置为大于业务处理的时间,确保在锁有效期内业务能处理完)
            * */
            boolean res = redLock.tryLock(10, 30, TimeUnit.SECONDS);
            if(res){
                //成功获取锁,处理业务
            }
        }catch (Exception e) {
            throw  new RuntimeException("lock fail");
        }finally {
            //无论如何,最后都要解锁
            redLock.unlock();
        }
    
        return "end";
    }
    

    回到 代码八,redissonLock.lock();会导致其他线程等待,也就是分布式锁把并行请求变串行化执行了。那么如何提升分布式锁性能?

    (9)分段锁

    模仿ConcurrentHashMap,分段锁。
    假设product_101的数量是200,那么可以分十段,
    product_101_1=20
    product_101_2=20
    product_101_3=20
    ......
    product_101_10=20
    200个库存分10个key存到redis中去,让每个线程去减不同的段位的库存,如果不够减的话就减一下个段位,实现的话有点难,但可以理解这个思想,就不再去扩展了。

    (10)缓存数据库双写不一致

    接下来说redis作为缓存使用的时候,常见问题有:缓存无底洞、缓存穿透、缓存雪崩、缓存失效、热点key倾斜、热点key重建、缓存数据库双写不一致。
    这里针对缓存数据库双写不一致的问题说一下。
    什么是缓存数据库双写不一致?

    看上去线程1写数据库,然后更新缓存,线程2写数据库,然后更新缓存,没有什么问题,但如果线程1操作较慢(小卡顿)呢?


    有些人就会说,通常不会直接更新缓存,而是把缓存删掉,即更新就删缓存,读数据的时候再设置缓存,的确这样是比较好,因为每次写完就更新缓存的话,如果不读缓存,相当于白更新。


    但这样还是有问题


    还有什么方法解决?

    (11)解决双写不一致的方法

    延迟双删:删缓存删两次,删除之后sleep(一段时间)后再删一次
    但这种方法只能说是减少,并不能解决问题,并且还让所有的写请求都得sleep一段时间

    内存队列:用hash运算把操作路由到某个队列中顺序执行。是可以解决,但复杂,写不好很可能有性能问题或是bug

    还有没有其他解决方法?

    问题的本质就是操作过程中不是原子性,如果(写数据库-删除缓存)是不可分割的操作,(查缓存-查数据库-更新缓存)是不可分割的操作,即在操作前加分布式锁,操作完后解锁,所有线程的操作为队列,把多个并发执行的线程串行化


    直接这样上锁,性能肯定是有问题的,怎么优化?

    (12)读多写少的情况

    直接上分布式锁会有问题,使用读写锁
    读写锁:读操作加读锁,写操作加写锁,读操作不互斥,写锁跟读锁、写锁跟写锁互斥。
    由于很多系统都是读多写少的情况,所以可以提高性能

    /*
    代码九
    */
    @RequestMapping("/get_stock")
    public String getStock(@RequestParam("clientId") Long clientId) throws InterruptedException{
    
        String lockKey = "product_101";
    
        RReadWriteLock readWriteLock = redisson.getReadWriteLock(lockKey);
        RLock rLock = readWriteLock.readLock();
    
        rLock.lock();
        System.out.println("获取读锁成功:client="+clientId);
        String stock = stringRedisTemplate.opsForValue().get("stock");
        if(StringUtils.isEmpty(stock)){
            System.out.println("查询数据库库存为10。。。");
            Thread.sleep(5000);
            stringRedisTemplate.opsForValue().set("stock", 10);
        }
        rLock.unlock();
        System.out.println("释放读锁成功:client="+clientId);
    
        return "end";
    }
    
    @RequestMapping("/update_stock")
    public String updateStock(@RequestParam("clientId") Long clientId) throws InterruptedException{
    
        String lockKey = "product_101";
    
        RReadWriteLock readWriteLock = redisson.getReadWriteLock(lockKey);
        RLock writeLock = readWriteLock.writeLock();
    
        writeLock.lock();
        System.out.println("获取写锁成功:client="+clientId);
        System.out.println("修改商品101的数据库库存为6。。。");
        stringRedisTemplate.delete("stock");
        Thread.sleep(5000);
        writeLock.unlock();
        System.out.println("释放写锁成功:client="+clientId);
    
        return "end";
    }
    

    原理就是lua脚本为每个key设置一个mode的值来记录是read还是write。
    RedissonWriteLock.java


    但如果读多写也多的情况呢,怎么处理?
    不采用上面的方法,仍然是给缓存过期时间,然后操作的时候直接操作数据库。例如在页面上看到的库存,其实很多时候都是和数据库的值不一致的,就是为了实现高并发,又要用数据库又要用缓存,只能牺牲一致性,牺牲一致性其实关系并不大,想一想,假设一致的话,加入购物车、下订单,中间是有时间差的,这个时候可能就没有了库存了,对用户来说是不一致,但对程序来说,程序以及保证了一致,只是意义不大,所以牺牲一致性来提高性能。假设过期时间是一分钟,那在这一分钟内可能是不一致,但如果一分钟后库存不变,又读取更新了缓存,这个时候就变一致了,只需要确保在下单的时候是用db的数据即可。

    (13)读多写多的情况

    如果是读多写多,又要保证缓存数据库一致性,怎么办?
    对读多写多的场景,就不应该用缓存,直接操作数据库就好了,对吧。
    也有方法既使用缓存,又应对读多写多的场景,中间件canal。后面就学不着了,需要报课。。。。以后再看看

    相关文章

      网友评论

          本文标题:Redis高并发架构实战

          本文链接:https://www.haomeiwen.com/subject/dvicdltx.html