美文网首页
RoketMQ-事务消息

RoketMQ-事务消息

作者: 你家门口的两朵云 | 来源:发表于2021-09-09 16:59 被阅读0次
1.事务消息执行流程
image.png
1.事务消息代码实现

Provider.java

@Slf4j
public class Provider {
    public static void main(String[] args) throws Exception {
        //1.谁来发
        //DefaultMQProducer producer = new DefaultMQProducer("group1");
        TransactionMQProducer producer = new TransactionMQProducer("group1");
        //2.发给谁
        producer.setNamesrvAddr("localhost:9876");
        producer.setTransactionListener(new TransactionListener() {
            //正常事务
            @Override
            public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
                //执行本地事务,将消息保存到数据库里;
                //sql insert
                //返回三种状态:COMMIT_MESSAGE-提交,ROLLBACK_MESSAGE-回滚,UNKNOW-未知
                Integer now = RandomUtils.oneOfMany(1, 2, 3);//1==commit  2==rollback  3==unknown
                if (now==1){
                    log.info("执行sql==========存入数据成功!");
                    return LocalTransactionState.COMMIT_MESSAGE;
                }
                if (now==2){
                    log.info("执行sql==========存入数据失败...正在回滚...");
                    return LocalTransactionState.ROLLBACK_MESSAGE;
                }
                if (now==3){
                    log.info("执行sql==========未知错误!");
                    return LocalTransactionState.UNKNOW;
                }
                log.info("execute error");
                return LocalTransactionState.UNKNOW;
            }
            //补偿事务==处理正常事务找正常事务返回的无应答LocalTransactionState.UNKNOW;由Consumer发起;
            @Override
            public LocalTransactionState checkLocalTransaction(MessageExt msg) {
                //返回2种状态:COMMIT_MESSAGE-提交成功==>继续执行,ROLLBACK_MESSAGE-提价不成功==>回滚,不执行
                Integer now = RandomUtils.oneOfMany(1, 2);//1==commit  2==rollback
                if (now==1){
                    log.info("检查mysql==========事务执行成功!");
                    return LocalTransactionState.COMMIT_MESSAGE;
                }
                if (now==2){
                    log.info("检查mysql==========事务执行失败...已回滚");
                    return LocalTransactionState.ROLLBACK_MESSAGE;
                }
                log.info("check error");
                return LocalTransactionState.ROLLBACK_MESSAGE;
            }
        });

        producer.start();
        String msg = "这是一条事务消息!";
        Message message = new Message("topic13", "tag1",msg.getBytes(StandardCharsets.UTF_8));
        TransactionSendResult result = producer.sendMessageInTransaction(message, null);
        System.out.println(result);
    }
}

Consumer.java

@Slf4j
public class Consumer {
    public static void main(String[] args) throws Exception {
        //1.创建一个接收消息的对象Consumer
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
        //2.设定接收的命名服务器地址
        consumer.setNamesrvAddr("localhost:9876");
        //3.设置接收信息对应的topic,对应的sub标签为任意
        consumer.subscribe("topic13", "*");
        //4.启动监听,接收消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            /**
             * @param msgs    msgs.size() >= 1<br> DefaultMQPushConsumer.consumeMessageBatchMaxSize=1,you can modify here
             * @param context
             * @return The consume status
             */
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt msg:msgs) {
                    String s = new String(msg.getBody());
                    log.info("msg.body()=====>"+s);
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        //5.启动接收消息的服务
        consumer.start();
    }
}

工具类RandomUtils

public class RandomUtils {
    /**
     * 随机返回一个传入的参数;
     * @param ts 对象数组
     * @param <T>传入类型
     * @return t 对象
     */
    @SafeVarargs
    public static <T> T oneOfMany(final T...ts){
        int tsLength =  ts.length;
        //生产数组下标长度的任意整数;随机获取一个数组;
        int pos = (int) (Math.random() *tsLength);
        return ts[pos];
    }
  
}

相关文章

  • RoketMQ-事务消息

    1.事务消息执行流程 1.事务消息代码实现 Provider.java Consumer.java 工具类Rand...

  • 事务消息

    https://blog.csdn.net/chunlongyu/article/details/53844393...

  • 事务消息

    注意:事务消息的 Producer ID 不能与其他类型消息的 Producer ID 共用。与其他类型的消息不同...

  • 事务消息

    总览 RocketMQ事务消息(Transactional Message)是指应用本地事务和发送消息操作可以被定...

  • 51.消息类型-事务消息

    事务消息 RocketMQ提供了事务消息,通过事务消息就能达到分布式事务的最终一致性。 事务消息交互流程: 两个概...

  • RocketMQ分布式事务消息

    1、RocketMQ事务消息概念 RocketMQ事务消息:RocketMQ 提供分布事务功能,通过 Rocket...

  • 精华推荐 | 【深入浅出RocketMQ原理及实战】「性能原理挖

    什么是事务消息 事务消息(Transactional Message)是指应用本地事务和发送消息操作可以被定义到全...

  • 什么是事务、半事务消息?怎么实现的?

    事务消息就是MQ提供的类似XA的分布式事务能⼒,通过事务消息可以达到分布式事务的最终⼀致性。半事务消息就是MQ收到...

  • RocketMq事务消息

    分布式事务 微服务倡导将复杂的系统拆分为若干个简单、职责单一、松耦合的服务,可以降低开发难度,便于敏捷开发。而对大...

  • RocketMq 事务消息

    系列 RocketMq 消息Tag过滤 RocketMq 广播模式 RocketMQ 同步调用的新特性 Rocke...

网友评论

      本文标题:RoketMQ-事务消息

      本文链接:https://www.haomeiwen.com/subject/dkxcwltx.html