美文网首页
@TransactionEventListener整合Rabbi

@TransactionEventListener整合Rabbi

作者: prodigalHeart | 来源:发表于2020-09-14 14:59 被阅读0次
前言

最近有反馈一些业务出现数据不一致的问题,在检测代码后,主要是一些业务下RabbitMq使用不当,在DB事务没有提交,数据没落库,但是mq消费者却已经执行情况,最终导致数据不一致。

伪代码

不推荐在service层推送消息,尽管sendMessage放在最后。都有可能消息执行比事务提交快。

@Transactional
public void save(){
      //业务保存
    save(Object obj);
    // 发送mq消息
     sendMessage(Object obj);
    // 或者处理其它。。。
}
整合@TransactionEventListener

Spring提供了一个注解@TransactionEventListener,将这个注解标注在某个方法上,那么就将这个方法声明为了一个事务事件处理器,而具体的事件类型则是由TransactionalEventListener.phase属性进行定义的。以下部分声明:

@Target({ElementType.METHOD, ElementType.ANNOTATION_TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@EventListener
public @interface TransactionalEventListener {

    /**
     * 默认AFTER_COMMIT:当前事务commit之后 
     */
    TransactionPhase phase() default TransactionPhase.AFTER_COMMIT;

    /**
     * 默认false:当前方法如果没有事务,相应的事务事件监听器将不会执行。
     */
    boolean fallbackExecution() default false;
}

/**
 * TransactionPhase 介绍
*/
public enum TransactionPhase {
    // 指定目标方法在事务commit之前执行
    BEFORE_COMMIT,

    // 指定目标方法在事务commit之后执行
    AFTER_COMMIT,

    // 指定目标方法在事务rollback之后执行
    AFTER_ROLLBACK,
    
    // 指定目标方法在事务完成时执行,这里的完成是指无论事务是成功提交还是回滚
    AFTER_COMPLETION
}
方案一改进版

借助TransactionalEventListener,目的让消息在当前事务提交之后推送。

  • 定义消息事件
/**
 * 消息推送事件
 */
public class MessageEvent<T> extends ApplicationEvent {

    /**
     * 交换机
     */
    private String exchange;

    /**
     * 路由key
     */
    private String routingKey;

    public MessageEvent(T source, String routingKey) {
        super(source);
        this.exchange = "";
        this.routingKey = routingKey;
    }

    public MessageEvent(T source, String exchange, String routingKey) {
        super(source);
        this.routingKey = routingKey;
        this.exchange = exchange;
    }
  
    public T getSource() {
        return (T) source;
    }

    public String getExchange() {
        return exchange;
    }

    public String getRoutingKey() {
        return routingKey;
    }
}
  • 消息发布
/**
 * 消息发布
 */
@Component
public class MessagePublisher implements ApplicationContextAware {

    private ApplicationContext applicationContext;

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = applicationContext;
    }

    public void sendMessage(String routingKey, Object object) {
        applicationContext.publishEvent(new MessageEvent(object, routingKey));
    }

    public void sendMessage(String routingKey, String message) {
        applicationContext.publishEvent(new MessageEvent(message, routingKey));
    }

    public void sendMessage(String exchange, String routingKey, Object message) {
        applicationContext.publishEvent(new MessageEvent(message, exchange, routingKey));
    }
}

  • 消息监听
@Slf4j
@Component
public class MessageEventListener {

    @Autowired
    private MqService mqService;

    /**
     * 推送Mq消息
     * 设置fallbackExecution = true,不管前面是否开启事务,事务事件监听器都执行
     */
    @TransactionalEventListener(fallbackExecution = true)
    public void consumer(MessageEvent notifyEvent) {
        mqService.sendMessage(notifyEvent.getExchange(), notifyEvent.getRoutingKey(), notifyEvent.getSource());
    }
}
  • 模拟TransactionalEventListener打印结果
    @Autowired
    private MessagePublisher messagePublisher;

    @Transactional
    public void save(Record record) throws InterruptedException {
        this.insertSelective(record);
        messagePublisher.publishMessage(new MessageEvent(record, MessageContant.EXCHANGE_NAME, MessageContant.ROUTING_NAME));

        Thread.sleep(2000L);
        log.info("--------------》 执行其它业务结束");
    }
5f1a78cb2a7ac.png
方案二-改造版
@Component
public class MqServiceImpl implements ApplicationContextAware,MqService {

    private ApplicationContext applicationContext;

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = applicationContext;
    }
    
    @Override
    public void sendMessage(String routingKey, Object object) {
         applicationContext.publishEvent(new MessageEvent(object, routingKey));
    }

    @Override
    public void sendMessage(String routingKey, String message) {
         applicationContext.publishEvent(new MessageEvent(message, routingKey));
    }

    @Override
    public void sendMessage(String exchange, String routingKey, Object message) {
        applicationContext.publishEvent(new MessageEvent(message, exchange, routingKey));
    }
}
总结

一般推荐写法,消息跟事务独立开来,先提交事务,然后发送mq,或者放在controller层。引入@TransactionEventListener对目前已有代码改造方便、修改代码最少。

相关文章

网友评论

      本文标题:@TransactionEventListener整合Rabbi

      本文链接:https://www.haomeiwen.com/subject/dwlzektx.html