美文网首页MQ
RocketMQ事务消息

RocketMQ事务消息

作者: 丑人林宗己 | 来源:发表于2020-01-14 10:28 被阅读0次

抽空扒一下RocketMQ的事务消息,看看具体的实现,版本4.6.0

基本原理

RocketMQ的事务消息实现是基于二阶段协议实现的。即先发送半消息,成功之后执行本次事务,之后发送事务结束消息

这里其实之前一直存在一个误解。这个误解其实一直都在怀疑只是没时间确认,因为无法确认RocketMQ怎么可能得到事务对象

在执行完半消息发送之后,会调用TransactionListener写入数据库,写入数据库成功后等到事务正常提交之后再进行发送事务结束消息,即本地事务需要与调用TransactionListener写入数据库需要在一个事务内。

如下,期望是本地事务reduct执行与消息发送在一个事务之内,以此达到数据库事务的特性,当发送结束事务消息成功后,如果此时事务异常回滚了,消息与事务的一致性就被破坏了。

public class BusinessService {
    
    @Transactional
    public void reduct() {
        businessService.reduct();///
        rocketmqProducer.sendMessageInTransaction(
                message, args
        );
    }
}

相反过来,RocketMQ的事务消息的本质是保证消息一定会发送成功,与本地事务并非是一致性。或者可以先发送半消息,然后再执行事务,之后再根据事务的结果是否执行事务结束消息。

public class BusinessService {

    public BusinessService() {
        rocketmqProducer.setTransactionListener(new TransactionListenerImpl());
    }
    
    public static class TransactionListenerImpl implements TransactionListener {
        @Override
        public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
            return businessService.reduct();///;
        }

        @Override
        public LocalTransactionState checkLocalTransaction(MessageExt msg) {
            return businessService.check();///;
        }
    }
    
    // 对外提供该方法
    public void reductAndSendMsg() {
        rocketmqProducer.sendMessageInTransaction(
                message, args
        );
    }

    @Transactional
    public void reduct() {
        // TODO
    }
}

RocketMQ的这种基于两阶段协议的事务消息,相比普通消息而言是有了事务成功的保证,但是使用时还是要慎重,因为它非常依赖消息队列,一旦消息队列服务波动涉及到的都将无法提供服务。

源码分析

client

客户端的源码基本上都在DefaultMQProducerImpl#sendMessageInTransaction()方法中,核心片段如下

MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());
try {
    sendResult = this.send(msg); // 半消息,注意是同步的消息
} catch (Exception e) {
    throw new MQClientException("send message Exception", e);
}
//....
switch (sendResult.getSendStatus()) {
    case SEND_OK: {
        try {
            //...
           localTransactionState = transactionListener.executeLocalTransaction(msg, arg);
           // .....
        } catch (Throwable e) {
            log.info("executeLocalTransactionBranch exception", e);
            log.info(msg.toString());
            localException = e;
        }
    }
    break;
    case FLUSH_DISK_TIMEOUT:
    case FLUSH_SLAVE_TIMEOUT:
    case SLAVE_NOT_AVAILABLE:
        localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;
        break;
    default:
        break;
}

try {
    this.endTransaction(sendResult, localTransactionState, localException); // 异步消息
} catch (Exception e) {
    log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e);
}

broker

服务端关注几个点

  • SendMessageProcessor半消息处理
  • EndTransactionProcessor事务结束处理
  • TransactionalMessageCheckService消息回查

其次逻辑队列是RocketMQ存储中的一个非常核心的架构,也是需要重点关注

  • RMQ_SYS_TRANS_HALF_TOPIC 半消息队列
  • RMQ_SYS_TRANS_OP_HALF_TOPIC 事务消息已处理队列
  • CID_RMQ_SYS_TRANS事务消息由于超时或超过最大回查次数等原因的队列

这里重点看看消息回查的逻辑即可,其他的按照类名称点进去翻一翻即可。

消息回查的基本逻辑,从RMQ_SYS_TRANS_HALF_TOPIC取出该主题下的一组逻辑队列,对队列进行循环,取出每个队列记录的上次消费的物理位置(此处的消费并非是消息被消费而是消息被处理过),同样,根据该逻辑队列从RMQ_SYS_TRANS_OP_HALF_TOPIC中取出相对应的逻辑队列,该逻辑队列存储的试衣镜处理过的消息(包括处理成功或者失败)。之后再根据 一些参数进行判断(具体请看代码),该丢弃的丢弃(丢弃进入CID_RMQ_SYS_TRANS队列),该回查的回查,最后将处理的队列进度更新到消费进度(consumer_table)中。

