美文网首页一些收藏
电商高并发秒杀4 缓存库存异步化与事务型消息

电商高并发秒杀4 缓存库存异步化与事务型消息

作者: 香沙小熊 | 来源:发表于2020-03-24 15:02 被阅读0次

    1、高效交易验证

    • 用户风控策略优化:策略缓存模型优化

    策略缓存模型化,将对应的风控内容做到redis缓存里面,例如是否异地登录、账号异常,将风控的策略通过异步的方式写入对应缓存中,在实时查询过程中做一个风控策略的实时拦截;

    • 活动检验策略优化:引入活动发布流程,模型缓存化,紧急下线能力

    引入活动发布流程,模型缓存化,紧急下线能力; 运营发现活动有异常,在后台将对应的活动进行修改,比如将活动提前结束。若线上在redis的缓存没有正常过期,即便修改了活动时间,但是用户还是可以以活动秒杀价格交易,因此需要一个紧急下线能力。所以运营人员至少要在活动开始前半个小时将活动发布上去,半个小时内足够进行缓存的预热。在后设计一个紧急下线的接口,用代码实现可以清除redis内的缓存。当redis内无法查询状态,就会去数据库内查询活动状态,从而达到紧急下架的能力;

    2. 缓存库存模型

    2.1 库存行锁优化

    itemId需要创建唯一索引

    alter table item_stock add unique index
    item_id_index(item_id)
    
    2.2 扣减库存缓存化-Redis预热
    活动发布同步库存进缓存
        public void publishPromo(Integer promoId) {
            //通过活动id获取活动
            PromoDO promoDO = promoDOMapper.selectByPrimaryKey(promoId);
            if(promoDO.getItemId() == null || promoDO.getItemId().intValue() == 0){
                return;
            }
            ItemModel itemModel = itemService.getItemById(promoDO.getItemId());
    
            /**
             * 将库存同步到redis内
             * 注意:缺少锁库存操作,这段时间内商品可能会被售卖
             *
             */
            redisTemplate.opsForValue().set(PROMO_ITEM_STOCK +itemModel.getId(), itemModel.getStock());
        }
    
    问题: 数据库记录不一致,缓存中修改了但是数据库中的数据没有进行修改;
    2.3 异步消息队列rocketmq同步数据库
    • 活动发布同步库存进缓存;
    • 下单交易减缓存库存;
    • 异步消息扣减数据库内库存;
    • 可以让C端用户完成购买商品的高效体验,又能保证数据库最终的一致性;
    缓存库存接入异步化
            /**
             *再去完成对应的下单事务型消息机制
             **/
            if (!mqProducer.transactionAsyncReduceStock(userModel.getId(), itemId, promoId, amount, stockLogId)) {
                throw new BusinessException(EmBusinessError.UNKNOWN_ERROR, "下单失败");
            }
    
    rocketmq:

    高性能,高并发,分布式消息中间件;
    典型应用场景:分布式事务,异步解耦;
    RocketMQ原理 Producer解决消息生产的问题,Consumer消息的消费端 Broker 相当于一个中间人,由topic和MessageQueue组成,任何一条rocketmq的消息都是隶属于某一个topic,一个topic可以被一个messagebroker管理,也可以被多个messagebroker管理;

    image.png

    部署模型

    image.png
    2.4 实际扣redis减库存在mq 的producer 事务消息)中
    image.png

    3.事务型消息应用

    什么叫事务性呢?就是保证数据库的事务提交,只要事务提交了就一定会保证消息发送成功。数据库内事务回滚了,消息必定不发送,事务提交未知,消息也处于一个等待的状态;

    rocketmq为我们提供TransactionMQProducer API的支持。这个api在发送消息的时候主要做了三件事:

    1,先发送需要发送的消息到消息中间件broker,并获取到该message的transactionId。在第一次发送的时候,该消息的状态为LocalTransactionState.UNKNOW
    2,处理本地事物。
    3,根据本地事物的执行结果,结合transactionId,找到该消息的位置,在mq中标志该消息的最终处理结果。

    image.png
     @PostConstruct
        public void init() throws MQClientException {
            //做mq producer的初始化
            producer = new DefaultMQProducer(groupName);
            producer.setNamesrvAddr(nameAddr);
            //设置发送超时时间,默认是3000
            producer.setSendMsgTimeout(10000);
            producer.start();
            producer.setVipChannelEnabled(false);
            transactionMQProducer = new TransactionMQProducer("transaction_producer_group");
            transactionMQProducer.setNamesrvAddr(nameAddr);
            transactionMQProducer.start();
    
            transactionMQProducer.setTransactionListener(new TransactionListener() {
                @Override
                public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
                    /**
                     *  真正创建订单
                     */
                    Integer itemId = (Integer) ((Map) arg).get("itemId");
                    Integer promoId = (Integer) ((Map) arg).get("promoId");
                    Integer userId = (Integer) ((Map) arg).get("userId");
                    Integer amount = (Integer) ((Map) arg).get("amount");
                    String stockLogId = (String) ((Map) arg).get("stockLogId");
    
                    try {
                        orderService.createOrder(userId, itemId, promoId, amount, stockLogId);
                    } catch (BusinessException e) {
                        e.printStackTrace();
                        //设置对应的stockLog为回滚状态
                        StockLogDO stockLogDO = stockLogDOMapper.selectByPrimaryKey(stockLogId);
                        stockLogDO.setStatus(3);
                        stockLogDOMapper.updateByPrimaryKeySelective(stockLogDO);
                        return LocalTransactionState.ROLLBACK_MESSAGE;
                    }
                    return LocalTransactionState.COMMIT_MESSAGE;
                }
    
                @Override
                public LocalTransactionState checkLocalTransaction(MessageExt msg) {
                    /**
                     * 根据是否扣减库存成功,来判断要返回COMMIT,ROLLBACK还是继续UNKNOWN
                     */
                    String jsonString = new String(msg.getBody());
                    Map<String, Object> map = JSON.parseObject(jsonString, Map.class);
                    Integer itemId = (Integer) map.get("itemId");
                    Integer amount = (Integer) map.get("amount");
                    String stockLogId = (String) map.get("stockLogId");
                    StockLogDO stockLogDO = stockLogDOMapper.selectByPrimaryKey(stockLogId);
                    /**
                     * 兼容异常
                     */
                    if (stockLogDO == null) {
                        return LocalTransactionState.UNKNOW;
                    }
                    /**
                     * status=2 日志提交正常
                     */
                    if (stockLogDO.getStatus().intValue() == 2) {
                        return LocalTransactionState.COMMIT_MESSAGE;
                    } else if (stockLogDO.getStatus().intValue() == 1) {
                        return LocalTransactionState.UNKNOW;
                    }
                    return LocalTransactionState.ROLLBACK_MESSAGE;
                }
            });
        }
    
        /**
         * 事务型同步库存扣减消息
         */
        public boolean transactionAsyncReduceStock(Integer userId, Integer itemId, Integer promoId, Integer amount, String stockLogId) {
            Map<String, Object> bodyMap = new HashMap<>();
            bodyMap.put("itemId", itemId);
            bodyMap.put("amount", amount);
            bodyMap.put("stockLogId", stockLogId);
    
            Map<String, Object> argsMap = new HashMap<>();
            argsMap.put("itemId", itemId);
            argsMap.put("amount", amount);
            argsMap.put("userId", userId);
            argsMap.put("promoId", promoId);
            argsMap.put("stockLogId", stockLogId);
    
            Message message = new Message(topicName, "increase",
                    JSON.toJSON(bodyMap).toString().getBytes(StandardCharsets.UTF_8));
    
            /**
             * 拿到消息发送的结果
             * 进行 回滚与提交的操作
             */
            TransactionSendResult sendResult = null;
            try {
                /**
                 * 事务型消息
                 */
                sendResult = transactionMQProducer.sendMessageInTransaction(message, argsMap);
            } catch (MQClientException e) {
                e.printStackTrace();
                return false;
            }
            if (sendResult.getLocalTransactionState() == LocalTransactionState.ROLLBACK_MESSAGE) {
                return false;
            } else {
                return sendResult.getLocalTransactionState() == LocalTransactionState.COMMIT_MESSAGE
            }
    
        }
    
    

    4、操作流水 库存数据库最终一致性保证

    对于操作型数据:log data,意义是库存扣减的操作记录下来,便于追踪库存操作流水具体的状态;根据这个状态去做对应的回滚,或者查询对应的状态,使很多异步型的操作可以在操作型数据上,例如编译人员在后台创建的一些配置。

    主业务数据:master data,ItemModel就是主业务数据,记录了对应商品的主数据;ItemStock对应的库存也是主业务数据;

    发起异步消息mq之前先保存日志数据

            /**
             * 加入库存流水init状态
             * 并拿到StockLogid
             */
            String stockLogId = itemService.initStockLog(itemId, amount);
            /**
             *再去完成对应的下单事务型消息机制
             **/
            if (!mqProducer.transactionAsyncReduceStock(userModel.getId(), itemId, promoId, amount, stockLogId)) {
                throw new BusinessException(EmBusinessError.UNKNOWN_ERROR, "下单失败");
            }
    

    发起异步消息mq 中 ,任何环节失败日志数据都要设置为回滚或者失败状态

    image.png

    相关文章

      网友评论

        本文标题:电商高并发秒杀4 缓存库存异步化与事务型消息

        本文链接:https://www.haomeiwen.com/subject/kulwyhtx.html