最终一致性
指的是分布式事务处理过程中,不保证多个机器的每时每刻值一致,保证最终多个事务处理的结果,达到一致。
业务场景
- 当前基层医疗系统中,添加健康服务VIP用户(新增用户信息+新增用户服务)。
- 我们对于健康档案系统里面新增一位居民用户。
- 同时还要给这VIP用户添加服务,如65岁以上的老年人,我们还要新增随访服务,以及慢病管理服务等。或者孕产妇,我们也要新增随访服务,身体数据动态检测服务以及产后护理服务等。所以这就要我们在全过程都要保证事务最终一致性。
rocketmq实现思路

- 发送者先发送半消息给broker(MQ server),消息里面存放业务需要的信息。
- broker(MQ server)接受半消息成功后给发送者返回成功的通知。
- 发送者接收到成功通知后,开启本地事务(执行executeLocalTransaction方法)。
- 如果本地事务成功, 那么发送者通知broker把半消息推送到接收者,否则取消半消息的推送。
- 如果返回失败,那么通过执行checkLocalTransaction方法,回查事务状态。
- 检查本地数据库提交结果,rollback或者commit,回查结果返回给broker。
- 如果半消息被commit,接收者收到了消息,消费该消息,执行接收者自身本地事务。如果消息消费失败,可以重试,保障数据最终一致性。
rocketmq实现分布式事务
实现之前先将用到的几个对象说明:
方法名 | 说明 |
---|---|
TransactionMQProducer | 半消息生产者 |
TransactionListener | 监听器,处理本地事务 |
LocalTransactionState | 标记事务状态 |
生产者
controller入口:
@PostMapping("/save")
public ResultModel<String> saveUser(@RequestParam("username") String username,
@RequestHeader("password") String password,
@RequestHeader("roleId") Integer roleId) {
UserBO userBO = new UserBO();
userBO.setUsername(username);
userBO.setPassword(password);
userBO.setRoleId(roleId);
return transactionProducerService.sendMsg(userBO);
}
producer:
@Slf4j
@Service("transactionProducerService")
public class TransactionProducerService {
private static final String PRODUCER_GROUP = "transaction_producer_group";
private static final String NAMESRV_ADDR = "IP:9876";
private static final String TOPIC = "transaction_user";
private TransactionMQProducer producer = null;
@Autowired
private TransactionListener transactionListener;
@PostConstruct
public void initProducer() {
producer = new TransactionMQProducer(PRODUCER_GROUP);
producer.setTransactionListener(transactionListener);
producer.setNamesrvAddr(NAMESRV_ADDR);
producer.setRetryTimesWhenSendFailed(3);
try {
producer.start();
} catch (MQClientException e) {
log.info(e.getMessage());
}
}
public ResultModel<String> sendMsg(UserBO userBO) {
ObjectMapper objectMapper = new ObjectMapper();
String userId = UUID.randomUUID().toString().replace("-", "");
userBO.setId(userId);
try {
Message message = new Message(TOPIC, null, userId, objectMapper.writeValueAsBytes(userBO));
producer.sendMessageInTransaction(message, null);
} catch (MQClientException | JsonProcessingException e) {
log.info(e.getMessage());
}
log.info("发送事务消息, topic = {}, body = {}", TOPIC, userId);
return new ResultModel<>();
}
@PreDestroy
public void shutDownProducer() {
if (producer != null) {
producer.shutdown();
}
}
}
listener:
/**
* 处理本地事务
*/
@Log
@Service("transactionListener")
public class TransactionListenerImpl implements TransactionListener {
@Autowired
private IUserService userService;
/**
* 发送消息时,发送线程执行该方法
* 执行本地事务
* @param message
* @param arg
* @return
*/
@Override
public LocalTransactionState executeLocalTransaction(Message message, Object arg) {
log.info("=======开启本地事务=======");
ObjectMapper objectMapper = new ObjectMapper();
LocalTransactionState state = LocalTransactionState.UNKNOW;
try {
UserBO userBO = objectMapper.readValue(message.getBody(), UserBO.class);
userService.insertUser(userBO.getId(), userBO.getUsername(), userBO.getPassword(), userBO.getRoleId());
state = LocalTransactionState.COMMIT_MESSAGE;
} catch (Exception e) {
log.info(e.getMessage());
state = LocalTransactionState.ROLLBACK_MESSAGE;
}
return state;
}
/**
* 回查本地事务,查看本地事务的状态
* @param messageExt
* @return
*/
@Override
public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
log.info("========回查本地事务=======");
LocalTransactionState state = LocalTransactionState.UNKNOW;
ObjectMapper objectMapper = new ObjectMapper();
// 检查本地数据库操作情况
try {
UserBO userBO = objectMapper.readValue(messageExt.getBody(), UserBO.class);
ResultModel<String> resultModel = userService.checkUserExist(userBO.getId());
if (resultModel.getSuccess()) {
state = LocalTransactionState.COMMIT_MESSAGE;
} else {
state = LocalTransactionState.ROLLBACK_MESSAGE;
}
} catch (Exception e) {
log.info(e.getMessage());
}
return state;
}
}

消费者
@Slf4j
@Service
public class ConsumerService {
private static final String PRODUCER_GROUP = "transaction_producer_group";
private static final String NAMESRV_ADDR = "IP:9876";
private static final String TOPIC = "transaction_user";
private DefaultMQPushConsumer consumer = null;
@Autowired
private ITeacherService teacherService;
@PostConstruct
public void initMQConsumer() {
consumer = new DefaultMQPushConsumer(PRODUCER_GROUP);
consumer.setNamesrvAddr(NAMESRV_ADDR);
consumer.setConsumeTimeout(10000);
try {
consumer.subscribe(TOPIC, "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(
List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
try {
for (MessageExt msg : msgs) {
log.info("QueueId Received: " + msg.getQueueId() + " Message Received: " + new String(msg.getBody()));
ObjectMapper objectMapper = new ObjectMapper();
UserBO userBO = objectMapper.readValue(msg.getBody(), UserBO.class);
teacherService.addTeacherInfo(userBO.getId(), userBO.getUsername());
}
} catch (Exception e) {
log.info(e.getMessage());
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
} catch (MQClientException e) {
e.printStackTrace();
}
}
@PreDestroy
public void shutDownConsumer() {
if (consumer != null) {
consumer.shutdown();
}
}
}

思考
- 如果接收者发生了事务错误,就算重试了很多次,依然失败,怎么办?
思路1:可以将下游失败的事务,记录到数据库表中,利用定时器扫描数据库表,执行失败事务的接口(补偿方式)。
思路2:可以让上游回滚,那么利用传过来的id,将本次涉及到的事务还原(对账方式)。
- 如果发送者发送消息后,发现由于网络不好,消息发送超时都未收到响应,那么开启重试,重试发送之前相同消息,这样消费者就收到了两份消息。
思路1:消费者业务逻辑处理,譬如消息要我们新增一条记录,那么我们业务逻辑处理就是先去查询,如果该记录已存在,那么我们就不去新增记录。
网友评论