事务消息的整个逻辑大致分为三个点:

  • 客户端写入半消息到CommiLog,在服务端有线程异步将写入的消息不断的刷到逻辑队列中,此时半消息进入RMQ_SYS_TRANS_HALF_TOPIC,同步返回后客户端执行本地事务,执行成功后发送事务结束消息到服务端,此时再根据半消息回复为正常的消息放入到CommiLog(更正了消息的Topic),之后再由异步线程刷到逻辑队列(正确的待消费的逻辑队列)。其次再写入RMQ_SYS_TRANS_OP_HALF_TOPIC消息
  • 服务端每个一分钟(默认)会执行一次消息回查任务,根据RMQ_SYS_TRANS_HALF_TOPICRMQ_SYS_TRANS_OP_HALF_TOPIC的消息取出需要被回查的消息进行回查。回查时是由服务端主动向客户端发起回查请求,该逻辑处理为异步。
  • 客户端收到回查请求后进行回查,并根据回查的结果发送事务结束消息

核心代码片段就不贴出来了,因为TransactionalMessageServiceImpl#check整个方法都是非常核心的,代码又长,有需要自行查阅。

image.png

总结

事实上个人并不是很赞同使用事务消息,至少看起来略显得鸡肋(目前的版本),在分布式事务场景下只能保证消息一定发送,而分布式事务是需要保证各个节点事务严格一致性,如果发送成功了但是没有被正确消费等等,该方法就不适合了。只能说有特定场景,但是还不具备普适性,当然技术方案自然不可能做到面面俱到,能解决部分场景已然很好。

其次服务过于依赖消息队列服务,如果消息队列出现三五分钟的动荡,上游的服务基本上是废弃的。而大多数场景下是上游的服务并非严格依赖消息队列,所以即便是消息队列动荡,上游理应可以继续提供服务,下游可能存在数据延迟更新等问题,需要介入修复。比如下单后的发货,存在三五分钟通知延迟是可以接受的,但是不能下单就不能接受。

也不能否则存在上下游严格依赖的场景。但是如果需要强一致性的解决方案,是不是应该考虑使用其他技术方案,比如Seata

相关文章

  • RocketMQ分布式事务消息

    1、RocketMQ事务消息概念 RocketMQ事务消息:RocketMQ 提供分布事务功能,通过 Rocket...

  • 2020-03-12

    rocketMq: 事务消息发送步骤如下: 发送方将半事务消息发送至消息队列 RocketMQ 版服务端。 消息队...

  • rocketmq实现事务消息

    参考:rocketmq实现事务消息

  • Windows 安装 RocketMQ

    一、RocketMQ 介绍 1、消息顺序2、消息重复消费3、事务消息 二、RocketMQ 安装 Windows:...

  • 8:RocketMq实战 分布式事务消息架构讲解(文末有项目连

    1:什么是分布式事务: 2:RocketMQ4.X分布式事务消息架构讲解 3:RocketMQ4.X分布式事务消息...

  • RocketMQ事务消息

    抽空扒一下RocketMQ的事务消息,看看具体的实现,版本4.6.0。 基本原理 RocketMQ的事务消息实现是...

  • RocketMq事务消息

    分布式事务 微服务倡导将复杂的系统拆分为若干个简单、职责单一、松耦合的服务,可以降低开发难度,便于敏捷开发。而对大...

  • RocketMq 事务消息

    系列 RocketMq 消息Tag过滤 RocketMq 广播模式 RocketMQ 同步调用的新特性 Rocke...

  • RocketMQ事务消息

    概述 事务消息解决的问题是:Provider本地事务 + 消息投递 一起执行。解决应用端 和 MQ端两个独立的应用...

  • RocketMQ事务消息

    在微服务架构中,随着服务的逐步拆分,数据库私有已经成为共识,这也导致所面临的分布式事务问题成为微服务落地过程中一个...

网友评论

    本文标题:RocketMQ事务消息

    本文链接:https://www.haomeiwen.com/subject/jisgactx.html