美文网首页
8:RocketMq实战 分布式事务消息架构讲解(文末有项目连

8:RocketMq实战 分布式事务消息架构讲解(文末有项目连

作者: _River_ | 来源:发表于2021-04-19 23:10 被阅读0次
    官方文档:
    http://rocketmq.apache.org/docs/transaction-example/
    
    1:什么是分布式事务:
     单体事务:强一致性 干性事务
     分布式事务:最终一致性性  柔性事务
    
    什么是分布式事务
    来源:单体应用一>拆分为分布式应用
    一个接口需要调用多个服务,且操作不同的数据库,数据一致性难保障,
    
    分布式事务常见解决方案:
         2PC :两阶段提交,基于XA协议
         TCC : Try、Confirms Cancel
    
    2:RocketMQ4.X分布式事务消息架构讲解
    RocketMQ事务消息:
        RocketMQ提供分布事务功能,通过RocketMQ事务消息能达到分布式事务的最终一致 
        
    半消息 Half Message
        Producer已经将消息成功发送到了Broker端,但是服务端未收到生产者对该消息的二次确认,
        此时该消息被标记成“暂不能投递”(暂时不能被Consumer消费)状态,处于该种状态下的消息即半消息。
    
    消息回查:
        由于网络闪断、生产者应用重启等原因,导致某条事务消息的 二次确认步骤(第4步) 丢失,消息队列 
        RocketMQ服务端通过扫描发现某条消息长期处于“半消息”时,
        需要主动向消息生产者询问 该消息的最终状态(Commit或是Rollback),该过程即消息回查。
    
    如果步骤4丢失 则需要执行步骤 5  6  7   
    
    RocketMQ事务消息的状态
        COMMIT_MESSAGE:提交事务消息,消费者可以消费此消息
        ROLLBACK_MESSAGE:回滚事务消息,消息会在broker中删除,消费者不能消费
        UNKNOW: Broker需要回查确认消息的状态
    
    关于事务消息能否成功投递到Broker节点
        事务消息producer端的生产方式和普通消息是一样的,确保消息能发送到Broker节点(具有重试机制)
    
    关于事务消息的消费
        事务消息consumer端的消费方式和普通消息是一样的,RocketMQ能保证消息能被consumer收到
       (消息重试等机制,最后也存在consumer消费失败的情况,这种情况出现的概率极低)
    
    3:RocketMQ4.X分布式事务消息的存在的问题
    RocketMQ4.X分布式事务的解决了什么
    仅是解决 消息的发送方的事务 以及  确保在消息发送方在事务成功之后 把消息投递到订阅端
    
    发送方=生产者 订阅端=消费者
    
    但并不包括 订阅端的事务失败后回滚 发送方的事务
    但并不包括 订阅端的事务失败后回滚 发送方的事务
    但并不包括 订阅端的事务失败后回滚 发送方的事务
    
    而这个对于消息队列来说  其实并不是的问题:
    因为我们的消息队列的目标 就是进行异步削峰
    
    什么是消息队列:
    我发送方的消息 只管发  你订阅端的逻辑成不成功和我有什么关系
    
    如果要两个服务之间的数据库需要实现最终一致性,即是保证两个服务的事务的一致成功或者失败,
    那就是要等两个服务的逻辑都完全走完,事务完全提交,这压根就不是异步削峰了。    
    完全可以采用调接口的方法进行,而不是采用发消息的方式。
    
    4:TransactionProducer
    生产者:
        主要需要把DefaultMQProducer 换成 TransactionMQProducer 
        并设置使用TransactionListener 进行本地事务的监听
    
    注意:消费者完全不用修改(只需要修改订阅的 topic就行了)
    
    @Component
    public class TransactionProducer {
        
    //        DefaultMQProducer就是我们最普通的生产者
    //        DefaultMQProducer defaultMQProducer = new DefaultMQProducer();
    //        TransactionMQProducer 继承了 DefaultMQProducer
        
        public void sendMessageInTransaction() throws MQClientException, InterruptedException {
    
            TransactionMQProducer transactionMQProducer = new TransactionMQProducer();
            //该生产者所在group
            transactionMQProducer.setProducerGroup("transaction_producer_group");
            ///如果是集群模式 以 ; 分开   "IP1:9876;IP2:9876;"
            transactionMQProducer.setNamesrvAddr("47.113.101.241:9876");
            //是否走Vip通道
            transactionMQProducer.setVipChannelEnabled(false);
            //消息同步发送失败重试次数
            transactionMQProducer.setRetryTimesWhenSendFailed(3);
    //        //消息异步发送失败重试次数
            transactionMQProducer.setRetryTimesWhenSendAsyncFailed(3);
    
    
            ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
                @Override
                public Thread newThread(Runnable r) {
                    Thread thread = new Thread(r);
                    thread.setName("client-transaction-msg-check-thread");
                    return thread;
                }
            });
    
            //使用实现类
            transactionMQProducer.setExecutorService(executorService);
    
            //创建事务实现类
            TransactionListener transactionListener = new TransactionListenerImpl();
            transactionMQProducer.setTransactionListener(transactionListener);
    
            transactionMQProducer.start();
    
            //设置Topic
            String topic = "transaction_test_topic";
    
            String[] tags = new String[]{"TagA", "TagB", "TagC", "TagD", "TagE"};
            for (int i = 0; i < 10; i++) {
                try {
                    Message msg =
                            new Message(topic, tags[i % tags.length], "KEY" + i,
                                    ("Hello RocketMQ OrderNo: " +i).getBytes(RemotingHelper.DEFAULT_CHARSET));
    
                    //消息在事务里面发送出去了
                    SendResult sendResult = transactionMQProducer.sendMessageInTransaction(msg, null);
                    System.out.printf("%s%n", sendResult);
    
                    Thread.sleep(10);
                } catch (MQClientException | UnsupportedEncodingException e) {
                    e.printStackTrace();
                }
            }
    
            //生产者延时关闭 因为在需要 1分钟内才能进入 UNKNOW状态消息的的回查逻辑
            Thread.sleep(1000 * 60);
            transactionMQProducer.shutdown();
        }
    }
    
    5:TransactionListenerImpl
    TransactionListenerImpl 实现了 TransactionListener 重写了相关逻辑
    executeLocalTransaction 方法进行 本地事务完成后提交 设置Broker里面半成功消息的状态
    checkLocalTransaction  方法用于 当Broker服务认为本地事务的并没有进行提交 或者 提交了UNKNOW后 进行回查
    
    @Slf4j
    public class TransactionListenerImpl implements TransactionListener {
    
        private AtomicInteger transactionIndex = new AtomicInteger(0);
        private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();
    
        /**
         * when send transactional prepare(half) message succeed,
         * this method will be invoked to execute local transaction.
         * 当发送半消息成功后
         * 可以设置该消息在broker的状态为
         *      COMMIT_MESSAGE(Broker端直接消费)
         *      ROLLBACK_MESSAGE(Broker端直接回滚)
         *      UNKNOW(1分钟内才能 进入checkLocalTransaction的回查逻辑)
         *      null  (立刻进入反查逻辑)
         *
         * @param msg
         * @param otherParam
         * @return
         */
        @Override
        public LocalTransactionState executeLocalTransaction(Message msg, Object otherParam) {
    
            log.info("executeLocalTransaction:{} ", msg.getTransactionId());
    
            //模拟生成订单号  0 到 9
            //使用orderNo进行各种业务处理  然后返回一个 status
            //这里暂时设置根据orderNo执行各种业务逻辑后  返回的status为如下
            Integer orderNo = transactionIndex.getAndIncrement();
            Integer status = orderNo % 3;
    
            //在这里模拟本地事务的状态
            //有四种状态  UNKNOW  COMMIT_MESSAGE   ROLLBACK_MESSAGE  以及null
            //如果是 UNKNOW 会在00秒后进入checkLocalTransaction 逻辑
            //如果是 null   会立即进入checkLocalTransaction 逻辑
    
            if (null != status) { switch (status) {
                    case 0:
    //                  订单号  0  3  6 9  成功
                        localTrans.put(msg.getTransactionId(), orderNo);
                        return LocalTransactionState.COMMIT_MESSAGE;
                    case 1:
    //                   订单号  1  4  7  失败
                        localTrans.put(msg.getTransactionId(), orderNo);
                        return LocalTransactionState.ROLLBACK_MESSAGE;
                    case 2:
                        //该msg.getTransactionId() 需要在回调时使用
    //                    订单号  2   5   8  进入回调
                        localTrans.put(msg.getTransactionId(), orderNo);
                        return LocalTransactionState.UNKNOW;
                    default:
                        //该msg.getTransactionId() 需要在回调时使用
                        localTrans.put(msg.getTransactionId(), orderNo);
                        return null;
                }
            }
            return null;
        }
    
        /**
         * When no response to prepare(half) message. broker will send check message to check the transaction status,
         * and this method will be invoked to get local transaction status.
         *
         *  在进行发送消息后 到到Broker变成半消息状态
         *  执行完executeLocalTransaction  其状态是  UNKNOW 或者 null  则需要进入回查处理
         *
         * @param msg
         * @return
         */
        @Override
        public LocalTransactionState checkLocalTransaction(MessageExt msg) {
    
            String transactionId = msg.getTransactionId();
            Integer orderNo =  localTrans.get(transactionId);
            log.info("执行的逻辑 checkLocalTransaction:{}  orderNo:{} ",transactionId,orderNo);
    
            //根据orderNo进行逻辑回查处理
            //进行回查的结果 只能是 COMMIT_MESSAGE(成功) 或者  ROLLBACK_MESSAGE(失败)
            //这里暂时设置根据orderNo执行各种业务逻辑后  返回为如下
            if (orderNo % 2 == 0) {
                log.info("执行的逻辑 checkLocalTransaction: {}  orderNo:{},COMMIT_MESSAGE", msg.getTransactionId(),orderNo);
                return LocalTransactionState.COMMIT_MESSAGE;
    
            }else
            {
                log.info("执行的逻辑 checkLocalTransaction: {}  orderNo:{},ROLLBACK_MESSAGE", msg.getTransactionId(),orderNo);
                return LocalTransactionState.ROLLBACK_MESSAGE;
            }
        }
    }
    
    6:测试结果
    模拟生成订单号  0 到 9
    在 executeLocalTransaction(本地事务)    订单号  0  3  6 9  成功  修改消息状态为可以被消费
    在 executeLocalTransaction(本地事务)    订单号  1  4  7    失败    这些消息直接被丢弃
    

    订单号  2   5   8  进入回调
    在 checkLocalTransaction(进行反查)    订单号  2  8   经过反查后确认成功    修改消息状态为可以被消费
    在 checkLocalTransaction(进行反查)    订单号  5      经过反查后确认失败    这些消息直接被丢弃
    
    7:transactionMQProducer.sendMessageInTransaction(核心方法)
    1:设置为半消息
    MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
    
    2:和普通发送消息一样发送
    sendResult = this.send(msg);
    
    3:判断transactionListener 是否为空 不为空则执行重写的 executeLocalTransaction
    if (transactionListener != null) {    
        log.debug("Used new transaction API");    
        localTransactionState = transactionListener.executeLocalTransaction(msg, arg);
    }
    
    4:endTransaction 方法
        事务设置消息相关的状态(4种)
        通过Oneway的方式告诉Borker端 修改里面半消息的状态
    
    5:在一个异步的方法里面 checkLocalTransaction(回查)
         LocalTransactionState 为UNKNOW的状态的消息进行回查
    

    项目连接

    请配合项目代码食用效果更佳:
    项目地址:
    https://github.com/hesuijin/hesuijin-study-project
    Git下载地址:
    https://github.com.cnpmjs.org/hesuijin/hesuijin-study-project.git
    
    rocketmq-module项目模块下 transactionRocketMQ包下
    

    相关文章

      网友评论

          本文标题:8:RocketMq实战 分布式事务消息架构讲解(文末有项目连

          本文链接:https://www.haomeiwen.com/subject/kurblltx.html