美文网首页我的微服务
Cloud-借助消息队列解决分布式事务

Cloud-借助消息队列解决分布式事务

作者: 享学课堂 | 来源:发表于2020-02-06 13:41 被阅读0次

    作者:享学课堂讲师
    转发请注明出处

    前言

    这次先介绍一下RabbitMQ的基本概念

    核心概念


    重要组成部分说明如下:
    Broker:消息队列服务进程,此进程包括两个部分:Exchange和Queue。(邮局部门/快递公司)
    Exchange:消息队列交换机,按一定的规则将消息路由转发到某个队列,对消息进行过虑。
    (邮局的快递员/快递公司快递员)
    Queue:消息队列,存储消息的队列,消息到达队列并转发给指定的消费方。(收件人家的信箱/菜鸟驿站)
    Producer:消息生产者,即生产方客户端,生产方客户端将消息发送到MQ。(寄信人/寄货人)
    Consumer:消息消费者,即消费方客户端,接收MQ转发的消息。(收信人/收货人)

    -----发送消息-----
    1、生产者和Broker建立TCP连接。
    2、生产者和Broker建立通道。
    3、生产者通过通道消息发送给Broker,由Exchange将消息进行转发。
    4、Exchange将消息转发到指定的Queue(队列)

    ----接收消息-----
    1、消费者和Broker建立TCP连接
    2、消费者和Broker建立通道
    3、消费者监听指定的Queue(队列)
    4、当有消息到达Queue时Broker默认将消息推送给消费者。
    5、消费者接收到消息。

    工作流程

    生产者将数据通过RabbitMQ-client发送到RabbitMQ-server中的exchange,exchange根据路由配置,分发给Queue,消费者从Queue拿到数据

    分布式事务的产生

    多个系统相互配合工作,产生数据一致性问题。
    例如外卖场景中,下单中心,运单中心两个系统要配合工作,必须保证两个系统数据一致性。
    错误的解决方案:
    使用API接口调用,下单中心插入数据,调用运单中心的API接口处理数据,并启动事务回滚。
    咋一看这场景没有什么问题,毕竟有事务回滚,一起成功一起失败,但其实存在API调用超时的情况,此时下单中心以为调用失败回滚,而运单中心只是超时仍会继续执行程序,从而造成两个系统数据不一致。
    假设API调用成功,也有可能是在订单中心提交事务时失败了,此时订单中心回滚,而API已经调用,下单中心的数据已经产生,数据不一致。

    使用消息队列解决分布式事务


    问题的核心就是保证可靠生产与可靠消费
    可靠生产:下单中心处理数据和状态表更改应该保证事务一致。生产者往消息队列发送数据时,在本地建立一张状态表,看是否成功发送给队列。利用RabbitMQ的确认机制看是否重发还是定时扫描状态表重发,保证可靠生产。兜底方案还是定时扫描状态表。
    代码
    /**
         * 分单处理队列
         */
        public static final String QUEUE_NAME_TRANSACTION = "xucheng.distribute.queue";
        
        /**
         * 分单处理交换机
         */
        public static final String EXCHANGE_NAME_TRANSACTION = "xucheng.distribute.exchange";
        
        /**
         * 消息队列服务进程,此进程包括两个部分:Exchange和Queue。
         */
        public static final String ROUTE_NAME_TRANSACTION = "xucheng.distribute.route";
        
        
        /**
         * 补单队列
         */
        //public static final String CREATE_QUEUE_NAME_TRANSACTION = "xucheng.order.reCreate.queue";
     
        /**
         * 1、交换机绑定到分单队列
         * @return
         */
         @Bean
         public DirectExchange transExchange() {
             return new DirectExchange(EXCHANGE_NAME_TRANSACTION);
         }
    
        /**
         *2、分单队列
         *
         * @return
         */
        @Bean
        public Queue scoreQueue() {
            return new Queue(QUEUE_NAME_TRANSACTION, true);
        }
       
         /**
          * 3、Binding
          * @return
          */
        @Bean
        public Binding bindingExchangeOrderReceiverQueue() {
            //通过Binding将Exchange与Queue关联起来。  
            return BindingBuilder.bind(scoreQueue()).to(transExchange()).with(ROUTE_NAME_TRANSACTION);
    }
    
    

    可靠消费:消费端开启手动ACK,给MQ队列发送确认信息。对每条数据进行记录,一旦有异常记录可以试着去重试重新要求MQ再发数据,但不能重试太多次。可以将数据主键插入数据库,这样同一数据就不会执行两次,或者使用redis记录数据操作。
    代码:

    喜欢请关注「享学课堂online」官方账号
    第一时间获取最新资讯信息哟~

    相关文章

      网友评论

        本文标题:Cloud-借助消息队列解决分布式事务

        本文链接:https://www.haomeiwen.com/subject/xaoyxhtx.html