美文网首页分布式@架构师编程技巧分布式服务
Redis实现高并发扣减库存,秒杀功能(可线上使用)

Redis实现高并发扣减库存,秒杀功能(可线上使用)

作者: 运气爆棚lsw | 来源:发表于2023-02-18 13:51 被阅读0次

    常见秒杀方案设计:
    1.数据库行锁
    2.分布式锁+分段锁提升效率
    3.Redis单线程机制,将库存放在Redis里面使用
    set count 1000
    decrby count 1 扣减库存,返回正数就可扣减库存
    4.Redis+Lua脚本,查询库存和扣减库存放到Lua脚本里面去执行
    这是一个原子操作,解决高并发下线程安全问题
    总结:简单利用redis的LUA脚本功能,一次性操作,实现原子性

    Redis+Lua实现高并发秒杀功能

    1、导入相关依赖
            <dependency>
                <groupId>redis.clients</groupId>
                <artifactId>jedis</artifactId>
            </dependency>
    
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-data-redis</artifactId>
                <exclusions>
                    <exclusion>
                        <groupId>io.lettuce</groupId>
                        <artifactId>lettuce-core</artifactId>
                    </exclusion>
                </exclusions>
            </dependency>
    
            <dependency>
                <groupId>org.apache.commons</groupId>
                <artifactId>commons-lang3</artifactId>
                <version>3.12.0</version>
            </dependency>
    
           <dependency>
                <groupId>org.redisson</groupId>
                <artifactId>redisson</artifactId>
                <version>3.13.4</version>
            </dependency>
    
    
    2、RedisConfig Bean初始化配置
    
    import com.fasterxml.jackson.annotation.JsonAutoDetect;
    import com.fasterxml.jackson.annotation.PropertyAccessor;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import org.apache.commons.lang3.StringUtils;
    import org.redisson.config.SingleServerConfig;
    
    import org.redisson.Redisson;
    import org.redisson.api.RedissonClient;
    import org.redisson.config.Config;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.data.redis.connection.RedisConnectionFactory;
    import org.springframework.data.redis.core.RedisTemplate;
    import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
    import org.springframework.data.redis.serializer.StringRedisSerializer;
    
    @Configuration
    public class RedisConfig {
    
        @Value("${spring.redis.host}")
        private String host;
    
        @Value("${spring.redis.port}")
        private String port;
    
        @Value("${spring.redis.password}")
        private String password;
    
        @Bean(destroyMethod = "shutdown")
        public RedissonClient redissonClient() {
            final Config config = new Config();
    
            SingleServerConfig singleServerConfig = config.useSingleServer()
                    .setAddress("redis://" + host + ":" + port);
            if (StringUtils.isNotBlank(password)) {
                singleServerConfig.setPassword(password);
            }
            System.out.println("------------- redisson -----------------------");
            System.out.println(config.getTransportMode());
            return Redisson.create(config);
        }
    
    
        /**
         * 重写Redis序列化方式,使用Json方式:
         * 数据存储到Redis的时候,我们的键(key)和值(value)都是通过Spring提供的Serializer序列化到数据库的
         * RedisTemplate默认使用的是JdkSerializationRedisSerializer
         * StringRedisTemplate默认使用的是StringRedisSerializer
         * <p>
         * Spring Data JPA为我们提供了下面的Serializer:
         * GenericToStringSerializer、Jackson2JsonRedisSerializer
         * JacksonJsonRedisSerializer、JdkSerializationRedisSerializer、OxmSerializer、StringRedisSerializer。
         * 在此我们将自己配置RedisTemplate并定义Serializer
         */
        @Bean
        public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
            RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
            redisTemplate.setConnectionFactory(redisConnectionFactory);
    
            Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer =
            new Jackson2JsonRedisSerializer<Object>(Object.class);
    
            ObjectMapper om = new ObjectMapper();
            om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
            jackson2JsonRedisSerializer.setObjectMapper(om);
    
            // 设置值(value)的序列化采用Jackson2JsonRedisSerializer。
            redisTemplate.setValueSerializer(jackson2JsonRedisSerializer);
            redisTemplate.setHashValueSerializer(jackson2JsonRedisSerializer);
            // 设置键(key)的序列化采用StringRedisSerializer。
            redisTemplate.setKeySerializer(new StringRedisSerializer());
            redisTemplate.setHashKeySerializer(new StringRedisSerializer());
    
            redisTemplate.afterPropertiesSet();
            return redisTemplate;
        }
    
    }
    
    3、Redis+Lua脚本实现秒杀扣减库存
    public interface IStockCallback {
    
        /**
         * 扣减Redis库存
         *
         * @param batchNo 商品唯一编号
         * @param expire  过期时间
         * @param num     扣减库存数量
         * @return 剩余库存数量
         */
        long getStock(String batchNo, long expire, int num);
    
        /**
         * 初始化库存
         *
         * @param commodityId 业务
         * @return 库存
         */
        int initStock(long commodityId);
    }
    

    下面是下单扣减库存业务的代码块,在扣库存的时候,不能超发,也不能扣到负数,
    然后再同步到MYSQL里,初始化库存数量,这个可以从DB里取实际的量,
    LUA脚本保证原子性,查询剩余库存和扣减逻辑是一个原子性操作

    @Slf4j
    @Service
    public class RedisStockService implements IStockCallback {
    
        /**
         * 库存还未初始化
         */
        public static final long UNINITIALIZED_STOCK = -3L;
        /**
         * 判断商品是否存在KEY标识
         */
        public static final long EXIST_FLAG = -2L;
        /**
         * 配置库存Redis缓存Key前缀
         */
        public static final String REDIS_KEY = "REDIS_KEY:STOCK:";
        /**
         * 执行扣库存的Lua脚本
         */
        public static final String STOCK_LUA;
        /**
         * Redisson 客户端
         */
        @Resource
        private RedissonClient redissonClient;
        /**
         * Redis 客户端
         */
        @Resource
        private RedisTemplate<String, Integer> redisTemplate;
    
        static {
            /*
             * @desc 扣减库存Lua脚本
             * 库存(stock)-1:表示不限库存
             * 库存(stock) 0:表示没有库存
             * 库存(stock)大于0:表示剩余库存
             *
             * @params 库存key
             * @return
             *      -3:库存未初始化
             *      -2:库存不足
             *      -1:不限库存
             *      大于等于0: 剩余库存(扣减之后剩余的库存), 直接返回-1
             */
            final StringBuilder strBuilder = new StringBuilder();
            strBuilder.append("if (redis.call('exists', KEYS[1]) == 1) then");
            strBuilder.append("    local stock = tonumber(redis.call('get', KEYS[1]));");
            strBuilder.append("    local num = tonumber(ARGV[1]);");
            strBuilder.append("    if (stock == -1) then");
            strBuilder.append("        return -1;");
            strBuilder.append("    end;");
            strBuilder.append("    if (stock >= num) then");
            strBuilder.append("        return redis.call('incrby', KEYS[1], 0 - num);");
            strBuilder.append("    end;");
            strBuilder.append("    return -2;");
            strBuilder.append("end;");
            strBuilder.append("return -3;");
            STOCK_LUA = strBuilder.toString();
        }
    
        /**
         * 执行扣减库存业务
         *
         * @param batchNo 库存唯一标识
         * @param expire  库存过期时间
         * @param num     扣减库存的数量
         * @return 返回扣减库存后剩余库存数量
         */
        @Override
        public long getStock(String batchNo, long expire, int num) {
            // 商品库存唯一标识
            final String key = REDIS_KEY + batchNo;
    
            /*
             * 从redis中获取key对应的过期时间;
             * 1、如果该值有过期时间,就返回相应的过期时间;
             * 2、如果该值没有设置过期时间,就返回-1;
             * 3、如果没有该值,就返回-2;
             *
             * 注意:这里为了方便模拟,实际线上。通过缓存预热的方式通过DB查询实际的库存数据
             * 添加到Redis中
             */
            Long expire1 = redisTemplate.opsForValue().getOperations().getExpire(key);
            if (Objects.equals(EXIST_FLAG, expire1)) {
                redisTemplate.opsForValue().set(key, 100, expire, TimeUnit.SECONDS);
                System.out.println("Redis无初始库存,设置库存数据 = " + expire1);
            }
    
            // 初始化商品库存
            Integer stock = redisTemplate.opsForValue().get(key);
    
            // 设置分布式锁
            final RLock rLock = redissonClient.getLock(REDIS_KEY + ":LOCK");
            try {
                if (rLock.tryLock(1, TimeUnit.SECONDS)) {
                    stock = redisTemplate.opsForValue().get(key);
                    log.info("--- 当前Key:[{}]加锁成功,当前最新库存:{}---", key, stock);
                    // 调一次扣库存的操作
                    Long stock1 = stock(key, num);
                    System.out.println("stock1 = " + stock1);
    
                    stock = redisTemplate.opsForValue().get(key);
                    int batchNoLock = Objects.requireNonNull(stock);
                    log.info("--- 当前剩余库存:{}", batchNoLock);
                }
            } catch (Exception e) {
                log.error(e.getMessage(), e);
            } finally {
                if (rLock != null && rLock.isHeldByCurrentThread()) {
                    rLock.unlock();
                }
            }
            return stock;
        }
    
        /**
         * 扣库存这步特别注意,分布式连接有问题,需要依赖包里,去掉lettuce组件
         * 初始化库存数量,这个可以从DB里取实际的量
         *
         * @param key 库存key
         * @param num 扣减库存数量
         * @return 扣减之后剩余的库存【-3:库存未初始化; -2:库存不足; -1:不限库存; 大于等于0:扣减库存之后的剩余库存】
         */
        private Long stock(String key, int num) {
            // 脚本里的KEYS参数
            List<String> keys = new ArrayList<>();
            keys.add(key);
    
            // 脚本里的ARGV参数
            List<String> argvList = new ArrayList<>();
            argvList.add(Integer.toString(num));
    
            // 执行扣减库存LUA脚本
            return redisTemplate.execute((RedisCallback<Long>) connection -> {
                Object nativeConnection = connection.getNativeConnection();
                // 集群模式
                if (nativeConnection instanceof JedisCluster) {
                    return (Long) ((JedisCluster) nativeConnection).eval(STOCK_LUA, keys, argvList);
                }
                // 单机模式
                else if (nativeConnection instanceof Jedis) {
                    return (Long) ((Jedis) nativeConnection).eval(STOCK_LUA, keys, argvList);
                }
                return UNINITIALIZED_STOCK;
            });
        }
    
        /**
         * 获取初始的库存
         * 初始化库存数量,这个可以从DB里取实际的量
         *
         * @param commodityId 业务ID
         * @return 初始库存
         */
        @Override
        public int initStock(long commodityId) {
            // TODO 这里做一些初始化库存的操作
            return 30;
        }
    }
    
    3、调用接口并发Controller,测试分布式库存扣减
       @Resource
        private RedisStockService redisStockService;
    
        /**
         * 下单扣减库存
         *
         * @param stockDTO 下单请求参数
         * @return 扣减库存结果
         */
        @PostMapping("/buyProductCreateOrder")
        public Object buyProductCreateOrder(@RequestBody StockDTO stockDTO) {
            try {
                return redisStockService.getStock(stockDTO.getBatchNo(), stockDTO.getExpire(), stockDTO.getNum());
            } catch (Exception e) {
                return e.getMessage();
            }
        }
    

    相关文章

      网友评论

        本文标题:Redis实现高并发扣减库存,秒杀功能(可线上使用)

        本文链接:https://www.haomeiwen.com/subject/tyhbkdtx.html