美文网首页
2022-04-04_rocketmq事务回查自定义essage

2022-04-04_rocketmq事务回查自定义essage

作者: kikop | 来源:发表于2022-04-04 21:24 被阅读0次

    20220404_rocketmq事务回查自定义essageCheckListener学习笔记

    1概述

    1.1事务消息回查

    rocketmq版本:V4.8.0

    为了避免单个消息被检查太多次而导致半队列消息累积,我们默认将单个消息的检查次数限制为 15 次,但是用户可以通过 Broker 配置文件的 transactionCheckMax参数来修改此限制。如果已经检查某条消息超过 N 次的话( N = transactionCheckMax ) 则 Broker 将丢弃此消息,并在默认情况下同时打印错误日志。用户可以通过重写 AbstractTransactionalMessageCheckListener 类来修改这个行为。

    本文涉及的内容如下:

    1. 修改代理端配置broker.conf,单个消息的检查次数(默认15次)
    2. 自定义日志回查超时后处理逻辑AbstractTransactionalMessageCheckListener
    3. 手动修改代理端对应的Jar包rocketmq-broker(增加CheckListener和SPI配置)
    4. 模拟生产端发送事务型消息

    2代码示例

    2.1单个消息的检查次数

    // D:\mqexperiment\rocketmqdebug\myrocketmqhome\conf\broker-adebug.properties
    #事务回查次数,默认15次
    transactionCheckMax = 3
    

    2.2自定义日志回查超时后处理逻辑

    package org.apache.rocketmq.broker.transaction.queue;
    
    import org.apache.rocketmq.common.message.MessageExt;
    import org.apache.rocketmq.logging.InternalLogger;
    import org.apache.rocketmq.logging.InternalLoggerFactory;
    
    /**
     * @author kikop
     * @version 1.0
     * @project myproducerserver
     * @file MyAbstractTransactionalMessageCheckListener
     * @desc 自定义日志回查超时后处理逻辑
     * @date 2022/4/4
     * @time 9:30
     * @by IDE IntelliJ IDEA
     */
    public class MyAbstractTransactionalMessageCheckListener extends DefaultTransactionalMessageCheckListener {
    
        private static final InternalLogger log = InternalLoggerFactory.getLogger("RocketmqTransaction");
    
        @Override
        public void resolveDiscardMsg(MessageExt messageExt) {
    
            log.error("begin resolveDiscardMsg...");
            // 1.topic移走
            super.resolveDiscardMsg(messageExt);
            // 2.记录自定义日志
            log.info("Put checked-too-many-time half message to TRANS_CHECK_MAXTIME_TOPIC OK. Restored in queueOffset={}, commitLogOffset={}, real topic={}",
                    new Object[]{messageExt.getQueueOffset(),
                            messageExt.getCommitLogOffset(),
                            messageExt.getUserProperty("REAL_TOPIC")});
            log.error("end resolveDiscardMsg...");
    
        }
    
    
    }
    
    

    2.3编写spi配置文件

    // E:\workdirectory\OpenSourceStudy\rocketmq-all-4.8.0-source-release\broker\src\main\resources\META-INF\service\org.apache.rocketmq.broker.transaction.AbstractTransactionalMessageCheckListener
    org.apache.rocketmq.broker.transaction.queue.MyAbstractTransactionalMessageCheckListener
    

    2.4修改rocketmq-broker-4.8.0.jar

    1.在META-INF\service增加文件:AbstractTransactionalMessageCheckListener
    2.将MyAbstractTransactionalMessageCheckListener.class文件放在位置:org.apache.rocketmq.broker.transaction.queue
    

    2.5编写生产端事务回查方法

    package com.kikop.mycomponent.listener;
    
    import com.kikop.utils.DateUtils;
    import org.apache.rocketmq.client.producer.LocalTransactionState;
    import org.apache.rocketmq.client.producer.TransactionListener;
    import org.apache.rocketmq.common.message.Message;
    import org.apache.rocketmq.common.message.MessageExt;
    
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.atomic.AtomicInteger;
    
    
    public class MyTransactionListenerImpl implements TransactionListener {
        private AtomicInteger transactionIndex = new AtomicInteger(0);
    
        // key:transactionId
        // value:state
        private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();
    
        /**
         * 执行本地事务
         * 当发送半消息成功时,mqServer根据返回值决定是否提交事务,只执行一次
         *
         * @param msg
         * @param arg
         * @return
         */
        @Override
        public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
            System.out.println(String.format("开始执行本地事务,transactionId:%s, executeLocalTransaction:%s",
                    msg.getTransactionId(), DateUtils.getTime()));
            //  直接模拟事务执行结果未知,触发 rocketmq 定时事务回查
            int value = transactionIndex.getAndIncrement();
    //        int status = value % 3;
            int status = 0;
            localTrans.put(msg.getTransactionId(), status);
            return LocalTransactionState.UNKNOW;
        }
    
        /**
         * 如果已经成功,则不需再次检查
         * 获取本地事务状态
         * 默认检查15次,周期:1分钟
         * 为了避免单个消息被检查太多次而导致半队列消息累积,我们默认将单个消息的检查次数限制为 15 次
         *
         * @param msg
         * @return
         */
        @Override
        public LocalTransactionState checkLocalTransaction(MessageExt msg) {
            System.out.println(String.format("开始执行事务回查,transactionId:%s, checkLocalTransaction:%s", msg.getTransactionId(), DateUtils.getTime()));
            Integer status = localTrans.get(msg.getTransactionId());
            if (null != status) {
                switch (status) {
                    case 0:
                        return LocalTransactionState.UNKNOW;
                    case 1:
                        return LocalTransactionState.COMMIT_MESSAGE;
                    case 2:
                        return LocalTransactionState.ROLLBACK_MESSAGE;
                }
            }
            return LocalTransactionState.COMMIT_MESSAGE;
        }
    }
    
    

    2.6编写生产端测试代码

    @Override
        public void sendTransactionMsg() {
    
            String[] tags = new String[]{"TagA", "TagB", "TagC", "TagD", "TagE"};
    
            for (int i = 0; i < 1; i++) {
                try {
                    Message msg =
                            new Message("myTransactionTopic2", tags[i % tags.length], "KEY" + i,
                                    ("Hello kikop " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
    
                    // 生成一个事务ID
                    // 发送的时候会自动调用一次 executeLocalTransaction
                    SendResult sendResult = transactionMQProducer.sendMessageInTransaction(msg, null);
                    System.out.printf("%s%n", sendResult);
                    try {
                        Thread.sleep(10);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                } catch (MQClientException | UnsupportedEncodingException e) {
                    e.printStackTrace();
                }
            }
        }
    

    2.7测试

    2.7.1生产端日志

    // 1.消息发送
    SendResult [sendStatus=SEND_OK, msgId=7F000001192C14DAD5DC13F38BCC0000, offsetMsgId=null, messageQueue=MessageQueue [topic=myTransactionTopic2, brokerName=broker-adebug, queueId=2], queueOffset=0]
    // 2.执行本地事务
    开始执行本地事务,transactionId:7F000001192C14DAD5DC13F38BCC0000, executeLocalTransaction:2022-04-04 20:58:48
    
    // 3次事务回查
    开始执行事务回查,transactionId:7F000001192C14DAD5DC13F38BCC0000, checkLocalTransaction:2022-04-04 20:59:15
    开始执行事务回查,transactionId:7F000001192C14DAD5DC13F38BCC0000, checkLocalTransaction:2022-04-04 21:00:15
    开始执行事务回查,transactionId:7F000001192C14DAD5DC13F38BCC0000, checkLocalTransaction:2022-04-04 21:01:15
    

    2.7.2代理端日志

    2022-04-04 21:02:15 ERROR TransactionalMessageCheckService - begin resolveDiscardMsg...
        
    2022-04-04 21:02:15 ERROR TransactionalMessageCheckService - MsgExt:MessageExt [brokerName=null, queueId=0, storeSize=351, queueOffset=3, sysFlag=0, bornTimestamp=1649077128141, bornHost=/192.168.174.110:53893, storeTimestamp=1649077275581, storeHost=/192.168.174.110:10911, msgId=C0A8AE6E00002A9F0000000000001441, commitLogOffset=5185, bodyCRC=2146650276, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='RMQ_SYS_TRANS_HALF_TOPIC', flag=0, properties={REAL_TOPIC=myTransactionTopic2, TRANSACTION_CHECK_TIMES=3, KEYS=KEY0, TRAN_MSG=true, UNIQ_KEY=7F000001192C14DAD5DC13F38BCC0000, CLUSTER=rocketmq-clusterdebug, WAIT=true, PGROUP=please_rename_unique_group_name, TAGS=TagA, REAL_QID=2}, body=[72, 101, 108, 108, 111, 32, 107, 105, 107, 111, 112, 32, 48], transactionId='null'}] has been checked too many times, so discard it by moving it to system topic TRANS_CHECK_MAXTIME_TOPIC
    2022-04-04 21:02:15 INFO TransactionalMessageCheckService - create new topic TopicConfig [topicName=TRANS_CHECK_MAX_TIME_TOPIC, readQueueNums=1, writeQueueNums=1, perm=RW-, topicFilterType=SINGLE_TAG, topicSysFlag=0, order=false]
    2022-04-04 21:02:15 INFO brokerOutApi_thread_1 - register broker[0]to name server 127.0.0.1:9876 OK
    2022-04-04 21:02:15 INFO TransactionalMessageCheckService - Put checked-too-many-time half message to TRANS_CHECK_MAXTIME_TOPIC OK. Restored in queueOffset=3, commitLogOffset=5185, real topic=myTransactionTopic2
    2022-04-04 21:02:15 INFO TransactionalMessageCheckService - Put checked-too-many-time half message to TRANS_CHECK_MAXTIME_TOPIC OK. Restored in queueOffset=3, commitLogOffset=5185, real topic=myTransactionTopic2
        
    2022-04-04 21:02:15 ERROR TransactionalMessageCheckService - end resolveDiscardMsg...
    

    参考

    1RocketMQ源码解析(搭建环境)

    https://inetyoung.blog.csdn.net/article/details/109036959

    2消息中间件 RocketMQ 源码解析 —— 调试环境搭建

    https://blog.csdn.net/qiujiavip/article/details/99842544

    3RocketMQ源码分析之消息存储

    https://zhuanlan.zhihu.com/p/58728454

    相关文章

      网友评论

          本文标题:2022-04-04_rocketmq事务回查自定义essage

          本文链接:https://www.haomeiwen.com/subject/imhhsrtx.html