1.事务消息执行流程

1.事务消息代码实现
Provider.java
@Slf4j
public class Provider {
public static void main(String[] args) throws Exception {
//1.谁来发
//DefaultMQProducer producer = new DefaultMQProducer("group1");
TransactionMQProducer producer = new TransactionMQProducer("group1");
//2.发给谁
producer.setNamesrvAddr("localhost:9876");
producer.setTransactionListener(new TransactionListener() {
//正常事务
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
//执行本地事务,将消息保存到数据库里;
//sql insert
//返回三种状态:COMMIT_MESSAGE-提交,ROLLBACK_MESSAGE-回滚,UNKNOW-未知
Integer now = RandomUtils.oneOfMany(1, 2, 3);//1==commit 2==rollback 3==unknown
if (now==1){
log.info("执行sql==========存入数据成功!");
return LocalTransactionState.COMMIT_MESSAGE;
}
if (now==2){
log.info("执行sql==========存入数据失败...正在回滚...");
return LocalTransactionState.ROLLBACK_MESSAGE;
}
if (now==3){
log.info("执行sql==========未知错误!");
return LocalTransactionState.UNKNOW;
}
log.info("execute error");
return LocalTransactionState.UNKNOW;
}
//补偿事务==处理正常事务找正常事务返回的无应答LocalTransactionState.UNKNOW;由Consumer发起;
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
//返回2种状态:COMMIT_MESSAGE-提交成功==>继续执行,ROLLBACK_MESSAGE-提价不成功==>回滚,不执行
Integer now = RandomUtils.oneOfMany(1, 2);//1==commit 2==rollback
if (now==1){
log.info("检查mysql==========事务执行成功!");
return LocalTransactionState.COMMIT_MESSAGE;
}
if (now==2){
log.info("检查mysql==========事务执行失败...已回滚");
return LocalTransactionState.ROLLBACK_MESSAGE;
}
log.info("check error");
return LocalTransactionState.ROLLBACK_MESSAGE;
}
});
producer.start();
String msg = "这是一条事务消息!";
Message message = new Message("topic13", "tag1",msg.getBytes(StandardCharsets.UTF_8));
TransactionSendResult result = producer.sendMessageInTransaction(message, null);
System.out.println(result);
}
}
Consumer.java
@Slf4j
public class Consumer {
public static void main(String[] args) throws Exception {
//1.创建一个接收消息的对象Consumer
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
//2.设定接收的命名服务器地址
consumer.setNamesrvAddr("localhost:9876");
//3.设置接收信息对应的topic,对应的sub标签为任意
consumer.subscribe("topic13", "*");
//4.启动监听,接收消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
/**
* @param msgs msgs.size() >= 1<br> DefaultMQPushConsumer.consumeMessageBatchMaxSize=1,you can modify here
* @param context
* @return The consume status
*/
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg:msgs) {
String s = new String(msg.getBody());
log.info("msg.body()=====>"+s);
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//5.启动接收消息的服务
consumer.start();
}
}
工具类RandomUtils
public class RandomUtils {
/**
* 随机返回一个传入的参数;
* @param ts 对象数组
* @param <T>传入类型
* @return t 对象
*/
@SafeVarargs
public static <T> T oneOfMany(final T...ts){
int tsLength = ts.length;
//生产数组下标长度的任意整数;随机获取一个数组;
int pos = (int) (Math.random() *tsLength);
return ts[pos];
}
}
网友评论