美文网首页开发设计java解决方案
如何应对高并发:悲观锁,乐观锁,Redis

如何应对高并发:悲观锁,乐观锁,Redis

作者: Wayne_Dream | 来源:发表于2019-05-27 22:22 被阅读631次

    根据上一篇Demo测试情况反映,当有多个线程同时抢购时,会发生超发现象,所谓超发现象,就是原本设置库存为30000件,但是,当抢购完成后发现库存余量变成了负数,即发货量大于库存量的情况:

    超发现象

    造成这种现象的原因:当多个线程请求数据库查询库存余量时,显示有余量,但是当进行扣减库存时,库存已经用完了,但那个线程并不知道,依旧去扣减库存,造成库存为负数的情况,于是乎就出现了超发现象。

    测试方法:根据书上是html中使用js,for循环异步请求,发现并不会造成超发现象,后改为在浏览器中同时开启多个窗口访问/test进行抢购,模拟多个用户抢购的场景,内存爆炸...

    为了解决这种问题,下面将介绍三种解决方法:

    1、悲观锁

    发生超发现象的根本原因是共享数据被多个线程所修改,无法保证其执行顺序,如果一个数据库事务读取到一个产品后,就将数据直接锁定,不允许其他线程进行读写操作,直至当前数据库事务完成才释放这条数据的锁,就不会发生超发现象,但是执行效率性能将大大下降。

    修改ProductMapper中的SQL语句:

    @Mapper
    public interface ProductMapper {
    
        @Select("SELECT id, product_name as productName, stock, price, version, note FROM t_product where id=#{id} for update")
        ProductPo getProduct(Long id);
    
        @Update("UPDATE t_product SET stock = stock - #{quantity} WHERE id = #{id}")
        int decreaseProduct(@Param("id") Long id, @Param("quantity") int quantity);
    }
    

    在select语句末尾添加了for update,这样,在数据库事务执行的过程中,就会锁定查询出来的数据,其他事务将不能再对其进行读写,单个请求直至数据库事务完成,才会释放这个,下图可见其stock为0,没有发生超发现象,但执行效率下降了,通过购买记录可以得知,相比之前没加锁慢了1/5。

    库存

    2、乐观锁

    为了解决悲观锁带来的性能下降的问题,我们来讨论一下乐观锁的原理

    乐观锁是一种不使用数据库锁和不阻塞线程并发的方案,下图是以本Demo为例的乐观锁流程:

    流程图

    这种方案就是多线程的概念CAS(Compare and Swap),然而这样的方案会引发一种ABA问题

    T1时刻:线程1读取商品库存为A

    T2时刻:线程2读取商品库存为A

    T3时刻:线程2计算购买商品总价格

    T4时刻:当前库存为A,与线程2保存的旧值一致,因此线程2可减库存(当前库存A--->B),此时线程1在当前库存为B的情况下计算剩余商品价格(单价*B)。

    T5时刻:线程2取消购买,线程2回退(当前库存B--->A),线程1计算的剩余商品价格错误。

    T6时刻:线程1比较旧值与当前数据库库存,发现都为A,返回之前计算好的(单价*B)结果,造成了错误。

    从上面的分析中看到一个现象A--->B--->A的过程,就是所谓的ABA问题,解决此问题的方法为加入版本号的限制,只要在操作过程中修改共享值,无论业务正常,回退,还是异常,版本号只能递增,不能回退递减。每次通过比较数据的版本号来查看此数据是否被修改过。

    @Mapper
    public interface ProductMapper {
    
        @Select("SELECT id, product_name as productName, stock, price, version, note FROM t_product where id=#{id}")
        ProductPo getProduct(Long id);
    
        //********************change******************************
        @Update("UPDATE t_product SET stock = stock - #{quantity}, version = version + 1 WHERE id = #{id} and version = #{version}")
        int decreaseProduct(@Param("id") Long id, @Param("quantity") int quantity, @Param("version") int version);
    }
    
         @Override
         // 启动Spring数据库事务机制
         @Transactional
         public boolean purchase(Long userId, Long productId, int quantity) {
             // 获取产品
             ProductPo product = productMapper.getProduct(productId);
             // 比较库存和购买数量
             if (product.getStock() < quantity) {
             // 库存不足
             return false;
             }
             //**************************change*******************************
             // 扣减库存,加入了version
             productMapper.decreaseProduct(productId, quantity, product.getVersion());
             //***************************************************************
             // 初始化购买记录
             PurchaseRecordPo pr = this.initPurchaseRecord(userId, product, quantity);
             // 插入购买记录
             purchaseRecordMapper.insertPurchaseRecord(pr);
             return true;
         }
    
    产品表

    发现stock并没有降为0,原因是加入了版本号的判断,所以大量的请求得到了失败的结果,而且失败率有点高。要解决这个方法,就设定为如果失败,就重试,直至成功,但是这样又会造成大量SQL执行,影响性能,所以一般可以使用限制时间或者重入次数的方法来克服。

    时间戳限制重入的乐观锁:

    将一个请求限制在100ms的生存期,如果在100ms内发生版本号冲突而导致不能更新的,则会重新尝试请求,否则请求失败。

    修改service下PurchaseserviceImpl的purchase方法

     @Override
         // 启动Spring数据库事务机制
         @Transactional
         public boolean purchase(Long userId, Long productId, int quantity) {
    
             long start = System.currentTimeMillis();
             while (true){
                 long end = System.currentTimeMillis();
                 if (end - start >100){
                     return false;
                 }
                 // 获取产品
                 ProductPo product = productMapper.getProduct(productId);
                 // 比较库存和购买数量
                 if (product.getStock() < quantity) {
                     // 库存不足
                     return false;
                 }
                 // 扣减库存
                 int result = productMapper.decreaseProduct(productId, quantity, product.getVersion());
    
                 // 如果数据更新失败,说明数据在多线程中被其他线程修改
                 // 导致失败,着通过循环重入尝试购买商品
                 if (result == 0){
                     continue;
                 }
                 // 初始化购买记录
                 PurchaseRecordPo pr = this.initPurchaseRecord(userId, product, quantity);
                 // 插入购买记录
                 purchaseRecordMapper.insertPurchaseRecord(pr);
                 return true;
             }
         }
    

    这种方法在测试中效果并不是很好,执行速度很慢,冲突现象并没有减少,反而增多,可能是我测试方法并不好,只开了三个网页来模拟并发,不太懂JS,Demo用的JS是发送异步请求的,但用单窗口测试了好多次都没出现超发现象,只能人肉模拟并发。

    限定次数重入的乐观锁:

         @Override
         // 启动Spring数据库事务机制
         @Transactional
         public boolean purchase(Long userId, Long productId, int quantity) {
    
             long start = System.currentTimeMillis();
             for (int i=0; i<3; i++){
                 // 获取产品
                 ProductPo product = productMapper.getProduct(productId);
                 // 比较库存和购买数量
                 if (product.getStock() < quantity) {
                     // 库存不足
                     return false;
                 }
                 // 扣减库存
                 int result = productMapper.decreaseProduct(productId, quantity, product.getVersion());
    
                 // 如果数据更新失败,说明数据在多线程中被其他线程修改
                 // 导致失败,着通过循环重入尝试购买商品
                 if (result == 0){
                     continue;
                 }
                 // 初始化购买记录
                 PurchaseRecordPo pr = this.initPurchaseRecord(userId, product, quantity);
                 // 插入购买记录
                 purchaseRecordMapper.insertPurchaseRecord(pr);
                 return true;
             }
             return false;
         }
    

    这种方式比上一种限定时间好,速度和单纯使用乐观锁差不多,并且消除了冲突。

    3、Redis处理高并发

    在高并发环境中,直接操作数据库的方式过于缓慢,因为数据库是一个写入磁盘的过程,这个速度没有写入内存的Redis快,Redis的机制也能够帮助我们克服超发现象,但是,因为其命令方式运算能力比较薄弱,所以往往采用Redis Lua去代替它原有的命令方式。Redis Lua在Redis的执行中是局内原子性的,但他被执行时不会被其他客户端发送过来的命令打断,通过这样一种机制,可以在需要高并发的环境下考虑使用Redis去代替数据库作为响应用户的数据载体。但是Redis存储具有不稳定性,所以还需要有一定的机制将Redis存储的数据刷入数据库。

    下面先来配置一下Redis:

    application.properties

    #配置redis
    spring.redis.jedis.pool.min-idle=5
    spring.redis.jedis.pool.max-active=10
    spring.redis.jedis.pool.max-idle=10
    spring.redis.jedis.pool.max-wait=2000
    spring.redis.port=6379
    spring.redis.host=127.0.0.1
    #我的Redis没有密码
    #spring.redis.password=123456
    spring.redis.timeout=1000
    

    pom.xml

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-redis</artifactId>
        <version>2.1.4.RELEASE</version>
        <exclusions>
            <!--不依赖Redis的异步客户端lettuce -->
            <exclusion>
                <groupId>io.lettuce</groupId>
                <artifactId>lettuce-core</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <!--引入Redis的客户端驱动jedis -->
    <dependency>
        <groupId>redis.clients</groupId>
        <artifactId>jedis</artifactId>
    </dependency>
    

    PurchaseServiceImpl.java,使用Redis Lua响应请求

    @Service
    public class PurchaseServiceImpl implements PurchaseService {
    
    
        @Autowired
        private ProductMapper productMapper = null;
    
        @Autowired
        private PurchaseRecordMapper purchaseRecordMapper = null;
       
        private PurchaseRecordPo initPurchaseRecord(Long userId, ProductPo product, int quantity) {
            PurchaseRecordPo pr = new PurchaseRecordPo();
            pr.setNote("购买日志,时间:" + System.currentTimeMillis());
            pr.setPrice(product.getPrice());
            pr.setProductId(product.getId());
            pr.setQuantity(quantity);
            double sum = product.getPrice() * quantity;
            pr.setSum(sum);
            pr.setUserId(userId);
            return pr;
        }
    
        @Autowired
        StringRedisTemplate stringRedisTemplate = null;
        String purchaseScript =
                // 先将产品编号保存到集合中
                " redis.call('sadd', KEYS[1], ARGV[2]) \n"
                        // 购买列表
                        + "local productPurchaseList = KEYS[2]..ARGV[2] \n"
                        // 用户编号
                        + "local userId = ARGV[1] \n"
                        // 产品key
                        + "local product = 'product_'..ARGV[2] \n"
                        // 购买数量
                        + "local quantity = tonumber(ARGV[3]) \n"
                        // 当前库存
                        + "local stock = tonumber(redis.call('hget', product, 'stock')) \n"
                        // 价格
                        + "local price = tonumber(redis.call('hget', product, 'price')) \n"
                        // 购买时间
                        + "local purchase_date = ARGV[4] \n"
                        // 库存不足,返回0
                        + "if stock < quantity then return 0 end \n"
                        // 减库存
                        + "stock = stock - quantity \n"
                        + "redis.call('hset', product, 'stock', tostring(stock)) \n"
                        // 计算价格
                        + "local sum = price * quantity \n"
                        // 合并购买记录数据
                        + "local purchaseRecord = userId..','..quantity..','"
                        + "..sum..','..price..','..purchase_date \n"
                        // 保存到将购买记录保存到list里
                        + "redis.call('rpush', productPurchaseList, purchaseRecord) \n"
                        // 返回成功
                        + "return 1 \n";
        // Redis购买记录集合前缀
        private static final String PURCHASE_PRODUCT_LIST = "purchase_list_";
        // 抢购商品集合
        private static final String PRODUCT_SCHEDULE_SET = "product_schedule_set";
        // 32位SHA1编码,第一次执行的时候先让Redis进行缓存脚本返回
        private String sha1 = null;
    
        @Override
        public boolean purchaseRedis(Long userId, Long productId, int quantity) {
            // 购买时间
            Long purchaseDate = System.currentTimeMillis();
            Jedis jedis = null;
            try {
                // 获取原始连接
                jedis = (Jedis) stringRedisTemplate
                        .getConnectionFactory().getConnection().getNativeConnection();
                // 如果没有加载过,则先将脚本加载到Redis服务器,让其返回sha1
                if (sha1 == null) {
                    sha1 = jedis.scriptLoad(purchaseScript);
                }
                // 执行脚本,返回结果
                Object res = jedis.evalsha(sha1, 2, PRODUCT_SCHEDULE_SET,
                        PURCHASE_PRODUCT_LIST, userId + "", productId + "",
                        quantity + "", purchaseDate + "");
                Long result = (Long) res;
                return result == 1;
            } finally {
                // 关闭jedis连接
                if (jedis != null && jedis.isConnected()) {
                    jedis.close();
                }
            }
        }
    
        @Override
        // 当运行方法启用新的独立事务运行
        @Transactional(propagation = Propagation.REQUIRES_NEW)
        // 保存购买记录,持久化到数据库
        public boolean dealRedisPurchase(List<PurchaseRecordPo> prpList) {
            for (PurchaseRecordPo prp : prpList) {
                purchaseRecordMapper.insertPurchaseRecord(prp);
                productMapper.decreaseProduct(prp.getProductId(), prp.getQuantity());
            }
            return true;
        }
    }
    

    使用定时机制,定时将数据持久化到数据库:

    首先设置启动文件:

    @SpringBootApplication(scanBasePackages = "com.wayne.springboot")
    @MapperScan(annotationClass = Mapper.class, basePackages = "com.wayne.springboot")
    // 启动springboot的定时机制,为此需要一个定时的方法来提供服务
    // 把Redis的数据导入到数据库
    @EnableScheduling
    public class SpringBootShoppingApplication{
    
        public static void main(String[] args) {
            SpringApplication.run(SpringBootShoppingApplication.class, args);
        }
    }
    

    一个定时的方法来提供服务把Redis的数据导入到数据库:

    import com.wayne.springboot.pojo.PurchaseRecordPo;
    import com.wayne.springboot.service.PurchaseService;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.data.redis.core.BoundListOperations;
    import org.springframework.data.redis.core.StringRedisTemplate;
    import org.springframework.scheduling.annotation.Scheduled;
    import org.springframework.stereotype.Service;
    
    import java.sql.Timestamp;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Set;
    
    @Service
    public class TaskServiceImpl implements TaskService {
        @Autowired
        private StringRedisTemplate stringRedisTemplate = null;
        @Autowired
        private PurchaseService purchaseService = null;
    
        private static final String PRODUCT_SCHEDULE_SET = "product_schedule_set";
        private static final String PURCHASE_PRODUCT_LIST = "purchase_list_";
        // 每次取出1000条,避免一次取出消耗太多内存
        private static final int ONE_TIME_SIZE = 1000;
    
        @Override
        // 每天半夜1点钟开始执行任务
    //    @Scheduled(cron = "0 0 1 * * ?")
        // 下面是用于测试的配置,每分钟执行一次任务
        @Scheduled(fixedRate = 1000 * 30)
        public void purchaseTask() {
            System.out.println("定时任务开始......");
            Set<String> productIdList
                    = stringRedisTemplate.opsForSet().members(PRODUCT_SCHEDULE_SET);
            List<PurchaseRecordPo> prpList =new ArrayList<>();
            for (String productIdStr : productIdList) {
                Long productId = Long.parseLong(productIdStr);
                String purchaseKey = PURCHASE_PRODUCT_LIST + productId;
                BoundListOperations<String, String> ops
                        = stringRedisTemplate.boundListOps(purchaseKey);
                // 计算记录数
                long size = stringRedisTemplate.opsForList().size(purchaseKey);
                Long times = size % ONE_TIME_SIZE == 0 ?
                        size / ONE_TIME_SIZE : size / ONE_TIME_SIZE + 1;
                for (int i = 0; i < times; i++) {
                    // 获取至多TIME_SIZE个抢红包信息
                    List<String> prList = null;
                    if (i == 0) {
                        prList  = ops.range(i * ONE_TIME_SIZE,
                                (i + 1) * ONE_TIME_SIZE);
                    } else {
                        prList = ops.range(i * ONE_TIME_SIZE + 1,
                                (i + 1) * ONE_TIME_SIZE);
                    }
                    for (String prStr : prList) {
                        PurchaseRecordPo prp
                                = this.createPurchaseRecord(productId, prStr);
                        prpList.add(prp);
                    }
                    try {
                        // 采用该方法采用新建事务的方式,这样不会导致全局事务回滚
                        purchaseService.dealRedisPurchase(prpList);
                    } catch(Exception ex) {
                        ex.printStackTrace();
                    }
                    // 清除列表为空,等待重新写入数据
                    prpList.clear();
                }
                // 删除购买列表
    
                stringRedisTemplate.delete(purchaseKey);
                // 从商品集合中删除商品
                stringRedisTemplate.opsForSet()
                        .remove(PRODUCT_SCHEDULE_SET, productIdStr);
            }
            System.out.println("定时任务结束......");
        }
    
        private PurchaseRecordPo createPurchaseRecord(
            Long productId, String prStr) {
            String[] arr = prStr.split(",");
            Long userId = Long.parseLong(arr[0]);
            int quantity = Integer.parseInt(arr[1]);
            double sum = Double.valueOf(arr[2]);
            double price = Double.valueOf(arr[3]);
            Long time = Long.parseLong(arr[4]);
            Timestamp purchaseTime = new Timestamp(time);
            PurchaseRecordPo pr = new PurchaseRecordPo();
            pr.setProductId(productId);
            pr.setPurchaseTime(purchaseTime);
            pr.setPrice(price);
            pr.setQuantity(quantity);
            pr.setSum(sum);
            pr.setUserId(userId);
            pr.setNote("购买日志,时间:" + purchaseTime.getTime());
            return pr;
        }
    }
    

    到这里基本完成,启动项目前先启动redis服务器,并初始化Redis:

    hmset product_1 id 1 stock 10000 price 2.00

    然后启动并访问浏览器localhost:8080/test,因为设定的间隔为30s,所以等30s去查看数据库。性能相比之前要快上数倍。

    产品表

    源码存放在github-spring-boot-shopping

    相关文章

      网友评论

        本文标题:如何应对高并发:悲观锁,乐观锁,Redis

        本文链接:https://www.haomeiwen.com/subject/zpxltctx.html