美文网首页消息专题RocketMQ
十、RocketMQ Transaction example 事

十、RocketMQ Transaction example 事

作者: ASD_92f7 | 来源:发表于2019-04-12 15:32 被阅读33次

    一、概述

    参考资料:
    http://rocketmq.apache.org/docs/transaction-example/
    官网上说,是一个两阶段事务提交的实现
    使用限制:
    1、事务消息不支持批量或者延迟消息
    2、对于UNKNOW的消息,RocketMQ会尝试回调15次,超过15次会扔掉消息,并记录错误日志;可以修改broker的transactionCheckMax属性来改变这个次数,如果不想扔掉消息(或者想做一些其他的操作),也可以重写AbstractTransactionCheckListener类来修改默认的扔掉消息的行为。
    3、如果返回状态为UNKNOW,那么会每 60秒 执行一次事务状态检查回调函数,这个时间可以通过修改broker的transactionTimeout参数来改变,或者在发送消息的时候修改用户参数:CHECK_IMMUNITY_TIME_IN_SECONDS,这个参数优先于transactionTimeout。
    4、事务消息可能被检查或者消费多次?
    5、反正RocketMQ开源版本还是会丢数据,如果你一点儿都不想丢,建议使用同步的 double wirte机制。
    6、Producer 发送者的ID不能和其他非事务型生产者的ID共用

    二、RocketMQ事务的三种状态

    1、LocalTransactionState.UNKNOW
    未知状态,RocketMQ会调用事务状态检查函数(checkLocalTransaction),来最终决定状态
    2、LocalTransactionState.COMMIT_MESSAGE:事务成功提交,消息会被发送并消费
    3、LocalTransactionState.ROLLBACK_MESSAGE:事务回滚,消息会被扔掉

    三、流程图

    流程图

    四、创建一个事务消息发送类 TransactionProducer

    与之前例子不同的地方有三个:
    1、使用TransactionMQProducer来声明生产者
    2、需要新建一个线程池 executorService ,并绑定到 producer
    3、需要声明一个TransactionListener ,并绑定到 producer,这个类会在后面说明
    还需要注意的是,发送完消息后,不要立即关闭 producer,否则无法测试回调的情况,这里就简单调用 Thread.sleep(1000 * 60 * 60);

    package com.asd.rocket.controller.transaction;
    
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.client.producer.SendResult;
    import org.apache.rocketmq.client.producer.TransactionListener;
    import org.apache.rocketmq.client.producer.TransactionMQProducer;
    import org.apache.rocketmq.common.message.Message;
    import org.apache.rocketmq.remoting.common.RemotingHelper;
    
    import java.io.UnsupportedEncodingException;
    import java.util.concurrent.*;
    
    /**
     * @author zhangluping@sinosoft.com.cn
     * @date 2019/4/12 10:00
     */
    public class TransactionProducer {
    
        public static void main(String[] args) throws MQClientException, InterruptedException {
            TransactionListener transactionListener = new TransactionListenerImpl();
            TransactionMQProducer producer = new TransactionMQProducer("tg");
            ExecutorService executorService = new ThreadPoolExecutor(2, 5,
                    100, TimeUnit.SECONDS,
                    new ArrayBlockingQueue<>(2000), r -> {
                        Thread thread = new Thread(r);
                        thread.setName("client-transaction-msg-check-thread");
                        return thread;
            });
            producer.setNamesrvAddr("10.1.11.155:9876");
            producer.setExecutorService(executorService);
            producer.setTransactionListener(transactionListener);
            producer.start();
    
            String[] tags = new String[] {"TagA"};
            for (int i = 0; i < 1; i++) {
                try {
                    Message msg =
                            new Message("qqq", tags[i % tags.length], "KEY" + i,
                                    ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                    SendResult sendResult = producer.sendMessageInTransaction(msg, null);
                    System.out.printf("%s%n", sendResult);
    
                    Thread.sleep(10);
                } catch (MQClientException | UnsupportedEncodingException e) {
                    e.printStackTrace();
                }
            }
            // 保持会儿 对于UNKNOW的消息需要在1分钟后回调
            Thread.sleep(1000 * 60 * 60);
            producer.shutdown();
        }
    }
    

    五、TransactionListener 事务监听类

    使用一个ConcurrentHashMap来模拟存储事务执行的状态,并返回UNKONW状态,在生产中,由于网络问题或者其他问题,会产生UNKONW状态,这里直接模拟这种状态,当然也可以直接返回COMMIT_MESSAGE或者ROLLBACK_MESSAGE,如果返回后面这两种状态,checkLocalTransaction回调函数不会执行。
    这个类主要有两个方法:
    executeLocalTransaction:本地事务执行方法,系统的业务逻辑(扣钱加钱等事务性操作)
    checkLocalTransaction:本地事务状态检查回调方法,模拟从ConcurrentHashMap获取本地事务的执行情况,并根据执行情况决定最终的事务提交状态
    可以修改 localTrans.put(msg.getTransactionId(), status1); 这行的状态,来测试各种情况:
    1、status0 = 0; 消息会回调15次,然后扔掉,消费者收不到消息;
    2、status1 = 1; 消息会在第一次回调后,发送给消费者并被消费;
    3、status2 = 2; 消息会在第一次回调后,扔掉消息,消费者收不到消息。

    package com.asd.rocket.controller.transaction;
    
    import org.apache.rocketmq.client.producer.LocalTransactionState;
    import org.apache.rocketmq.client.producer.TransactionListener;
    import org.apache.rocketmq.common.message.Message;
    import org.apache.rocketmq.common.message.MessageExt;
    import java.util.concurrent.ConcurrentHashMap;
    
    /**
     * @author zhangluping@sinosoft.com.cn
     * @date 2019/4/12 10:03
     */
    public class TransactionListenerImpl implements TransactionListener {
        /**
         * 生产环境可以考虑存储到Redis 或者 MongoDB中
         */
        private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();
    
        /**
         * 执行本地方法
         * @param msg 消息内容
         * @param arg 参数
         * @return 提交状态 可选:COMMIT_MESSAGE,ROLLBACK_MESSAGE, UNKNOW
         */
        @Override
        public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
            System.out.println("执行本地事务!"+" 事务ID:"+msg.getTransactionId());
            // 模拟事务未知状态
            int status0 = 0;
            // 模拟事务提交成功
            int status1 = 1;
            // 模拟事务回滚
            int status2 = 2;
            localTrans.put(msg.getTransactionId(), status1);
            return LocalTransactionState.UNKNOW;
        }
    
        /**
         * 本地事务回查函数
         * 每60s对 RMQ_SYS_TRANS_HALF_TOPIC 的消息进行一次检查
         * 即上面方法返回 UNKNOW 状态
         * @param msg 消息
         * @return 本地事务提交状态 COMMIT_MESSAGE,ROLLBACK_MESSAGE, UNKNOW
         */
        @Override
        public LocalTransactionState checkLocalTransaction(MessageExt msg) {
            System.out.println("执行事务执行状态回调函数,事务ID:"+msg.getTransactionId());
            Integer status = localTrans.get(msg.getTransactionId());
            if (null != status) {
                switch (status) {
                    case 0:
                        return LocalTransactionState.UNKNOW;
                    case 1:
                        return LocalTransactionState.COMMIT_MESSAGE;
                    case 2:
                        return LocalTransactionState.ROLLBACK_MESSAGE;
                    default:
                        System.out.println("我也不知道该干啥了");
                }
            }
            return LocalTransactionState.COMMIT_MESSAGE;
        }
    }
    

    六、消费者

    就是一个普通的消费者

    package com.asd.rocket.controller.chapter2;
    
    import org.apache.rocketmq.client.producer.DefaultMQProducer;
    import org.apache.rocketmq.client.producer.MessageQueueSelector;
    import org.apache.rocketmq.client.producer.SendResult;
    import org.apache.rocketmq.common.message.Message;
    import org.apache.rocketmq.common.message.MessageQueue;
    import org.apache.rocketmq.remoting.common.RemotingHelper;
    
    import java.util.List;
    
    /**
     * @author zhangluping@sinosoft.com.cn
     * @date 2019/4/10 16:12
     */
    public class OrderedProducer {
        public static void main(String[] args) throws Exception {
            //Instantiate with a producer group name.
            DefaultMQProducer producer = new DefaultMQProducer("example_group_name");
            producer.setNamesrvAddr("10.1.11.155:9876");
            //Launch the instance.
            producer.start();
            String[] tags = new String[] {"TagA", "TagB", "TagC"};
            for (int i = 0; i < 100; i++) {
                int orderId = i % 10;
                byte[] messageBody =  ("orderId=" + orderId + ",message="+i).getBytes(RemotingHelper.DEFAULT_CHARSET);
                //Create a message instance, specifying topic, tag and message body.
                Message msg = new Message("qqq", tags[i % tags.length], "KEY" + i,messageBody);
                final int f_i = i;
                    SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
                    @Override
                    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                        Integer id = (Integer) arg;
                        int index = id % mqs.size();
                        System.out.println("Topic:"+tags[f_i % tags.length]+",订单:"+arg+",消息:"+f_i+" 使用的Queue为:"+mqs.get(index).getQueueId());
                        return mqs.get(index);
                    }
                }, orderId);
            }
            //server shutdown
            producer.shutdown();
        }
    
    }
    

    相关文章

      网友评论

        本文标题:十、RocketMQ Transaction example 事

        本文链接:https://www.haomeiwen.com/subject/nyucwqtx.html