美文网首页
对事务消息的源码分析

对事务消息的源码分析

作者: Britney_z | 来源:发表于2021-10-11 17:18 被阅读0次

    我大概画了生产者和broker的交互

    首先发送半消息,broker上接收消息,topic为RMQ_SYS_TRANS_HALF_TOPIC。

    如果本地执行事务,提交消息 LocalTransactionState.COMMIT_MESSAGE。我们看看broker是如何处理的

    然后在commitlog里查询到该消息

    接着我们看看这个sendFinalMessage方法,就是提交真正的消息队列seq_topic1 

    下面我们看看deletePrepareMessage这个方法,要把半消息删除掉的逻辑。

    下面我们看看RMQ_SYS_TRANS_OP_HALF_TOPIC这个topic,我们将RMQ_SYS_TRANS_HALF_TOPIC里的这个消息,存储到RMQ_SYS_TRANS_OP_HALF_TOPIC,其中body是该半消息在RMQ_SYS_TRANS_HALF_TOPIC的queueOffset。

    如果客户端返回的是 LocalTransactionState.ROLLBACK_MESSAGE,则不再存储真正的seq_topic1,剩下的逻辑和commit大致相同。

    下面是我们验证broker  check消息,我们先返回unkown类型

    当半消息发送成功后,回调事务监听器,存储本地事务执行结果,返回broker本地事务执行结果,我们看下由于执行本地事务的时候,返回的是unknown状态,所以肯定会check,所以此时seq_topic1并未生成,在check方法里 返回COMMIT_MESSAGE之后我们再看看。

    下面我们来分析分析,定时check的过程

    我们首先取我们从RMQ_SYS_TRANS_OP_HALF_TOPIC取消息,看下removeMap这个map   key是RMQ_SYS_TRANS_HALF_TOPIC的queueOffset   value是RMQ_SYS_TRANS_OP_HALF_TOPIC 的queueOffset。

    如果removeMap包含i,则说明这个RMQ_SYS_TRANS_HALF_TOPIC消息已经被确认或者回滚 就不要check处理了。我们看下needDiscard如果check次数大于最大事务check次数,则put消息到TRANS_CHECK_MAX_TIME_TOPIC。

    具体细节就不扣了,后面就是check消息 ,然后客户端确认本地事务执行结果,然后commit or rollback。 更新RMQ_SYS_TRANS_HALF_TOPIC和RMQ_SYS_TRANS_OP_HALF_TOPIC 的consumeoffset

    相关文章

      网友评论

          本文标题:对事务消息的源码分析

          本文链接:https://www.haomeiwen.com/subject/xrrcwltx.html