20220404_RocketMQ transaction check-back: study notes on a custom MessageCheckListener
1 Overview
1.1 Transactional message check-back
RocketMQ version: V4.8.0
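To keep a single message from being checked too many times and causing half messages to pile up in the half-message queue, the number of checks for a single message is limited to 15 by default. Users can change this limit with the transactionCheckMax parameter in the broker configuration file. If a message has been checked more than N times (N = transactionCheckMax), the broker discards it and, by default, also prints an error log. Users can change this behavior by overriding the AbstractTransactionalMessageCheckListener class.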
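The limit described above can be illustrated with a small, self-contained model. This is a simplified sketch and not the broker's actual code: the class and method names are hypothetical, and the only thing borrowed from RocketMQ is the TRANSACTION_CHECK_TIMES property name that appears in the broker log in section 2.7.2. It assumes the counter is stored on the message and compared with transactionCheckMax before each check.

```java
import java.util.HashMap;
import java.util.Map;

/** Simplified, hypothetical model of the per-message check limit; not the broker's code. */
class CheckLimitModel {
    // Property name as it appears in the broker log in section 2.7.2.
    static final String CHECK_TIMES = "TRANSACTION_CHECK_TIMES";

    /**
     * Returns true when the message has already been checked checkMax times and
     * should be discarded; otherwise records one more check and returns false.
     */
    static boolean needDiscard(Map<String, String> props, int checkMax) {
        int checked = Integer.parseInt(props.getOrDefault(CHECK_TIMES, "0"));
        if (checked >= checkMax) {
            return true;
        }
        props.put(CHECK_TIMES, String.valueOf(checked + 1));
        return false;
    }

    /** Counts how many check-backs are issued before the message is discarded. */
    static int checksBeforeDiscard(int checkMax) {
        Map<String, String> props = new HashMap<>();
        int checks = 0;
        while (!needDiscard(props, checkMax)) {
            checks++;
        }
        return checks;
    }

    public static void main(String[] args) {
        System.out.println(checksBeforeDiscard(3));  // prints 3
        System.out.println(checksBeforeDiscard(15)); // prints 15
    }
}
```

Under this model, a limit of 3 gives three check-backs followed by a discard on the next round, which matches the sequence recorded in section 2.7.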
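This note covers the following:
- Changing the per-message check limit (default 15) in the broker configuration file broker.conf
- Customizing what happens once the check limit is exceeded, by extending AbstractTransactionalMessageCheckListener
- Manually patching the broker jar rocketmq-broker (adding the check listener class and the SPI configuration)
- Simulating a producer that sends a transactional message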
2 Code examples
2.1 Check limit for a single message
// D:\mqexperiment\rocketmqdebug\myrocketmqhome\conf\broker-adebug.properties
# Maximum number of transaction check-backs per message; default 15
transactionCheckMax = 3
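To make the "default 15 unless overridden" rule concrete, here is a minimal sketch that reads the same key from properties text with the JDK's java.util.Properties. The CheckMaxConfig class and its method are hypothetical helpers written for this note; the broker has its own configuration loading, which is not reproduced here.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

/** Hypothetical helper: reads transactionCheckMax from properties text, defaulting to 15. */
class CheckMaxConfig {
    static final int DEFAULT_CHECK_MAX = 15;

    static int readCheckMax(String propertiesText) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(propertiesText));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        String raw = props.getProperty("transactionCheckMax");
        // Fall back to the documented default when the key is absent.
        return raw == null ? DEFAULT_CHECK_MAX : Integer.parseInt(raw.trim());
    }

    public static void main(String[] args) {
        System.out.println(readCheckMax("# check limit\ntransactionCheckMax = 3\n")); // prints 3
        System.out.println(readCheckMax(""));                                         // prints 15
    }
}
```

Whitespace around the equals sign, as in the line above, is accepted by the properties format, so "transactionCheckMax = 3" and "transactionCheckMax=3" read the same way.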
2.2 Custom handling after the check limit is exceeded
package org.apache.rocketmq.broker.transaction.queue;

import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;

/**
 * @author kikop
 * @version 1.0
 * @project myproducerserver
 * @file MyAbstractTransactionalMessageCheckListener
 * @desc Custom handling (with extra logging) after the check limit is exceeded
 * @date 2022/4/4
 * @time 9:30
 * @by IDE IntelliJ IDEA
 */
public class MyAbstractTransactionalMessageCheckListener extends DefaultTransactionalMessageCheckListener {
    private static final InternalLogger log = InternalLoggerFactory.getLogger("RocketmqTransaction");

    @Override
    public void resolveDiscardMsg(MessageExt messageExt) {
        log.error("begin resolveDiscardMsg...");
        // 1. Let the default listener move the message to the system topic
        super.resolveDiscardMsg(messageExt);
        // 2. Write the custom log entry
        log.info("Put checked-too-many-time half message to TRANS_CHECK_MAXTIME_TOPIC OK. Restored in queueOffset={}, commitLogOffset={}, real topic={}",
                new Object[]{messageExt.getQueueOffset(),
                        messageExt.getCommitLogOffset(),
                        messageExt.getUserProperty("REAL_TOPIC")});
        log.error("end resolveDiscardMsg...");
    }
}
2.3 Write the SPI configuration file
// E:\workdirectory\OpenSourceStudy\rocketmq-all-4.8.0-source-release\broker\src\main\resources\META-INF\service\org.apache.rocketmq.broker.transaction.AbstractTransactionalMessageCheckListener
org.apache.rocketmq.broker.transaction.queue.MyAbstractTransactionalMessageCheckListener
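The SPI file above contains a single line naming the implementation class. The sketch below models that kind of lookup with the JDK only: read the first line, load the class, instantiate it. It is a simplified illustration under the assumption that the first line names the implementation; FirstLineServiceLoader is a hypothetical name and this is not RocketMQ's own loader.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.List;

/** Simplified, hypothetical model of a "first line names the implementation" lookup. */
class FirstLineServiceLoader {

    /** Reads the first line of the SPI file content and instantiates the class it names. */
    static <T> T load(Reader spiFile, Class<T> expectedType) {
        try (BufferedReader reader = new BufferedReader(spiFile)) {
            String className = reader.readLine();
            if (className == null || className.trim().isEmpty()) {
                return null; // nothing configured; the caller would fall back to a default
            }
            Class<?> impl = Class.forName(className.trim());
            return expectedType.cast(impl.getDeclaredConstructor().newInstance());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Cannot instantiate configured class", e);
        }
    }

    public static void main(String[] args) {
        // java.util.ArrayList stands in for the custom listener class here.
        List<?> list = load(new StringReader("java.util.ArrayList\n"), List.class);
        System.out.println(list.getClass().getName()); // prints java.util.ArrayList
    }
}
```

A lookup of this shape needs the named class to be on the classpath and to have a no-argument constructor, which is why the compiled class is placed inside the broker jar in the next step.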
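2.4 Modify rocketmq-broker-4.8.0.jar
1. Add the SPI file from section 2.3 under META-INF\service; its file name is org.apache.rocketmq.broker.transaction.AbstractTransactionalMessageCheckListener
2. Place MyAbstractTransactionalMessageCheckListener.class in the package location org.apache.rocketmq.broker.transaction.queue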
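The two steps above amount to adding two entries to the jar. One possible way to script this, sketched here with the JDK's zip file system provider, is shown below. JarPatcher and its methods are hypothetical helpers, not part of RocketMQ; it is prudent to back up the original jar and stop the broker before patching.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URI;
import java.nio.file.FileSystem;
import java.nio.file.FileSystems;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper that writes and reads single entries of a jar (zip) archive. */
class JarPatcher {

    /** Creates or overwrites one entry; the archive is created if it does not exist yet. */
    static void putEntry(Path jar, String entryName, byte[] content) {
        Map<String, String> env = new HashMap<>();
        env.put("create", "true");
        try (FileSystem zip = FileSystems.newFileSystem(URI.create("jar:" + jar.toUri()), env)) {
            Path entry = zip.getPath("/" + entryName);
            Files.createDirectories(entry.getParent()); // e.g. /META-INF/service
            Files.write(entry, content);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Reads one entry back, mainly to verify the patch. */
    static byte[] readEntry(Path jar, String entryName) {
        Map<String, String> env = new HashMap<>();
        try (FileSystem zip = FileSystems.newFileSystem(URI.create("jar:" + jar.toUri()), env)) {
            return Files.readAllBytes(zip.getPath("/" + entryName));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

With this helper, step 1 would be a putEntry call whose entry name is META-INF/service/ followed by the SPI file name and whose content is the implementation class name, and step 2 a putEntry call with org/apache/rocketmq/broker/transaction/queue/MyAbstractTransactionalMessageCheckListener.class and the compiled class bytes.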
2.5 Write the producer-side transaction check-back method
package com.kikop.mycomponent.listener;

import com.kikop.utils.DateUtils;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

public class MyTransactionListenerImpl implements TransactionListener {
    private AtomicInteger transactionIndex = new AtomicInteger(0);
    // key: transactionId
    // value: state (0 = unknown, 1 = commit, 2 = rollback)
    private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();

    /**
     * Executes the local transaction.
     * Called once after the half message has been sent successfully; the broker
     * decides whether to commit the transaction based on the return value.
     *
     * @param msg
     * @param arg
     * @return
     */
    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        // Log text: "Start executing local transaction, transactionId:..., executeLocalTransaction:<time>"
        System.out.println(String.format("开始执行本地事务,transactionId:%s, executeLocalTransaction:%s",
                msg.getTransactionId(), DateUtils.getTime()));
        // Simulate an unknown transaction result so that RocketMQ triggers its periodic check-back
        int value = transactionIndex.getAndIncrement();
        // int status = value % 3;
        int status = 0;
        localTrans.put(msg.getTransactionId(), status);
        return LocalTransactionState.UNKNOW;
    }

    /**
     * Returns the local transaction state.
     * Once the transaction has succeeded, it does not need to be checked again.
     * By default a message is checked up to 15 times, at one-minute intervals.
     * The limit keeps a single message from being checked too many times and
     * causing half messages to pile up.
     *
     * @param msg
     * @return
     */
    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        // Log text: "Start executing transaction check-back, transactionId:..., checkLocalTransaction:<time>"
        System.out.println(String.format("开始执行事务回查,transactionId:%s, checkLocalTransaction:%s", msg.getTransactionId(), DateUtils.getTime()));
        Integer status = localTrans.get(msg.getTransactionId());
        if (null != status) {
            switch (status) {
                case 0:
                    return LocalTransactionState.UNKNOW;
                case 1:
                    return LocalTransactionState.COMMIT_MESSAGE;
                case 2:
                    return LocalTransactionState.ROLLBACK_MESSAGE;
            }
        }
        // No recorded status (or an unexpected value): commit
        return LocalTransactionState.COMMIT_MESSAGE;
    }
}
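The listener above commits when no status is recorded for a transaction ID. One possible variation, sketched below, returns an unknown state in that case, so that a lost in-memory map (for example after a producer restart) does not commit a message by default. TxState is a stand-in enum so that the sketch runs without the RocketMQ client, and LocalTxStore is a hypothetical name; whether this stricter behavior is wanted is an assumption, not something the original test exercises.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Stand-in for LocalTransactionState so the sketch runs without RocketMQ. */
enum TxState { COMMIT, ROLLBACK, UNKNOWN }

/** Hypothetical in-memory store mapping transaction IDs to status codes 0/1/2. */
class LocalTxStore {
    private final Map<String, Integer> statusById = new ConcurrentHashMap<>();

    void record(String transactionId, int status) {
        statusById.put(transactionId, status);
    }

    TxState check(String transactionId) {
        Integer status = statusById.get(transactionId);
        if (status == null) {
            // Assumption: a missing entry is not proof that the transaction succeeded.
            return TxState.UNKNOWN;
        }
        switch (status) {
            case 1:
                return TxState.COMMIT;
            case 2:
                return TxState.ROLLBACK;
            default:
                return TxState.UNKNOWN; // 0 or any unexpected code
        }
    }
}
```

With this variant, an unresolved or forgotten transaction keeps being checked until the broker's limit is reached and the message is discarded.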
2.6 Write the producer-side test code
@Override
public void sendTransactionMsg() {
    String[] tags = new String[]{"TagA", "TagB", "TagC", "TagD", "TagE"};
    // Send a single message for this test
    for (int i = 0; i < 1; i++) {
        try {
            Message msg =
                    new Message("myTransactionTopic2", tags[i % tags.length], "KEY" + i,
                            ("Hello kikop " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
            // A transaction ID is generated for the message
            // Sending automatically invokes executeLocalTransaction once
            SendResult sendResult = transactionMQProducer.sendMessageInTransaction(msg, null);
            System.out.printf("%s%n", sendResult);
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        } catch (MQClientException | UnsupportedEncodingException e) {
            e.printStackTrace();
        }
    }
}
2.7 Test
2.7.1 Producer-side log
// 1. Message sent
SendResult [sendStatus=SEND_OK, msgId=7F000001192C14DAD5DC13F38BCC0000, offsetMsgId=null, messageQueue=MessageQueue [topic=myTransactionTopic2, brokerName=broker-adebug, queueId=2], queueOffset=0]
// 2. Local transaction executed (log text: "Start executing local transaction")
开始执行本地事务,transactionId:7F000001192C14DAD5DC13F38BCC0000, executeLocalTransaction:2022-04-04 20:58:48
// 3 transaction check-backs (log text: "Start executing transaction check-back")
开始执行事务回查,transactionId:7F000001192C14DAD5DC13F38BCC0000, checkLocalTransaction:2022-04-04 20:59:15
开始执行事务回查,transactionId:7F000001192C14DAD5DC13F38BCC0000, checkLocalTransaction:2022-04-04 21:00:15
开始执行事务回查,transactionId:7F000001192C14DAD5DC13F38BCC0000, checkLocalTransaction:2022-04-04 21:01:15
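The spacing of the check-backs can be read off the timestamps in the producer log above. The following sketch computes the gaps with java.time; CheckIntervals is a hypothetical helper and the timestamps are copied from the log lines.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper: computes the gaps, in seconds, between consecutive log timestamps. */
class CheckIntervals {
    private static final DateTimeFormatter FMT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    static List<Long> secondsBetween(List<String> timestamps) {
        List<Long> gaps = new ArrayList<>();
        for (int i = 1; i < timestamps.size(); i++) {
            LocalDateTime prev = LocalDateTime.parse(timestamps.get(i - 1), FMT);
            LocalDateTime next = LocalDateTime.parse(timestamps.get(i), FMT);
            gaps.add(Duration.between(prev, next).getSeconds());
        }
        return gaps;
    }

    public static void main(String[] args) {
        // Local transaction execution followed by the three check-backs.
        System.out.println(secondsBetween(Arrays.asList(
                "2022-04-04 20:58:48",
                "2022-04-04 20:59:15",
                "2022-04-04 21:00:15",
                "2022-04-04 21:01:15"))); // prints [27, 60, 60]
    }
}
```

The first check-back arrives 27 seconds after the local transaction ran, and the following ones are 60 seconds apart, which agrees with the one-minute check period noted in section 2.5.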
2.7.2 Broker-side log
2022-04-04 21:02:15 ERROR TransactionalMessageCheckService - begin resolveDiscardMsg...
2022-04-04 21:02:15 ERROR TransactionalMessageCheckService - MsgExt:MessageExt [brokerName=null, queueId=0, storeSize=351, queueOffset=3, sysFlag=0, bornTimestamp=1649077128141, bornHost=/192.168.174.110:53893, storeTimestamp=1649077275581, storeHost=/192.168.174.110:10911, msgId=C0A8AE6E00002A9F0000000000001441, commitLogOffset=5185, bodyCRC=2146650276, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='RMQ_SYS_TRANS_HALF_TOPIC', flag=0, properties={REAL_TOPIC=myTransactionTopic2, TRANSACTION_CHECK_TIMES=3, KEYS=KEY0, TRAN_MSG=true, UNIQ_KEY=7F000001192C14DAD5DC13F38BCC0000, CLUSTER=rocketmq-clusterdebug, WAIT=true, PGROUP=please_rename_unique_group_name, TAGS=TagA, REAL_QID=2}, body=[72, 101, 108, 108, 111, 32, 107, 105, 107, 111, 112, 32, 48], transactionId='null'}] has been checked too many times, so discard it by moving it to system topic TRANS_CHECK_MAXTIME_TOPIC
2022-04-04 21:02:15 INFO TransactionalMessageCheckService - create new topic TopicConfig [topicName=TRANS_CHECK_MAX_TIME_TOPIC, readQueueNums=1, writeQueueNums=1, perm=RW-, topicFilterType=SINGLE_TAG, topicSysFlag=0, order=false]
2022-04-04 21:02:15 INFO brokerOutApi_thread_1 - register broker[0]to name server 127.0.0.1:9876 OK
2022-04-04 21:02:15 INFO TransactionalMessageCheckService - Put checked-too-many-time half message to TRANS_CHECK_MAXTIME_TOPIC OK. Restored in queueOffset=3, commitLogOffset=5185, real topic=myTransactionTopic2
2022-04-04 21:02:15 INFO TransactionalMessageCheckService - Put checked-too-many-time half message to TRANS_CHECK_MAXTIME_TOPIC OK. Restored in queueOffset=3, commitLogOffset=5185, real topic=myTransactionTopic2
2022-04-04 21:02:15 ERROR TransactionalMessageCheckService - end resolveDiscardMsg...