美文网首页
@TransactionEventListener整合Rabbi

@TransactionEventListener整合Rabbi

作者: prodigalHeart | 来源:发表于2020-09-14 14:59 被阅读0次
    前言

    最近有反馈一些业务出现数据不一致的问题,在检测代码后,主要是一些业务下RabbitMq使用不当,在DB事务没有提交,数据没落库,但是mq消费者却已经执行情况,最终导致数据不一致。

    伪代码

    不推荐在service层推送消息,尽管sendMessage放在最后。都有可能消息执行比事务提交快。

    @Transactional
    public void save(){
          //业务保存
        save(Object obj);
        // 发送mq消息
         sendMessage(Object obj);
        // 或者处理其它。。。
    }
    
    整合@TransactionEventListener

    Spring提供了一个注解@TransactionEventListener,将这个注解标注在某个方法上,那么就将这个方法声明为了一个事务事件处理器,而具体的事件类型则是由TransactionalEventListener.phase属性进行定义的。以下部分声明:

    @Target({ElementType.METHOD, ElementType.ANNOTATION_TYPE})
    @Retention(RetentionPolicy.RUNTIME)
    @Documented
    @EventListener
    public @interface TransactionalEventListener {
    
        /**
         * 默认AFTER_COMMIT:当前事务commit之后 
         */
        TransactionPhase phase() default TransactionPhase.AFTER_COMMIT;
    
        /**
         * 默认false:当前方法如果没有事务,相应的事务事件监听器将不会执行。
         */
        boolean fallbackExecution() default false;
    }
    
    /**
     * TransactionPhase 介绍
    */
    public enum TransactionPhase {
        // 指定目标方法在事务commit之前执行
        BEFORE_COMMIT,
    
        // 指定目标方法在事务commit之后执行
        AFTER_COMMIT,
    
        // 指定目标方法在事务rollback之后执行
        AFTER_ROLLBACK,
        
        // 指定目标方法在事务完成时执行,这里的完成是指无论事务是成功提交还是回滚
        AFTER_COMPLETION
    }
    
    方案一改进版

    借助TransactionalEventListener,目的让消息在当前事务提交之后推送。

    • 定义消息事件
    /**
     * 消息推送事件
     */
    public class MessageEvent<T> extends ApplicationEvent {
    
        /**
         * 交换机
         */
        private String exchange;
    
        /**
         * 路由key
         */
        private String routingKey;
    
        public MessageEvent(T source, String routingKey) {
            super(source);
            this.exchange = "";
            this.routingKey = routingKey;
        }
    
        public MessageEvent(T source, String exchange, String routingKey) {
            super(source);
            this.routingKey = routingKey;
            this.exchange = exchange;
        }
      
        public T getSource() {
            return (T) source;
        }
    
        public String getExchange() {
            return exchange;
        }
    
        public String getRoutingKey() {
            return routingKey;
        }
    }
    
    • 消息发布
    /**
     * 消息发布
     */
    @Component
    public class MessagePublisher implements ApplicationContextAware {
    
        private ApplicationContext applicationContext;
    
        @Override
        public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
            this.applicationContext = applicationContext;
        }
    
        public void sendMessage(String routingKey, Object object) {
            applicationContext.publishEvent(new MessageEvent(object, routingKey));
        }
    
        public void sendMessage(String routingKey, String message) {
            applicationContext.publishEvent(new MessageEvent(message, routingKey));
        }
    
        public void sendMessage(String exchange, String routingKey, Object message) {
            applicationContext.publishEvent(new MessageEvent(message, exchange, routingKey));
        }
    }
    
    
    • 消息监听
    @Slf4j
    @Component
    public class MessageEventListener {
    
        @Autowired
        private MqService mqService;
    
        /**
         * 推送Mq消息
         * 设置fallbackExecution = true,不管前面是否开启事务,事务事件监听器都执行
         */
        @TransactionalEventListener(fallbackExecution = true)
        public void consumer(MessageEvent notifyEvent) {
            mqService.sendMessage(notifyEvent.getExchange(), notifyEvent.getRoutingKey(), notifyEvent.getSource());
        }
    }
    
    • 模拟TransactionalEventListener打印结果
        @Autowired
        private MessagePublisher messagePublisher;
    
        @Transactional
        public void save(Record record) throws InterruptedException {
            this.insertSelective(record);
            messagePublisher.publishMessage(new MessageEvent(record, MessageContant.EXCHANGE_NAME, MessageContant.ROUTING_NAME));
    
            Thread.sleep(2000L);
            log.info("--------------》 执行其它业务结束");
        }
    
    5f1a78cb2a7ac.png
    方案二-改造版
    @Component
    public class MqServiceImpl implements ApplicationContextAware,MqService {
    
        private ApplicationContext applicationContext;
    
        @Override
        public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
            this.applicationContext = applicationContext;
        }
        
        @Override
        public void sendMessage(String routingKey, Object object) {
             applicationContext.publishEvent(new MessageEvent(object, routingKey));
        }
    
        @Override
        public void sendMessage(String routingKey, String message) {
             applicationContext.publishEvent(new MessageEvent(message, routingKey));
        }
    
        @Override
        public void sendMessage(String exchange, String routingKey, Object message) {
            applicationContext.publishEvent(new MessageEvent(message, exchange, routingKey));
        }
    }
    
    总结

    一般推荐写法,消息跟事务独立开来,先提交事务,然后发送mq,或者放在controller层。引入@TransactionEventListener对目前已有代码改造方便、修改代码最少。

    相关文章

      网友评论

          本文标题:@TransactionEventListener整合Rabbi

          本文链接:https://www.haomeiwen.com/subject/dwlzektx.html