美文网首页
rocketmq实现分布式事务

rocketmq实现分布式事务

作者: sunpy | 来源:发表于2022-08-19 22:03 被阅读0次

最终一致性


指的是分布式事务处理过程中,不保证多个机器的每时每刻值一致,保证最终多个事务处理的结果,达到一致。

业务场景


  1. 当前基层医疗系统中,添加健康服务VIP用户(新增用户信息+新增用户服务)。
  2. 我们对于健康档案系统里面新增一位居民用户。
  3. 同时还要给这VIP用户添加服务,如65岁以上的老年人,我们还要新增随访服务,以及慢病管理服务等。或者孕产妇,我们也要新增随访服务,身体数据动态检测服务以及产后护理服务等。所以这就要我们在全过程都要保证事务最终一致性。

rocketmq实现思路


  1. 发送者先发送半消息给broker(MQ server),消息里面存放业务需要的信息。
  2. broker(MQ server)接受半消息成功后给发送者返回成功的通知。
  3. 发送者接收到成功通知后,开启本地事务(执行executeLocalTransaction方法)。
  4. 如果本地事务成功, 那么发送者通知broker把半消息推送到接收者,否则取消半消息的推送。
  5. 如果返回失败,那么通过执行checkLocalTransaction方法,回查事务状态。
  6. 检查本地数据库提交结果,rollback或者commit,回查结果返回给broker。
  7. 如果半消息被commit,接收者收到了消息,消费该消息,执行接收者自身本地事务。如果消息消费失败,可以重试,保障数据最终一致性。

rocketmq实现分布式事务


实现之前先将用到的几个对象说明:

方法名 说明
TransactionMQProducer 半消息生产者
TransactionListener 监听器,处理本地事务
LocalTransactionState 标记事务状态

生产者

controller入口:

@PostMapping("/save")
public ResultModel<String> saveUser(@RequestParam("username") String username,
                                    @RequestHeader("password") String password,
                                    @RequestHeader("roleId") Integer roleId) {
    UserBO userBO = new UserBO();
    userBO.setUsername(username);
    userBO.setPassword(password);
    userBO.setRoleId(roleId);
    return transactionProducerService.sendMsg(userBO);

}

producer:

@Slf4j
@Service("transactionProducerService")
public class TransactionProducerService {

    private static final String PRODUCER_GROUP = "transaction_producer_group";
    private static final String NAMESRV_ADDR = "IP:9876";
    private static final String TOPIC = "transaction_user";

    private TransactionMQProducer producer = null;

    @Autowired
    private TransactionListener transactionListener;

    @PostConstruct
    public void initProducer() {
        producer = new TransactionMQProducer(PRODUCER_GROUP);
        producer.setTransactionListener(transactionListener);
        producer.setNamesrvAddr(NAMESRV_ADDR);
        producer.setRetryTimesWhenSendFailed(3);

        try {
            producer.start();
        } catch (MQClientException e) {
            log.info(e.getMessage());
        }
    }

    public ResultModel<String> sendMsg(UserBO userBO) {
        ObjectMapper objectMapper = new ObjectMapper();
        String userId = UUID.randomUUID().toString().replace("-", "");
        userBO.setId(userId);
        try {
            Message message = new Message(TOPIC, null, userId, objectMapper.writeValueAsBytes(userBO));
            producer.sendMessageInTransaction(message, null);
        } catch (MQClientException | JsonProcessingException e) {
            log.info(e.getMessage());
        }

        log.info("发送事务消息, topic = {}, body = {}", TOPIC, userId);
        return new ResultModel<>();
    }

    @PreDestroy
    public void shutDownProducer() {
        if (producer != null) {
            producer.shutdown();
        }
    }
}

listener:

/**
 * 处理本地事务
 */
@Log
@Service("transactionListener")
public class TransactionListenerImpl implements TransactionListener {

    @Autowired
    private IUserService userService;

