一、概述
参考资料:
http://rocketmq.apache.org/docs/transaction-example/
官网上说,是一个两阶段事务提交的实现
使用限制:
1、事务消息不支持批量或者延迟消息
2、对于UNKNOW的消息,RocketMQ会尝试回调15次,超过15次会扔掉消息,并记录错误日志;可以修改broker的transactionCheckMax属性来改变这个次数,如果不想扔掉消息(或者想做一些其他的操作),也可以重写AbstractTransactionCheckListener类来修改默认的扔掉消息的行为。
3、如果返回状态为UNKNOW,那么会每 60秒 执行一次事务状态检查回调函数,这个时间可以通过修改broker的transactionTimeout参数来改变,或者在发送消息的时候修改用户参数:CHECK_IMMUNITY_TIME_IN_SECONDS,这个参数优先于transactionTimeout。
4、事务消息可能被检查或者消费多次?
5、反正RocketMQ开源版本还是会丢数据,如果你一点儿都不想丢,建议使用同步的 double wirte机制。
6、Producer 发送者的ID不能和其他非事务型生产者的ID共用
二、RocketMQ事务的三种状态
1、LocalTransactionState.UNKNOW
未知状态,RocketMQ会调用事务状态检查函数(checkLocalTransaction),来最终决定状态
2、LocalTransactionState.COMMIT_MESSAGE:事务成功提交,消息会被发送并消费
3、LocalTransactionState.ROLLBACK_MESSAGE:事务回滚,消息会被扔掉
三、流程图
流程图四、创建一个事务消息发送类 TransactionProducer
与之前例子不同的地方有三个:
1、使用TransactionMQProducer来声明生产者
2、需要新建一个线程池 executorService ,并绑定到 producer
3、需要声明一个TransactionListener ,并绑定到 producer,这个类会在后面说明
还需要注意的是,发送完消息后,不要立即关闭 producer,否则无法测试回调的情况,这里就简单调用 Thread.sleep(1000 * 60 * 60);
package com.asd.rocket.controller.transaction;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import java.io.UnsupportedEncodingException;
import java.util.concurrent.*;
/**
* @author zhangluping@sinosoft.com.cn
* @date 2019/4/12 10:00
*/
public class TransactionProducer {
public static void main(String[] args) throws MQClientException, InterruptedException {
TransactionListener transactionListener = new TransactionListenerImpl();
TransactionMQProducer producer = new TransactionMQProducer("tg");
ExecutorService executorService = new ThreadPoolExecutor(2, 5,
100, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(2000), r -> {
Thread thread = new Thread(r);
thread.setName("client-transaction-msg-check-thread");
return thread;
});
producer.setNamesrvAddr("10.1.11.155:9876");
producer.setExecutorService(executorService);
producer.setTransactionListener(transactionListener);
producer.start();
String[] tags = new String[] {"TagA"};
for (int i = 0; i < 1; i++) {
try {
Message msg =
new Message("qqq", tags[i % tags.length], "KEY" + i,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.sendMessageInTransaction(msg, null);
System.out.printf("%s%n", sendResult);
Thread.sleep(10);
} catch (MQClientException | UnsupportedEncodingException e) {
e.printStackTrace();
}
}
// 保持会儿 对于UNKNOW的消息需要在1分钟后回调
Thread.sleep(1000 * 60 * 60);
producer.shutdown();
}
}
五、TransactionListener 事务监听类
使用一个ConcurrentHashMap来模拟存储事务执行的状态,并返回UNKONW状态,在生产中,由于网络问题或者其他问题,会产生UNKONW状态,这里直接模拟这种状态,当然也可以直接返回COMMIT_MESSAGE或者ROLLBACK_MESSAGE,如果返回后面这两种状态,checkLocalTransaction回调函数不会执行。
这个类主要有两个方法:
executeLocalTransaction:本地事务执行方法,系统的业务逻辑(扣钱加钱等事务性操作)
checkLocalTransaction:本地事务状态检查回调方法,模拟从ConcurrentHashMap获取本地事务的执行情况,并根据执行情况决定最终的事务提交状态
可以修改 localTrans.put(msg.getTransactionId(), status1); 这行的状态,来测试各种情况:
1、status0 = 0; 消息会回调15次,然后扔掉,消费者收不到消息;
2、status1 = 1; 消息会在第一次回调后,发送给消费者并被消费;
3、status2 = 2; 消息会在第一次回调后,扔掉消息,消费者收不到消息。
package com.asd.rocket.controller.transaction;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.concurrent.ConcurrentHashMap;
/**
* @author zhangluping@sinosoft.com.cn
* @date 2019/4/12 10:03
*/
public class TransactionListenerImpl implements TransactionListener {
/**
* 生产环境可以考虑存储到Redis 或者 MongoDB中
*/
private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();
/**
* 执行本地方法
* @param msg 消息内容
* @param arg 参数
* @return 提交状态 可选:COMMIT_MESSAGE,ROLLBACK_MESSAGE, UNKNOW
*/
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
System.out.println("执行本地事务!"+" 事务ID:"+msg.getTransactionId());
// 模拟事务未知状态
int status0 = 0;
// 模拟事务提交成功
int status1 = 1;
// 模拟事务回滚
int status2 = 2;
localTrans.put(msg.getTransactionId(), status1);
return LocalTransactionState.UNKNOW;
}
/**
* 本地事务回查函数
* 每60s对 RMQ_SYS_TRANS_HALF_TOPIC 的消息进行一次检查
* 即上面方法返回 UNKNOW 状态
* @param msg 消息
* @return 本地事务提交状态 COMMIT_MESSAGE,ROLLBACK_MESSAGE, UNKNOW
*/
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
System.out.println("执行事务执行状态回调函数,事务ID:"+msg.getTransactionId());
Integer status = localTrans.get(msg.getTransactionId());
if (null != status) {
switch (status) {
case 0:
return LocalTransactionState.UNKNOW;
case 1:
return LocalTransactionState.COMMIT_MESSAGE;
case 2:
return LocalTransactionState.ROLLBACK_MESSAGE;
default:
System.out.println("我也不知道该干啥了");
}
}
return LocalTransactionState.COMMIT_MESSAGE;
}
}
六、消费者
就是一个普通的消费者
package com.asd.rocket.controller.chapter2;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import java.util.List;
/**
* @author zhangluping@sinosoft.com.cn
* @date 2019/4/10 16:12
*/
public class OrderedProducer {
public static void main(String[] args) throws Exception {
//Instantiate with a producer group name.
DefaultMQProducer producer = new DefaultMQProducer("example_group_name");
producer.setNamesrvAddr("10.1.11.155:9876");
//Launch the instance.
producer.start();
String[] tags = new String[] {"TagA", "TagB", "TagC"};
for (int i = 0; i < 100; i++) {
int orderId = i % 10;
byte[] messageBody = ("orderId=" + orderId + ",message="+i).getBytes(RemotingHelper.DEFAULT_CHARSET);
//Create a message instance, specifying topic, tag and message body.
Message msg = new Message("qqq", tags[i % tags.length], "KEY" + i,messageBody);
final int f_i = i;
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
Integer id = (Integer) arg;
int index = id % mqs.size();
System.out.println("Topic:"+tags[f_i % tags.length]+",订单:"+arg+",消息:"+f_i+" 使用的Queue为:"+mqs.get(index).getQueueId());
return mqs.get(index);
}
}, orderId);
}
//server shutdown
producer.shutdown();
}
}
网友评论