美文网首页
RocketMQ分布式事务消息

RocketMQ分布式事务消息

作者: JBryan | 来源:发表于2020-04-27 10:31 被阅读0次

    1、RocketMQ事务消息概念

    RocketMQ事务消息:
    RocketMQ 提供分布事务功能,通过 RocketMQ 事务消息能达到分布式事务的最终一致

    半消息Half Message:
    暂不能投递的消息(暂不能消费),Producer已经将消息成功发送到了Broker端,但是服务端未收到生产者对该消息的二次确认,此时该消息被标记成“暂不能投递”状态,处于该种状态下的消息即半消息

    消息回查:
    由于网络闪断、生产者应用重启等原因,导致某条事务消息的二次确认丢失,消息队列 RocketMQ 服务端通过扫描发现某条消息长期处于“半消息”时,需要主动向消息生产者询问该消息的最终状态(Commit 或是 Rollback),该过程即消息回查。

    整体交互流程

    事务.jpg

    1、Producer向broker端发送消息。
    2、服务端将消息持久化成功之后,向发送方 ACK 确认消息已经发送成功,此时消息为半消息。
    3、发送方开始执行本地事务逻辑。
    4、发送方根据本地事务执行结果向服务端提交二次确认(Commit 或是 Rollback),服务端收到 Commit 状态则将半消息标记为可投递,订阅方最终将收到该消息;服务端收到 Rollback 状态则删除半消息,订阅方将不会接受该消息
    5、在断网或者是应用重启的特殊情况下,上述步骤 4 提交的二次确认最终未到达服务端,经过固定时间后服务端将对该消息发起消息回查
    6、发送方收到消息回查后,需要检查对应消息的本地事务执行的最终结果
    7、发送方根据检查得到的本地事务的最终状态再次提交二次确认,服务端仍按照步骤 4 对半消息进行操作

    RocketMQ事务消息的状态
    COMMIT_MESSAGE: 提交事务消息,消费者可以消费此消息
    ROLLBACK_MESSAGE:回滚事务消息,消息会在broker中删除,消费者不能消费
    UNKNOW:Broker需要回查确认消息的状态

    关于事务消息的消费
    事务消息consumer端的消费方式和普通消息是一样的,RocketMQ能保证消息能被consumer收到(消息重试等机制,最后也存在consumer消费失败的情况,这种情况出现的概率极低)。

    2、示例

    SpringBoot整合RocketMQ请移步:https://www.jianshu.com/p/0028969d5e17
    新建TransactionProducer

    package com.ljessie.rocketmqdemo.jms;
    
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.client.producer.DefaultMQProducer;
    import org.apache.rocketmq.client.producer.LocalTransactionState;
    import org.apache.rocketmq.client.producer.TransactionListener;
    import org.apache.rocketmq.client.producer.TransactionMQProducer;
    import org.apache.rocketmq.common.message.Message;
    import org.apache.rocketmq.common.message.MessageExt;
    import org.springframework.stereotype.Component;
    
    import java.util.concurrent.*;
    
    @Component
    public class TransactionProducer {
    
    
        private TransactionListener transactionListener = new TransactionListenerImpl();
        private TransactionMQProducer producer = null;
    
        public TransactionProducer(){
            producer = new TransactionMQProducer("transation_pay_producer_group");
            producer.setNamesrvAddr("192.168.10.105:9876;192.168.10.104:9876");
            producer.setTransactionListener(transactionListener);
            //自定义线程池,指定线程名称
            ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
                @Override
                public Thread newThread(Runnable r) {
                    Thread thread = new Thread(r);
                    thread.setName("client-transaction-msg-check-thread");
                    return thread;
                }
            });
            producer.setExecutorService(executorService);
    
            start();
        }
    
        public TransactionMQProducer getProducer(){
            return this.producer;
        }
    
        public void start(){
            try {
                this.producer.start();
            } catch (MQClientException e) {
                e.printStackTrace();
            }
        }
    
        /**
         * 一般在应用上下文,使用上下文监听器,进行关闭
         */
        public void shutDown(){
            this.producer.shutdown();
        }
    
    
    }
    
    class TransactionListenerImpl implements TransactionListener{
    
        /**
         * 执行本地事务
         * @param message
         * @param o:producer.sendMessageInTransaction(msg,o),生产者发送消息时,传递过来的第二个参数
         * @return
         */
        @Override
        public LocalTransactionState executeLocalTransaction(Message message, Object o) {
            System.out.println("--------模拟执行本地事务--------");
            System.out.println(message);
    
            Integer i = Integer.parseInt(o.toString());
            //二次确认消息,然后消费者可以消费
            if(i==1){
                //本地事务执行成功
                return LocalTransactionState.COMMIT_MESSAGE;
            }else if(i==2){
                //本地事务执行失败,回滚消息,broker会删除半消息
                return LocalTransactionState.ROLLBACK_MESSAGE;
            }else{
                //broker端会进行回查消息
                return LocalTransactionState.UNKNOW;
            }
    
        }
    
        /**
         *
         * @param messageExt
         * @return 要么rollback,要么commit
         */
        @Override
        public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
            System.out.println("--------进入回查方法--------");
            System.out.println(messageExt);
    
            //根据业务id key检查本地事务消息是否完成
            return LocalTransactionState.COMMIT_MESSAGE;
        }
    }
    

    PayController

    package com.ljessie.rocketmqdemo.controller;
    
    import com.ljessie.rocketmqdemo.domain.ProductOrder;
    import com.ljessie.rocketmqdemo.jms.JMSConfig;
    import com.ljessie.rocketmqdemo.jms.PayProducer;
    import com.ljessie.rocketmqdemo.jms.TransactionProducer;
    import org.apache.rocketmq.client.exception.MQBrokerException;
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.client.producer.MessageQueueSelector;
    import org.apache.rocketmq.client.producer.SendCallback;
    import org.apache.rocketmq.client.producer.SendResult;
    import org.apache.rocketmq.common.message.Message;
    import org.apache.rocketmq.common.message.MessageQueue;
    import org.apache.rocketmq.remoting.exception.RemotingException;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RestController;
    
    import java.util.ArrayList;
    import java.util.List;
    
    @RestController
    @RequestMapping("/api/")
    public class PayController {
    
        @Autowired
        private TransactionProducer transactionProducer;
    
        /**
         * 发送事务消息
         * @param tag
         * @return
         * @throws InterruptedException
         * @throws RemotingException
         * @throws MQClientException
         * @throws MQBrokerException
         */
        @RequestMapping("transaction_test")
        public Object transactionTest(String tag,int arg) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
            Message msg = new Message(JMSConfig.TOPIC,tag,"","transaction_test".getBytes());
            SendResult sendResult = transactionProducer.getProducer().sendMessageInTransaction(msg,arg);
            return  sendResult;
        }
    }
    

    PayConsumer

    package com.ljessie.rocketmqdemo.jms;
    
    import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
    import org.apache.rocketmq.common.message.Message;
    import org.apache.rocketmq.common.message.MessageExt;
    import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
    import org.springframework.stereotype.Component;
    
    import java.io.UnsupportedEncodingException;
    import java.util.List;
    
    @Component
    public class PayConsumer {
    
        private DefaultMQPushConsumer consumer;
    
        public PayConsumer() throws MQClientException {
            consumer = new DefaultMQPushConsumer(JMSConfig.consumerGroup);
            consumer.setNamesrvAddr(JMSConfig.nameServer);
            //从队首开始消费
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
            //订阅主题,第二个参数*代表注册这个主题下面的所有tag
            consumer.subscribe(JMSConfig.TOPIC,"*");
            //默认是集群模式,可以更改为广播,更改为广播之后,不支持重试
    //        consumer.setMessageModel(MessageModel.BROADCASTING);
            //配置消费监听器
            consumer.registerMessageListener(new MessageListenerConcurrently() {
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                    MessageExt message = list.get(0);
                    int retryTimes = message.getReconsumeTimes();
                    try {
                        System.out.println("重试次数:"+retryTimes);
                        System.out.println("Consumer消费成功:"+new String(message.getBody(),"utf-8"));
                        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                    } catch (Exception e) {
                        System.out.println("消费异常");
                        //如果重复次数超过两次不成功,则人工介入。广播方式不提供重试机制。
                        if(retryTimes >= 2){
                            System.out.println("复次数超过两次不成功,人工介入");
                            //告诉Broker消费成功
                            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                        }
                        e.printStackTrace();
                        return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                    }
    
                }
            });
            consumer.start();
        }
    }
    
    

    相关文章

      网友评论

          本文标题:RocketMQ分布式事务消息

          本文链接:https://www.haomeiwen.com/subject/qkljwhtx.html