    /**
     * 发送消息时,发送线程执行该方法
     * 执行本地事务
     * @param message
     * @param arg
     * @return
     */
    @Override
    public LocalTransactionState executeLocalTransaction(Message message, Object arg) {
        log.info("=======开启本地事务=======");
        ObjectMapper objectMapper = new ObjectMapper();
        LocalTransactionState state = LocalTransactionState.UNKNOW;

        try {
            UserBO userBO = objectMapper.readValue(message.getBody(), UserBO.class);
            userService.insertUser(userBO.getId(), userBO.getUsername(), userBO.getPassword(), userBO.getRoleId());
            state = LocalTransactionState.COMMIT_MESSAGE;
        } catch (Exception e) {
            log.info(e.getMessage());
            state = LocalTransactionState.ROLLBACK_MESSAGE;
        }

        return state;
    }

    /**
     * 回查本地事务,查看本地事务的状态
     * @param messageExt
     * @return
     */
    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
        log.info("========回查本地事务=======");
        LocalTransactionState state = LocalTransactionState.UNKNOW;
        ObjectMapper objectMapper = new ObjectMapper();

        // 检查本地数据库操作情况
        try {
            UserBO userBO = objectMapper.readValue(messageExt.getBody(), UserBO.class);
            ResultModel<String> resultModel = userService.checkUserExist(userBO.getId());

            if (resultModel.getSuccess()) {
                state = LocalTransactionState.COMMIT_MESSAGE;
            } else {
                state = LocalTransactionState.ROLLBACK_MESSAGE;
            }
        } catch (Exception e) {
            log.info(e.getMessage());
        }

        return state;
    }
}

消费者

@Slf4j
@Service
public class ConsumerService {

    private static final String PRODUCER_GROUP = "transaction_producer_group";
    private static final String NAMESRV_ADDR = "IP:9876";
    private static final String TOPIC = "transaction_user";
    private DefaultMQPushConsumer consumer = null;

    @Autowired
    private ITeacherService teacherService;

    @PostConstruct
    public void initMQConsumer() {
        consumer = new DefaultMQPushConsumer(PRODUCER_GROUP);
        consumer.setNamesrvAddr(NAMESRV_ADDR);
        consumer.setConsumeTimeout(10000);

        try {
            consumer.subscribe(TOPIC, "*");
            consumer.registerMessageListener(new MessageListenerConcurrently() {

                @Override
                public ConsumeConcurrentlyStatus consumeMessage(
                        List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                    try {
                        for (MessageExt msg : msgs) {
                            log.info("QueueId Received: " + msg.getQueueId() + " Message Received: " + new String(msg.getBody()));
                            ObjectMapper objectMapper = new ObjectMapper();
                            UserBO userBO = objectMapper.readValue(msg.getBody(), UserBO.class);
                            teacherService.addTeacherInfo(userBO.getId(), userBO.getUsername());
                        }
                    } catch (Exception e) {
                        log.info(e.getMessage());
                        return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                    }

                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
            consumer.start();
        } catch (MQClientException e) {
            e.printStackTrace();
        }
    }

    @PreDestroy
    public void shutDownConsumer() {
        if (consumer != null) {
            consumer.shutdown();
        }
    }
}

思考


  • 如果接收者发生了事务错误,就算重试了很多次,依然失败,怎么办?

思路1:可以将下游失败的事务,记录到数据库表中,利用定时器扫描数据库表,执行失败事务的接口(补偿方式)。

思路2:可以让上游回滚,那么利用传过来的id,将本次涉及到的事务还原(对账方式)。

  • 如果发送者发送消息后,发现由于网络不好,消息发送超时都未收到响应,那么开启重试,重试发送之前相同消息,这样消费者就收到了两份消息。

思路1:消费者业务逻辑处理,譬如消息要我们新增一条记录,那么我们业务逻辑处理就是先去查询,如果该记录已存在,那么我们就不去新增记录。

相关文章

网友评论

      本文标题:rocketmq实现分布式事务

      本文链接:https://www.haomeiwen.com/subject/ytlsgrtx.html