事务消费
RocketMq 的事务属于半事务,只能保证生产者事务成功之后,将消息发送到broker,如果消费者消费失败,不能回滚生产者事务.
分析下图
- 首先向broker发送half消息,此时消费者看不到此消息
- Broker向生产者发送.我已接收到half消息
- 执行本地事务,就是TransactionListener接口的executeLocalTransaction方法,返回
(1) COMMIT_MESSAGE 消息升级为普通消息,消费者感知到并消费消息
(2) ROLLBACK_MESSAGE 消息直接丢弃
(3) UNKNOW 等待broker的消息回查 - 根据步骤3返回的状态,提交消息让消费者消费或者丢弃消息
- 回查UNKNOW状态,或者迟迟不确认half消息
- 步骤3返回UNKNOW状态,回调TransactionListener接口的checkLocalTransaction方法,这里可以重新提交事务,并返回步骤3提到的三种状态
- 根据步骤6返回的状态,决定消息是否传送给消费端还是丢弃,如果依然返回UNKNOW则重复步骤5
事务在执行生产者本地代码是肯定会出现多次执行,要做好幂等性操作
官网示例修改了一下
监听
public class TransactionListenerImpl implements TransactionListener {
private AtomicInteger transactionIndex = new AtomicInteger (0);
private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<> ();
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
//对不同的Tab返回不同的状态
String tags = msg.getTags ();
if (StringUtils.contains (tags, "TagA")) {
return LocalTransactionState.COMMIT_MESSAGE;
} else if (StringUtils.contains (tags, "TagB")) {
return LocalTransactionState.ROLLBACK_MESSAGE;
} else {
return LocalTransactionState.UNKNOW;
}
}
//如果返回了UNKNOW状态或者一段时间内没有提交任何状态,都会执行该方法,如果该方法返回状态依然是UNKNOW,或者长时间不返回状态,会重复调用,该消息会被丢弃
//broker服务器会有定时线程去扫面UNKNOW状态的消息,或者长时间没有返回状态的消息,定时线程的周期和超过几次丢弃消息,没看过源码,不敢随便说
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
String tags = msg.getTags ();
System.out.println ("checkLocalTransaction" + tags);
if (StringUtils.contains (tags, "TagC")) {
return LocalTransactionState.COMMIT_MESSAGE;
} else if (StringUtils.contains (tags, "TagD")) {
return LocalTransactionState.ROLLBACK_MESSAGE;
} else {
return LocalTransactionState.UNKNOW;
}
}
生产者
public static void main(String[] args) throws MQClientException, InterruptedException {
TransactionListener transactionListener = new TransactionListenerImpl();
TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name");
producer.setNamesrvAddr("192.168.137.146:9876");
//创建一个线程池 核心线程数2 最大线程数5个
//操过核心线程数的线程等待100s,100s内没有任务执行,被回收
//单位s
//操过核心线程数的任务,放到队列中,队列满了以后,重新创建线程,最大不能操过5个,5个包含2个核心线程
ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setName("client-transaction-msg-check-thread");
return thread;
}
});
producer.setExecutorService(executorService);
//添加监听
producer.setTransactionListener(transactionListener);
producer.start();
String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
for (int i = 0; i < 10; i++) {
try {
Message msg =
new Message("TopicTest", tags[i % tags.length], "KEY" + i,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.sendMessageInTransaction(msg, null);
System.out.printf("%s%n", sendResult);
Thread.sleep(10);
} catch (MQClientException | UnsupportedEncodingException e) {
e.printStackTrace();
}
}
for (int i = 0; i < 100000; i++) {
Thread.sleep(1000);
}
producer.shutdown();
}
关于事务消息的限制(官网翻译)
- 事务消息不支持延时消息和批量消息
- 为了避免单个消息被检查太多次而导致半队列消息累积,我们默认将单个消息的检查次数限制为15次,但是用户可以通过Broker配置文件的transactionCheckMax参数来修改此限制。如果已经检查某条消息超过N次的话( N= transactioncheckmax)则Broker将丢弃此消息,并在默认情况下同时打印错误日志。用户可以通过重写AbstractTransactionCheckListener类来修改这个行为。
- 事务消息将在Broker配置文件中的参数transactionMsgTimeout这样的特定时间长度之后被检查。当发送事务消息时,用户还可以通过设置用户属性
CHECK_IMMUNITY_TIME_IN_SECONDS来改变这个限制,该参数优先于
transactionMsgTimeout参数。 - 事务性消息可能不止一次被检查或消费。
- 提交给用户的目标主题消息可能会失败,目前这依日志的记录而定。它的高可用性通过RocketMQ本身的高可用性机制来保证,如果希望确保事务消息不丢失、并且事务完整性得到保证,建议使用同步的双重写入机制。
-
事务消息的生产者ID不能与其他类型消息的生产者ID共享。与其他类型的消息不同,事务消息允许反向查询、MQ服务器能通过它们的生产者ID查询到消费者。
image.png
网友评论