美文网首页
RocketMQ分布式事务消息 代码

RocketMQ分布式事务消息 代码

作者: Zal哥哥 | 来源:发表于2020-12-28 11:07 被阅读0次

    1. 分布式事务消息介绍

    简单的说,就是一次大的操作由不同的小操作组成,这些小的操作分布在不同的服务器上,且属于不同的应用,分布式事务需要保证这些小操作要么全部成功,要么全部失败。

    本质上来说,分布式事务就是为了保证不同数据库的数据一致性。

    2. RocketMQ4.X分布式事务消息架构讲解

    • RocketMQ事务消息:
      RocketMQ提供分布式事务功能,通过RocketMQ事务消息能达到分布式事务的最终一致性

    • 半消息HalfMessage:
      暂不能投递的消息(暂不能消费),Producer已经将消息成功发送Broker端,但是服务端未收到生产者对消息的二次确认,此时该消息被标记成"暂不能投递状态",处于该种状态下的消息即半消息

    • 消息回查:
      由于网络闪断、生产者应用重启等原因,导致某条事务消息的二次确认丢失,消息队列RocketMQ服务端通过扫描发现某条消息长期处于“半消息”时,需要主动向消息生产者询问该消息的最终状态(Commit或是Rollback),该过程即消息回查。

    • 整体交互流程:


      在这里插入图片描述

      1. Producer向broker端发送消息
      2. 服务端将消息持久化成功之后,向发送方ACK确认消息已经发送成功,此时消息为半消息
      3. 发送方开始执行本地事务逻辑
      4. 发送方根据本地事务执行结果向服务端提交二次确认(Commit或者Rollback),服务端收到Commit状态则将半消息标记为可投递,订阅方最终将收到该消息;服务端收到Rollback状态则删除半消息,订阅方将不会接受该消息
      5. 在断网或者是应用重启的特殊情况下,上述步骤4提交的二次确认最终未到达服务端,经过固定时间后服务端将对该消息发起消息回查
      6. 发送方收到消息回查后,需要检查对应消息的本地事务执行的最终结果
      7. 发送方根据检查得到的本地事务的最终状态再次提交二次确认,服务端仍按照4对半消息进行操作

    • RocketMQ事务消息的状态:
      1. COMMIT_MESSAGE: 提交事务消息,消费者可以消费此消息
      2. ROLLBACK_MESSAGE:回滚事务消息,消息会在broker中删除,消费者不能消费
      3. UNKNOW:Broker需要回查确认消息的状态

    • 关于事务消息的消费:
      事务消息consumer端的消费方式和普通消息是一样的,RocketMQ能保证消息能被consumer收到(消息重试机制,最后也存在consumer消费失败的情况,这种情况出现的概率极低,而且消费端消费失败使用之前博客中讲解的失败重试机制)

    3. 代码实现

    3.1 Producer代码

    package com.pj.boot.jms;
    
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.client.producer.DefaultMQProducer;
    import org.apache.rocketmq.client.producer.LocalTransactionState;
    import org.apache.rocketmq.client.producer.TransactionListener;
    import org.apache.rocketmq.client.producer.TransactionMQProducer;
    import org.apache.rocketmq.common.message.Message;
    import org.apache.rocketmq.common.message.MessageExt;
    import org.springframework.stereotype.Component;
    
    import java.util.concurrent.*;
    
    @Component
    public class TransacationProducer {
    
        private String producerGroup = "trac_producer_group";
        // 事务监听器,执行本地事务
        TransactionListener transactionListener = new TransactionListenerImpl();
    
        TransactionMQProducer producer = null;
    
        // 创建自定义线程池
        /**
         * @param corePoolSize   池中所保存的核心线程数
         * @param maximumPoolSize   池中允许的最大线程池
         * @param keepActiveTime    非核心线程空闲等待新任务的最长时间
         * @param timeunit          keepActiveTime参数的时间单位
         * @param blockingqueue     队列任务
         */
        private ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100,
                TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory()
        {
            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r);
                thread.setName("client-transaction-msg-check-thread");
                return thread;
            }
        });
    
        public TransacationProducer(){
    
            producer = new TransactionMQProducer(producerGroup);
    
            //指定NameServer地址,多个地址以 ; 隔开
            //如 producer.setNamesrvAddr("192.168.100.141:9876;192.168.100.142:9876;192.168.100.149:9876");
    
            producer.setNamesrvAddr(JmsConfig.NAME_SERVER);
            producer.setExecutorService(executorService);
            producer.setTransactionListener(transactionListener);
            start();
        }
    
        public TransactionMQProducer getProducer(){
            return this.producer;
        }
    
        /**
         * 对象在使用之前必须要调用一次,只能初始化一次
         */
        public void start(){
            try {
                this.producer.start();
            } catch (MQClientException e) {
                e.printStackTrace();
            }
        }
    
        /**
         * 一般在应用上下文,使用上下文监听器,进行关闭
         */
        public void shutdown(){
            this.producer.shutdown();
        }
    }
    
    class TransactionListenerImpl implements TransactionListener {
        /**
         * 半消息发送成功触发此方法来执行本地事务
         * @param message  消息
         * @param o 发送消息时传递的参数
         * @return
         */
        @Override
        public LocalTransactionState executeLocalTransaction(Message message, Object o) {
            System.out.println("====executeLocalTransaction=======");
            String body = new String(message.getBody());
            String key = message.getKeys();
            String transactionId = message.getTransactionId();
            System.out.println("transactionId="+transactionId+", key="+key+", body="+body);
            // 执行本地事务begin TODO
    
            // 执行本地事务end TODO
    
            int status = Integer.parseInt(o.toString());
    
            //二次确认消息,然后消费者可以消费
            if(status == 1){
                return LocalTransactionState.COMMIT_MESSAGE;
            }
    
            //回滚消息,broker端会删除半消息
            if(status == 2){
                return LocalTransactionState.ROLLBACK_MESSAGE;
            }
    
            //broker端会进行回查消息,再或者什么都不响应
            if(status == 3){
                return LocalTransactionState.UNKNOW;
            }
            return null;
        }
    
        /**
         * 当没有响应时准备(半)消息。broker将发送检查消息来检查事务状态,并将调用此方法来获取本地事务状态。broker回查本地事务
         * @param messageExt
         * @return
         */
        @Override
        public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
            System.out.println("====checkLocalTransaction=======");
            String body = new String(messageExt.getBody());
            String key = messageExt.getKeys();
            String transactionId = messageExt.getTransactionId();
            System.out.println("transactionId="+transactionId+", key="+key+", body="+body);
    
            //要么commit 要么rollback
    
            //可以根据key去检查本地事务消息是否完成
    
            return LocalTransactionState.COMMIT_MESSAGE;
        }
    }
    
    

    3.2 Consumer代码

    package com.pj.boot.jms;
    
    import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
    import org.apache.rocketmq.client.consumer.MessageSelector;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
    import org.apache.rocketmq.common.message.Message;
    import org.apache.rocketmq.common.message.MessageExt;
    import org.springframework.stereotype.Component;
    
    import java.io.UnsupportedEncodingException;
    import java.util.List;
    
    @Component
    public class PayConsumer {
    
        private DefaultMQPushConsumer consumer;
    
        private String consumerGroup = "pay_consumer_group";
    
        public  PayConsumer() throws MQClientException {
    
            consumer = new DefaultMQPushConsumer(consumerGroup);
            consumer.setNamesrvAddr(JmsConfig.NAME_SERVER); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
    
            consumer.subscribe(JmsConfig.TOPIC, "*");
    
            consumer.registerMessageListener( new MessageListenerConcurrently() {
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                    try {
                        Message msg = msgs.get(0);
                        System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), new String(msgs.get(0).getBody()));
    
                        String topic = msg.getTopic();
                        String body = new String(msg.getBody(), "utf-8");
                        String tags = msg.getTags();
                        String keys = msg.getKeys();
                        System.out.println("topic=" + topic + ", tags=" + tags + ", keys=" + keys + ", msg=" + body);
    
                        // 告诉broker消息消费成功
                        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                    } catch (UnsupportedEncodingException e) {
    
                        e.printStackTrace();
                        return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                    }
                }
            });
    
            consumer.start();
            System.out.println("consumer start ...");
        }
    
    }
    
    

    3.3 发送消息

     // 生产时建议再加一个key值
            Message message = new Message(JmsConfig.TOPIC,tag, ("hello xdclass rocketmq = "+tag).getBytes() );
            /**
             * 发送半消息
             * 第一个参数:消息
             * 第二个参数:param,消息回查时会使用到
             */
            SendResult sendResult = transacationProducer.getProducer().sendMessageInTransaction(message, otherParam);
    
    

    3.4 注意

    TransactionMQProducer的groupName要唯一,不能和普通的producer一样

    https://blog.csdn.net/pjsdsg/article/details/104326323

    相关文章

      网友评论

          本文标题:RocketMQ分布式事务消息 代码

          本文链接:https://www.haomeiwen.com/subject/btpznktx.html