美文网首页面试精选
分布式事务设计 -- 本地消息表,设计与代码

分布式事务设计 -- 本地消息表,设计与代码

作者: Zal哥哥 | 来源:发表于2021-10-19 10:13 被阅读0次

    分布式事务设计

    场景

    在业务中有一处需要用户为订单付款,该业务会修改用户库的balance(用户余额表),扣减用户的余额,然后会修改订单库的order(订单表)和enterprise(企业余额表),将订单状态设置为已被支付,并增加企业的余额。这里就同时修改多个数据库,涉及到了分布式事务的问题。我最终是使用了RocketMQ的事务消息,并从外围解决了消息回查的问题。

    他人思路

    在设计我的解决方案前尝试搜索了一下别人的实现 传送门。他的解决方案是在producer和consumer方设置了两个scheduler,感觉是有些复杂的。我是在其基础上进行了简化,并解决了一些其他问题,使得整个解决方案比较完整和逻辑自洽。

    我的设计

    A和B是两个Service,A执行本地事务,B执行远程事务。A会调用B的远程服务,完成整个业务。就本项目而言,A就是用户模块的AccountService,B就是订单模块的OrderService。A和B都有一张表,存储着消息数据。从MQ的视角看来,A是消息的Producer,B是消息的Consumer。

    A(本地事务执行方,MQProducer)

    1. db
      producer_msg(msgId,body,message_status,create_time,update_time,send_times,topic) msgId这里为orderId

    2. mq
      作为producer时,注册Topic account:当执行本地事务时同时插入producer_msg,默认status都是未被消费。如果本地事务执行失败,那么直接回滚,不插入。当消息发送失败时,我们已经在producer_msg插入了记录,可以进行回查。

    3. scheduler
      A需要同步B的数据库,使得两个数据库数据一致,不同的即为确认信息发送失败的。
      消息状态有未被消费、已被消费、消费失败、超过消费失败的重试次数、超过确认消息发送失败的重试次数和已被回滚。
      A和B数据库同步维护所有消息,只是A数据库保存内容更多,比如会保存消息的body。
      如果消息已经是超过重试次数或已被消费,那么A不会再去考虑它。
      A的Scheduler会遍历A数据库,找出未被消费和消费失败的id且创建时间距离当前时间超过1min,发送给B。
      B会遍历这些id

    for(id in ids){
        如果 id 不存在,说明确认消息发送失败,
        如果 id 存在,则将该id对应的status一并返回,map.put(id,status)
    } 
    

    A 接收到map后,keySet取得所有id,拿发送过去的id减去这些id(差集),就是确认消息发送失败的消息,进行重新发送;遍历map,将本地数据库同步为B数据库。

    这个方法可能会出现消息重复,因为A刚发送消息,B该没有处理,A的Scheduler就去查询了,当然消息都没有被消费,因为A会重发刚才的消息,但是B有做消息去重,所以不会影响。

    B(远程事务执行方,MQConsumer)

    1. db
      consumer_msg(msgId,create_time. message_status,topic) msgId这里是orderId

    2. mq
      作为consumer,注册Topic account:
      当接收到消息后,查询是否被执行过,如果没有被消费过(id未找到)或者消费失败了(这里解决了消息重复消费的问题),则执行远程事务后插入/更新consumer_ msg(status为已被消费),已被消费则跳过。
      远程事务执行失败时,插入/更新consumer_ msg(status为消费失败)
      超过重试消费次数的消息也更新consumer_ msg,status为超过消费的重试次数。
      B这里就维护它所接收的消息的状态。

    消息表

    在producer这一方设计了producer_transaction_message表。


    这里写图片描述
    • msgId是消息唯一id,可以采用业务上的id来实现,比如订单id。
    • body是消息体,比如订单对象的序列化结果。
    • message_status是消息状态
    • update_time是最后更新记录时间
    • create_time是消息创建时间
    • send_times是确认消息重复发送次数
    • topic是消息主题,这里均为account

    在consumer这一方设计了consumer_transaction_message表。


    这里写图片描述

    看得出来是producer的表的部分列,其含义也是相同的。

    分布式事务实现代码

    Producer方

    MQProducerConfig(配置MQProducer)

    @Configuration
    @Slf4j
    @Getter
    public class MQProducerConfig {
        @Value("${spring.rocketmq.group-name}")
        private String groupName;
        @Value("${spring.rocketmq.namesrv-addr}")
        private String namesrvAddr;
        @Value("${spring.rocketmq.topic}")
        private String topic;
        @Value("${spring.rocketmq.confirm-message-faiure-retry-times}")  
        private Integer retryTimes;
        public static final Integer CHECK_GAP = 1; 
    
        @Bean
        public MQProducer mqProducer() throws MQClientException {
            TransactionMQProducer producer = new TransactionMQProducer(groupName);
            producer.setNamesrvAddr(namesrvAddr); 
            producer.setTransactionCheckListener(new TransactionCheckListener() {
                @Override
                public LocalTransactionState checkLocalTransactionState(MessageExt msg) {
                    // doNothing
                    return LocalTransactionState.COMMIT_MESSAGE;
                }
            });
            Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
                public void run() {
                    producer.shutdown();
                }
            }));
            producer.start();
            log.info("producer started!");
            return producer;
        }
    }
    
    

    AccountLocalTransactionExecutor(执行本地事务)

    @Component
    @Slf4j
    public class AccountLocalTransactionExecutor implements LocalTransactionExecuter {
        @Autowired
        private PayService payService;
        @Autowired
        private ProducerTransactionMessageService messageService;
    
        @Override
        public LocalTransactionState executeLocalTransactionBranch(Message msg, Object arg) {
            try {
                String paymentPassword = (String) arg;
                OrderDO order = ProtoStuffUtil.deserialize(msg.getBody(), OrderDO.class);
                if (order.getOrderStatus() != OrderStatus.UNPAID) {
                    log.info("{} 订单状态不为unpaid", order.getId());
                    throw new OrderStateIllegalException(order.getOrderStatus().toString());
                }
                // 本地事务,减少用户账户余额
                // 抛出异常时会进行回滚,下面构造消息存储到数据库也不会被执行
                payService.decreaseAccount(order.getUser().getId(), order.getTotalPrice(), paymentPassword);
                // 保存消息至数据库
                ProducerTransactionMessageDO messageDO = ProducerTransactionMessageDO.builder()
                        .id(order.getId())
                        .body(msg.getBody())
                        .createTime(LocalDateTime.now())
                        .updateTime(LocalDateTime.now())
                        .messageStatus(MessageStatus.UNCONSUMED)
                        .topic(msg.getTopic())
                        .sendTimes(0)
                        .build();
                messageService.save(messageDO);
                // 成功通知MQ消息变更 该消息变为:<确认发送>
                return LocalTransactionState.COMMIT_MESSAGE;
            } catch (Exception e) {
                e.printStackTrace();
                log.info("本地事务执行失败,直接回滚!");
                // 失败则不通知MQ 该消息一直处于:<暂缓发送>
                return LocalTransactionState.ROLLBACK_MESSAGE;
            }
        }
    }
    

    AccountServiceImpl(Producer支付业务入口)

    @Service
    @Slf4j
    public class AccountServiceImpl implements AccountService {
        @Autowired
        private MQProducerConfig config;
        @Autowired
        private MQProducer producer;
        @Autowired
        private AccountLocalTransactionExecutor executor;
        @Autowired
        private ProducerTransactionMessageService messageService;
        @Autowired
        private PayService payService;
    
        @Override
        public void commit(OrderDO order, String paymentPassword) {
            Message message = new Message();
            message.setTopic(config.getTopic());
            message.setBody(ProtoStuffUtil.serialize(order));
            TransactionSendResult result = null;
            try {
                result = this.producer.sendMessageInTransaction(message, executor, paymentPassword);
                log.info("事务消息发送结果:{}", result);
                log.info("TransactionState:{} ", result.getLocalTransactionState());
                // 因为无法获得executor中抛出的异常,只能模糊地返回订单支付失败信息。
                // TODO 想办法从executor中找到原生异常
            } catch (Exception e) {
                log.info("AccountService抛出异常...");
                e.printStackTrace();
            }
            if (result.getLocalTransactionState() == LocalTransactionState.ROLLBACK_MESSAGE) {
                throw new OrderPaymentException(order.getId());
            }
        }
    
        @Transactional
        @Override
        public void rollback(ProducerTransactionMessageDO message) {
            OrderDO order = ProtoStuffUtil.deserialize(message.getBody(), OrderDO.class);
            message.setMessageStatus(MessageStatus.ROLLBACK);
            message.setUpdateTime(LocalDateTime.now());
            messageService.update(message);
            payService.increaseAccount(order.getUser().getId(), order.getTotalPrice());
        }
    }
    
    

    TransactionCheckScheduler(消息回查)

    @Component
    public class TransactionCheckScheduler {
        @Autowired
        private ProducerTransactionMessageService messageService;
    
        /**
         * 每分钟执行一次事务回查
         */
        @Scheduled(fixedRate = 60 * 1000)
        public void checkTransactionMessage(){
            messageService.check();
        }
    }
    
    ProducerTransactionMessageServiceImpl(Producer消息服务提供者)
    @Slf4j
    public class ProducerTransactionMessageServiceImpl implements ProducerTransactionMessageService {
        @Autowired
        private MQProducer producer;
        @Autowired
        private MQProducerConfig config;
        @Autowired
        private ProductTransactionMessageDOMapper mapper;
        @Autowired
        private ConsumerTransactionMessageService consumerTransactionMessageService;
    
        @Transactional
        @Override
        public void save(ProducerTransactionMessageDO message) {
            mapper.insert(message);
        }
    
        @Transactional
        @Override
        public void check() {
            List<Long> all = mapper.findMessageIdsByStatusCreatedAfter(Arrays.asList(MessageStatus.UNCONSUMED, MessageStatus.CONSUME_FAILED), MQProducerConfig.CHECK_GAP);
            Map<Long, MessageStatus> statusMap = consumerTransactionMessageService.findConsumerMessageStatuses(all);
            for (Map.Entry<Long, MessageStatus> entry : statusMap.entrySet()) {
                mapper.updateByPrimaryKeySelective(ProducerTransactionMessageDO.builder().id(entry.getKey()).messageStatus(entry.getValue()).updateTime(LocalDateTime.now()).build());
            }
            all.removeAll(statusMap.keySet());
            // 此时all为确认消息发送失败的
            this.reSend(mapper.selectBatchByPrimaryKeys(all));
        }
    
        @Transactional
        @Override
        public void reSend(List<ProducerTransactionMessageDO> messages) {
            for (ProducerTransactionMessageDO messageDO : messages) {
                if (messageDO.getSendTimes() == config.getRetryTimes()) {
                    messageDO.setUpdateTime(LocalDateTime.now());
                    messageDO.setMessageStatus(MessageStatus.OVER_CONFIRM_RETRY_TIME);
                    mapper.updateByPrimaryKeySelective(messageDO);
                    continue;
                }
                Message message = new Message();
                message.setTopic(config.getTopic());
                message.setBody(messageDO.getBody());
                try {
                    SendResult result = producer.send(message);
                    messageDO.setSendTimes(messageDO.getSendTimes() + 1);
                    messageDO.setUpdateTime(LocalDateTime.now());
                    mapper.updateByPrimaryKeySelective(messageDO);
                    log.info("发送重试消息完毕,Message:{},result:{}", message, result);
                } catch (Exception e) {
                    e.printStackTrace();
                    log.info("发送重试消息时失败! Message:{}", message);
                }
            }
        }
    
        @Transactional
        @Override
        public void delete(Long id) {
            mapper.deleteByPrimaryKey(id);
        }
    
        @Transactional(readOnly = true)
        @Override
        public List<ProducerTransactionMessageDO> findByIds(List<Long> ids) {
            return mapper.selectBatchByPrimaryKeys(ids);
        }
    
        @Transactional(readOnly = true)
        @Override
        public PageInfo<ProducerTransactionMessageDO> findByQueryDTO(MessageQueryConditionDTO dto) {
            return mapper.findByCondition(dto, dto.getPageNum(), dto.getPageSize()).toPageInfo();
        }
    
        @Override
        public void update(ProducerTransactionMessageDO message) {
            mapper.updateByPrimaryKeySelective(message);
        }
    
    }
    

    Consumer

    MQConsumerConfig(配置MQConsumer)

    @Configuration
    @Slf4j
    @Getter
    public class MQConsumerConfig {
        private DefaultMQPushConsumer consumer;
    
        @Value("${spring.rocketmq.group-name}")
        private String groupName;
        @Value("${spring.rocketmq.namesrv-addr}")
        private String namesrvAddr;
        @Value("${spring.rocketmq.topic}")
        private String topic;
        @Autowired
        private AccountMessageListener accountMessageListener;
        @Value("${spring.rocketmq.consume-failure-retry-times}")
        private Integer retryTimes;
    
        @PostConstruct
        public void init() throws MQClientException {
            this.consumer = new DefaultMQPushConsumer(groupName);
            this.consumer.setNamesrvAddr(namesrvAddr);
            // 启动后从队列头部开始消费
            this.consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
            this.consumer.subscribe(topic, "*");
            this.consumer.registerMessageListener(accountMessageListener);
            this.consumer.start();
            log.info("consumer started!");
        }
    }
    

    AccountMessageListener(消息接收方)

    @Component
    @Slf4j
    public class AccountMessageListener implements MessageListenerConcurrently {
        @Autowired
        private OrderService orderService;
        @Autowired
        @Qualifier("consumerTransactionMessageService")
        private ConsumerTransactionMessageService messageService;
        @Autowired
        private MQConsumerConfig config;
    
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
            log.info("接收到消息数量为:{}", msgs.size());
            for (MessageExt msg : msgs) {
                ConsumerTransactionMessageDO messageDO = null;
                OrderDO order = null;
                try {
                    String topic = msg.getTopic();
                    String keys = msg.getKeys();
                    order = ProtoStuffUtil.deserialize(msg.getBody(), OrderDO.class);
                    log.info("消费者接收到消息:topic: {}, keys:{} , order: {}", topic, keys, order);
                    // 如果已经被消费过并且消费成功,那么不再重复消费(未被消费->id不存在或消费失败或超过重试次数的都会继续消费)
                    if(messageService.isMessageConsumedSuccessfully(order.getId())){
                        continue;
                    }
                    messageDO = ConsumerTransactionMessageDO.builder()
                            .id(order.getId())
                            .createTime(LocalDateTime.now())
                            .topic(msg.getTopic())
                            .build();
                    // 业务逻辑处理
                    orderService.finishOrder(order);
                    // 如果业务逻辑抛出异常,那么会跳过插入CONSUMED
                    messageDO.setMessageStatus(MessageStatus.CONSUMED);
                    // 如果是未被消费,第一次就消费成功了,则插入
                    // 如果是超过重试次数,又人工设置重试,则更新状态为已被消费
                    messageService.insertOrUpdate(messageDO);
                } catch (Exception e) {
                    e.printStackTrace();
                    // 重试次数达到最大重试次数 
                    if (msg.getReconsumeTimes() == config.getRetryTimes()) {
                        log.info("客户端重试三次,需要人工处理");
                        messageService.update(
                                ConsumerTransactionMessageDO.builder()
                                        .id(order.getId())
                                        .messageStatus(MessageStatus.OVER_CONSUME_RETRY_TIME).build()
                        );
                        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                    } else {
                        log.info("消费失败,进行重试,当前重试次数为: {}", msg.getReconsumeTimes());
                        messageDO.setMessageStatus(MessageStatus.CONSUME_FAILED);
                        // 如果第一次消费失败,那么插入
                        // 如果之前消费失败,继续重试,那么doNothing
                        // 如果之前是超过重试次数,人工设置重试,那么将状态改为消费失败
                        messageService.insertOrUpdate(messageDO);
                        return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                    }
                }
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    }
    

    ConsumerTransactionMessageServiceImpl(Consumer消息服务提供者)

    public class ConsumerTransactionMessageServiceImpl implements ConsumerTransactionMessageService {
        @Autowired
        private ConsumerTransactionMessageDOMapper mapper;
    
        @Transactional(readOnly = true)
        @Override
        public Map<Long, MessageStatus> findConsumerMessageStatuses(List<Long> ids) {
            Map<Long, MessageStatus> result = new HashMap<>();
            for (Long id : ids) {
                MessageStatus status = mapper.findStatusById(id);
                if (status != null) {
                    result.put(id, status);
                }
            }
            return result;
        }
    
        @Transactional(readOnly = true)
        @Override
        public ConsumerTransactionMessageDO selectByPrimaryKey(Long id) {
            return mapper.selectByPrimaryKey(id);
        }
    
        @Transactional
        @Override
        public void insert(ConsumerTransactionMessageDO record) {
            mapper.insert(record);
        }
    
        @Override
        public void insertOrUpdate(ConsumerTransactionMessageDO record) {
            ConsumerTransactionMessageDO recordInDB = mapper.selectByPrimaryKey(record.getId());
            if (recordInDB == null) {
                mapper.insert(record);
            } else {
                recordInDB.setMessageStatus(record.getMessageStatus());
                mapper.updateByPrimaryKeySelective(recordInDB);
            }
        }
    
        @Transactional
        @Override
        public void insertIfNotExists(ConsumerTransactionMessageDO record) {
            if (mapper.selectByPrimaryKey(record.getId()) == null) {
                mapper.insert(record);
            }
        }
    
        @Transactional
        @Override
        public void update(ConsumerTransactionMessageDO record) {
            mapper.updateByPrimaryKeySelective(record);
        }
    
        @Transactional(readOnly = true)
        @Override
        public boolean isMessageConsumedSuccessfully(Long id) {
            MessageStatus status = mapper.findStatusById(id);
            return status == MessageStatus.CONSUMED;
        }
    }
    

    消息管理

    尚需提供一个消息的监控平台,可以搜索和查看消息的状态,尤其是需要人工处理的死信,可以回滚本地事务或重新发送。
    界面类似于下图:


    这里写图片描述

    当前仅开发了消息管理系统的数据接口,尚未开发其客户端。

    @RestController
    @RequestMapping("/message_console")
    public class MessageConsoleController {
        @Autowired
        private ProducerTransactionMessageService messageService;
        @Autowired
        private AccountService accountService;
    
        @RequestMapping(value = "/query", method = RequestMethod.POST)
        public PageInfo<ProducerTransactionMessageDO> findByQueryDTO(@RequestBody MessageQueryConditionDTO queryDTO) {
            if (queryDTO.getPageNum() == null || queryDTO.getPageNum() <= 0) {
                queryDTO.setPageNum(Integer.valueOf(PageProperties.DEFAULT_PAGE_NUM));
            }
            if (queryDTO.getPageSize() == null || queryDTO.getPageSize() <= 0) {
                queryDTO.setPageSize(Integer.valueOf(PageProperties.DEFAULT_PAGE_SIZE));
            }
            return messageService.findByQueryDTO(queryDTO);
        }
    
        @RequestMapping(value = "/reSend", method = RequestMethod.POST)
        public void reSend(@RequestBody MessageIdDTO dto) {
            List<ProducerTransactionMessageDO> messages = messageService.findByIds(dto.getIds());
            for (ProducerTransactionMessageDO messageDO : messages) {
                messageDO.setMessageStatus(MessageStatus.UNCONSUMED);
                messageDO.setSendTimes(0);
            }
            messageService.reSend(messages);
        }
    
        @RequestMapping(value = "/rollback", method = RequestMethod.POST)
        public void rollback(@RequestBody MessageIdDTO dto) {
            for (ProducerTransactionMessageDO message : messageService.findByIds(dto.getIds())) {
                accountService.rollback(message);
            }
        }
    }
    

    总结

    自上次开发完SpringBootSOASkeleton之后,就一直希望能完成一个数据库按业务分库和分布式事务的项目。大概花了两周,大概尝试了TCC和可靠消息最终一致两种方法,最终解决了分布式事务的问题。
    TCC是我首先采用的技术,使用了Github开源的ByteTCC,但花了很多时间没有跑通,另外用起来非常复杂,对业务逻辑侵入非常大,最后是放弃了,但也留下来基于ByTeTCC的完成度比较高的代码,最后以Git的一个tag结束了它的生命周期。


    这里写图片描述

    然后我考虑使用MQ,尤其是原本对事务消息有所支持的RocketMQ来实现分布式事务。因为消息回查的功能被阉割,又去阅读了其源码和他人考虑的解决方案去实现它。就目前这个解决方案而言,自我感觉是比较完善的,既不是非常复杂, 又解决了RocketMQ原来存在的很多问题。但因为还是一个学生,对分布式比较缺乏经验,如果大家能发现其中存在的问题,也希望在博客下评论或Github提issue。

    全部代码已经放到Github上,按照《Linux集群搭建》配置的环境下,代码是可以跑通的,只是确认消息发送失败这种场景很难模拟出来,这也是有待观察的。

    参考资料

    大规模SOA系统中的分布事务处事-程立
    支付宝架构与技术
    RocketMQ用户指南v3.2.4
    高并发下的幂等策略分析
    RocketMQ源码解析
    分布式开放消息系统(RocketMQ)的原理与实践

    https://blog.csdn.net/songxinjianqwe/article/details/78923482

    相关文章

      网友评论

        本文标题:分布式事务设计 -- 本地消息表,设计与代码

        本文链接:https://www.haomeiwen.com/subject/uytwoktx.html