事务消费

作者: 念䋛 | 来源:发表于2021-06-21 18:04 被阅读0次

事务消费
RocketMq 的事务属于半事务,只能保证生产者事务成功之后,将消息发送到broker,如果消费者消费失败,不能回滚生产者事务.
分析下图

  1. 首先向broker发送half消息,此时消费者看不到此消息
  2. Broker向生产者发送.我已接收到half消息
  3. 执行本地事务,就是TransactionListener接口的executeLocalTransaction方法,返回
    (1) COMMIT_MESSAGE 消息升级为普通消息,消费者感知到并消费消息
    (2) ROLLBACK_MESSAGE 消息直接丢弃
    (3) UNKNOW 等待broker的消息回查
  4. 根据步骤3返回的状态,提交消息让消费者消费或者丢弃消息
  5. 回查UNKNOW状态,或者迟迟不确认half消息
  6. 步骤3返回UNKNOW状态,回调TransactionListener接口的checkLocalTransaction方法,这里可以重新提交事务,并返回步骤3提到的三种状态
  7. 根据步骤6返回的状态,决定消息是否传送给消费端还是丢弃,如果依然返回UNKNOW则重复步骤5
    事务在执行生产者本地代码是肯定会出现多次执行,要做好幂等性操作
    官网示例修改了一下
    监听
public class TransactionListenerImpl implements TransactionListener {
    private AtomicInteger transactionIndex = new AtomicInteger (0);

    private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<> ();

    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        //对不同的Tab返回不同的状态
        String tags = msg.getTags ();
        if (StringUtils.contains (tags, "TagA")) {
            return LocalTransactionState.COMMIT_MESSAGE;
        } else if (StringUtils.contains (tags, "TagB")) {
            return LocalTransactionState.ROLLBACK_MESSAGE;
        } else {
            return LocalTransactionState.UNKNOW;
        }
    }

    //如果返回了UNKNOW状态或者一段时间内没有提交任何状态,都会执行该方法,如果该方法返回状态依然是UNKNOW,或者长时间不返回状态,会重复调用,该消息会被丢弃
    //broker服务器会有定时线程去扫面UNKNOW状态的消息,或者长时间没有返回状态的消息,定时线程的周期和超过几次丢弃消息,没看过源码,不敢随便说
    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        String tags = msg.getTags ();
        System.out.println ("checkLocalTransaction" + tags);
        if (StringUtils.contains (tags, "TagC")) {
            return LocalTransactionState.COMMIT_MESSAGE;
        } else if (StringUtils.contains (tags, "TagD")) {
            return LocalTransactionState.ROLLBACK_MESSAGE;
        } else {
            return LocalTransactionState.UNKNOW;
        }
    }

生产者

public static void main(String[] args) throws MQClientException, InterruptedException {
    TransactionListener transactionListener = new TransactionListenerImpl();
    TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name");
    producer.setNamesrvAddr("192.168.137.146:9876");
    //创建一个线程池 核心线程数2 最大线程数5个
    //操过核心线程数的线程等待100s,100s内没有任务执行,被回收
    //单位s
    //操过核心线程数的任务,放到队列中,队列满了以后,重新创建线程,最大不能操过5个,5个包含2个核心线程
    ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
        @Override
        public Thread newThread(Runnable r) {
            Thread thread = new Thread(r);
            thread.setName("client-transaction-msg-check-thread");
            return thread;
        }
    });
    producer.setExecutorService(executorService);
    //添加监听
    producer.setTransactionListener(transactionListener);
    producer.start();
    String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
    for (int i = 0; i < 10; i++) {
        try {
            Message msg =
                new Message("TopicTest", tags[i % tags.length], "KEY" + i,
                    ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
            SendResult sendResult = producer.sendMessageInTransaction(msg, null);
            System.out.printf("%s%n", sendResult);
            Thread.sleep(10);
        } catch (MQClientException | UnsupportedEncodingException e) {
            e.printStackTrace();
        }
    }
    for (int i = 0; i < 100000; i++) {
        Thread.sleep(1000);
    }
    producer.shutdown();
}

关于事务消息的限制(官网翻译)

  1. 事务消息不支持延时消息和批量消息
  2. 为了避免单个消息被检查太多次而导致半队列消息累积,我们默认将单个消息的检查次数限制为15次,但是用户可以通过Broker配置文件的transactionCheckMax参数来修改此限制。如果已经检查某条消息超过N次的话( N= transactioncheckmax)则Broker将丢弃此消息,并在默认情况下同时打印错误日志。用户可以通过重写AbstractTransactionCheckListener类来修改这个行为。
  3. 事务消息将在Broker配置文件中的参数transactionMsgTimeout这样的特定时间长度之后被检查。当发送事务消息时,用户还可以通过设置用户属性
    CHECK_IMMUNITY_TIME_IN_SECONDS来改变这个限制,该参数优先于
    transactionMsgTimeout参数。
  4. 事务性消息可能不止一次被检查或消费。
  5. 提交给用户的目标主题消息可能会失败,目前这依日志的记录而定。它的高可用性通过RocketMQ本身的高可用性机制来保证,如果希望确保事务消息不丢失、并且事务完整性得到保证,建议使用同步的双重写入机制。
  6. 事务消息的生产者ID不能与其他类型消息的生产者ID共享。与其他类型的消息不同,事务消息允许反向查询、MQ服务器能通过它们的生产者ID查询到消费者。


    image.png

相关文章

  • 事务消费

    事务消费RocketMq 的事务属于半事务,只能保证生产者事务成功之后,将消息发送到broker,如果消费者消费失...

  • ActiveMQ事物/持久化/其他

    事务jms中事务分为生产者和消费者两块,消息的生产和消费不能包含在同一个事务中。 生产者:在事务状态下进行发送操作...

  • 2-2 SimpleMessageListenerContain

    简介 简单消息监听容器 == 装载消费者 作用 可以监听队列(多个队列) 设置事务特性、事务管理器、事务属性、事务...

  • 基于rocketmq事务消息的分布式事务

    先看下图 以上图例展示了mq事务消息解决分布式事务的producer环节,consumer正常消费即可。 show...

  • Windows 安装 RocketMQ

    一、RocketMQ 介绍 1、消息顺序2、消息重复消费3、事务消息 二、RocketMQ 安装 Windows:...

  • 面试集锦

    (金财互联) 1、多线程状态的几种异常2、mq怎么保证顺序消费3、大事务小事务4、什么场景用到lambda5、My...

  • 分布式事务解决方案之可靠消息最终一致性

    可靠消息最终一致性方案是指当事务发起方执行完成本地事务后并发出一条消息,事务参与方(消息消费者)一定能够接收消息并...

  • 【干货】Kafka 事务特性分析

    特性背景 消息事务是指一系列的生产、消费操作可以要么都完成,要么都失败,类似数据库的事务。这个特性在0.10.2的...

  • 3.2.rabbitMq消费者的消费方式(gold_axe)

    性能和可靠性的权衡 3种消费方式:事务 , 拉取 , Qos 消费者一般使用推送, 不用拉取(太慢了) 批量机制可...

  • 生活的苦

    生活的苦 一如 挣钱的事务 苦,意味着 收入 而享乐 恰似消费 需要支付

网友评论

    本文标题:事务消费

    本文链接:https://www.haomeiwen.com/subject/rqpryltx.html