一、问题描述
在member服务调用updateMember()
方法修改用户信息,该接口标注了声明式事务@Transactional
,同时,在updateMember()
方法里修改完用户数据后,发送了MQ通知,ES服务通过memberId取用户数据同步用户数据到ES库。
问题是:在发送MQ后,修改用户的事务还未提交(updateMember()
方法还未结束),MQ有可能先于事务提交前到达,也可能在事务提交后到达,如果MQ先到达,MQ消费那里取到的用户数据就会是旧的,导致数据不一致。
二、伪代码
@Service
public class MemberServiceImpl {
@Resource
private MemberDao memberDao;
@Resource
private MemberMqSend memberMqSend;
@Transactional(rollbackFor = Exception.class)
public void updateMember(Member member) {
member.setMemberName("AC");
memberDao.update(member);
//事务还未提交,MQ有可能先于事务提交前到达,也可能在事务提交后到达,如果MQ先到达,MQ消费那里取到的用户数据就会是旧的
memberMqSend.sendMq(member.getId(), "UPDATE");
}
}
@Slf4j
@Service
@RocketMQMessageListener(topic = "member",consumerGroup = "memberEs",messageModel = MessageModel.CLUSTERING)
public class MemberEsSynComponent implements RocketMQListener<MessageExt> {
@Resource
private MemberDao memberDao;
@Resource
private MemberEsDao memberEsDao;
@Override
public void onMessage(MessageExt message) {
UpdateMemberMsg msg = JSONObject.parseObject(message.getBody(), UpdateMemberMsg.class);
if (!"UPDATE".equals(msg.getAction())) {
return;
}
Member member = memberDao.findById(msg.getMemberId());
//更新ES里的Member数据,MQ可能先到达,事务还未提交,所以此处可能取到的是旧的数据
memberEsDao.update(member);
}
}
三、解决方案
将声明式事务改成手动事务,伪代码如下:
@Service
public class MemberServiceImpl {
@Resource
private MemberDao memberDao;
@Resource
private MemberMqSend memberMqSend;
@Resource
private TransactionTemplate template;
public void updateMember(Member member) {
member.setMemberName("AC");
template.execute((TransactionCallback<Object>) transactionStatus -> {
//手动提交事务
memberDao.update(member);
});
//修改member的事务已提交,再发送MQ,MQ消费端再取用户数据就会是新的
memberMqSend.sendMq(member.getId(), "UPDATE");
}
}
网友评论