前言
最近有反馈一些业务出现数据不一致的问题,在检测代码后,主要是一些业务下RabbitMq使用不当,在DB事务没有提交,数据没落库,但是mq消费者却已经执行情况,最终导致数据不一致。
伪代码
不推荐在service层推送消息,尽管sendMessage放在最后。都有可能消息执行比事务提交快。
@Transactional
public void save(){
//业务保存
save(Object obj);
// 发送mq消息
sendMessage(Object obj);
// 或者处理其它。。。
}
整合@TransactionEventListener
Spring提供了一个注解@TransactionEventListener,将这个注解标注在某个方法上,那么就将这个方法声明为了一个事务事件处理器,而具体的事件类型则是由TransactionalEventListener.phase属性进行定义的。以下部分声明:
@Target({ElementType.METHOD, ElementType.ANNOTATION_TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@EventListener
public @interface TransactionalEventListener {
/**
* 默认AFTER_COMMIT:当前事务commit之后
*/
TransactionPhase phase() default TransactionPhase.AFTER_COMMIT;
/**
* 默认false:当前方法如果没有事务,相应的事务事件监听器将不会执行。
*/
boolean fallbackExecution() default false;
}
/**
* TransactionPhase 介绍
*/
public enum TransactionPhase {
// 指定目标方法在事务commit之前执行
BEFORE_COMMIT,
// 指定目标方法在事务commit之后执行
AFTER_COMMIT,
// 指定目标方法在事务rollback之后执行
AFTER_ROLLBACK,
// 指定目标方法在事务完成时执行,这里的完成是指无论事务是成功提交还是回滚
AFTER_COMPLETION
}
方案一改进版
借助TransactionalEventListener,目的让消息在当前事务提交之后推送。
- 定义消息事件
/**
* 消息推送事件
*/
public class MessageEvent<T> extends ApplicationEvent {
/**
* 交换机
*/
private String exchange;
/**
* 路由key
*/
private String routingKey;
public MessageEvent(T source, String routingKey) {
super(source);
this.exchange = "";
this.routingKey = routingKey;
}
public MessageEvent(T source, String exchange, String routingKey) {
super(source);
this.routingKey = routingKey;
this.exchange = exchange;
}
public T getSource() {
return (T) source;
}
public String getExchange() {
return exchange;
}
public String getRoutingKey() {
return routingKey;
}
}
- 消息发布
/**
* 消息发布
*/
@Component
public class MessagePublisher implements ApplicationContextAware {
private ApplicationContext applicationContext;
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = applicationContext;
}
public void sendMessage(String routingKey, Object object) {
applicationContext.publishEvent(new MessageEvent(object, routingKey));
}
public void sendMessage(String routingKey, String message) {
applicationContext.publishEvent(new MessageEvent(message, routingKey));
}
public void sendMessage(String exchange, String routingKey, Object message) {
applicationContext.publishEvent(new MessageEvent(message, exchange, routingKey));
}
}
- 消息监听
@Slf4j
@Component
public class MessageEventListener {
@Autowired
private MqService mqService;
/**
* 推送Mq消息
* 设置fallbackExecution = true,不管前面是否开启事务,事务事件监听器都执行
*/
@TransactionalEventListener(fallbackExecution = true)
public void consumer(MessageEvent notifyEvent) {
mqService.sendMessage(notifyEvent.getExchange(), notifyEvent.getRoutingKey(), notifyEvent.getSource());
}
}
- 模拟TransactionalEventListener打印结果
@Autowired
private MessagePublisher messagePublisher;
@Transactional
public void save(Record record) throws InterruptedException {
this.insertSelective(record);
messagePublisher.publishMessage(new MessageEvent(record, MessageContant.EXCHANGE_NAME, MessageContant.ROUTING_NAME));
Thread.sleep(2000L);
log.info("--------------》 执行其它业务结束");
}
5f1a78cb2a7ac.png
方案二-改造版
@Component
public class MqServiceImpl implements ApplicationContextAware,MqService {
private ApplicationContext applicationContext;
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = applicationContext;
}
@Override
public void sendMessage(String routingKey, Object object) {
applicationContext.publishEvent(new MessageEvent(object, routingKey));
}
@Override
public void sendMessage(String routingKey, String message) {
applicationContext.publishEvent(new MessageEvent(message, routingKey));
}
@Override
public void sendMessage(String exchange, String routingKey, Object message) {
applicationContext.publishEvent(new MessageEvent(message, exchange, routingKey));
}
}
总结
一般推荐写法,消息跟事务独立开来,先提交事务,然后发送mq,或者放在controller层。引入@TransactionEventListener对目前已有代码改造方便、修改代码最少。
网友评论