美文网首页
RocketMQ的消息存储格式

RocketMQ的消息存储格式

作者: 无醉_1866 | 来源:发表于2019-11-03 08:06 被阅读0次

总体代码

我们可以通过阅读RocketMQ的消息存储代码来了解RocketMQ的消息存储格式,消息的存储入口是DefaultMessageStore,我们可以通过DefaultMessageStore.putMessage方法来查看消息的存储,整个存储过程示例图如下:

image
  • NettyRemotingServer是broker用于处理收发消息请求的入口,是通过netty处理的tcp请求以及rpc过程
  • SendMessageProcessor是处理发消息请求的类,发送消息的处理都是由此类完成
  • DefaultMessageStore则是消息存取的服务,其中有putMessage和getMessage方法用于处理消息的存储和读取
  • CommitLog是对消息存储文件的抽象,消息经过编码后存入CommitLog
  • MappedFile是对内存映射文件操作的封装,可以认为就是MappedByteBuffer
  • DefaultAppendMessageCallback中处理消息的格式编码,是一个内部类
  • GroupCommitLogService实现了将内存中的数据刷盘,分有同步和异步两种方式
  • HAService封装了对消息的replication的处理,支持同步和异步两种方式

详细格式

根据前文的描述,想要知道消息存储的编码格式,我们可以进入到DefaultMessageAppendCallback中,阅读一下doAppend方法的逻辑:

image

其中包含一个msgStoreItemMemory,它是个ByteBuffer,用于存储编码后的消息,消息的编码实际上就是将消息转换成字节的方式,但是这个过程又与对象的二进制序列化不一样,消息中的消息体是对象经过了序列化之后的数据,由生产者发送给了broker,而这里的二进制转换是通过编码的方式实现的,其中不仅需要有消息体的内容,还需要包括很多额外的数据,DefaultAppendMessageCallback的doAppend方法中,我们可以看到具体的消息编码代码:

image

根据这段代码能够清晰的看到消息编码的过程,我们可以看到每个消息在存储时所需要携带的附加信息,在Java中,int占4字节,long占8字节,char占2字节,由此可以得到RocketMQ的消息在编码后的结构如下图:

image
  • 4字节表示消息的长度,消息的长度是整个消息体所占用的字节数的大小
  • 4字节的魔数,是固定值,有MESSAGE_MAGIC_CODE和BLANK_MAGIC_CODE
  • 4字节的CRC,是消息体的校验码,用于防止网络、硬件等故障导致数据与发送时不一样带来的问题
  • 4字节的queueId,表示消息发到了哪个MessageQueue(逻辑上相当于kakka的partition)
  • 4字节的flag,flag是创建Message对象时由生产者通过构造器设定的flag值
  • 8字节的queueOffset,表示在queue中的偏移量
  • 8字节的physicalPosition,表示在存储文件中的偏移量
  • 4字节sysFlag,是生产者相关的信息标识,具体生产逻辑可以看相关代码
  • 8字节消息创建时间
  • 8字节消息生产者的host
  • 8字节消息存储时间
  • 8字节消息存储的机器的host
  • 4字节表示重复消费次数
  • 8字节消息事务相关偏移量
  • 4字节表示消息体的长度
  • 消息休,不是固定长度,和前面的4字节的消息体长度值相等
  • 1字节表示topic的长度,因此topc的长度最多不能超过127个字节,超过的话存储会出错(有前置校验)
  • Topic,存储topic,因为topic不是固定长度,所以这里所占的字节是不固定的,和前一个表示topic长度的字节的值相等
  • 2字节properties的长度,properties是创建消息时添加到消息中的,因此,添加在消息中的poperties不能太多太大,所有的properties的kv对在拼接成string后,所占的字节数不能超过2^15-1
  • Properties的内容,也不是固定长度,和前面的2字节properties长度的值相同

相关文章

  • RocketMQ的消息存储格式

    总体代码 我们可以通过阅读RocketMQ的消息存储代码来了解RocketMQ的消息存储格式,消息的存储入口是De...

  • RocketMQ消息存储

    RocketMQ消息存储 1 CommitLog 要想知道RocketMQ如何存储消息,我们先看看CommitLo...

  • RocketMQ——RocketMQ消息存储

    DefaultMQPushConsumer 属性consumerGroup消费组名称messageModel消息消...

  • 四、设计

    1 消息存储 消息存储是RocketMQ中最为复杂和最为重要的一部分,本节将分别从RocketMQ的消息存储整体架...

  • rocketMQ 设计

    1 消息存储 消息存储是RocketMQ中最为复杂和最为重要的一部分,本节将分别从RocketMQ的消息存储整体架...

  • RocketMq消息存储

    一个消息中间件最核心的东西就是消息存储结构。 这是kafka的消息存储: 每个topic_partition对应一...

  • RocketMQ:消息存储

    通常来说我们对分布式队列有高可靠性的要求,所以数据要进行持久化存储。 消息生产者发送消息到MQ。 MQ收到消息,将...

  • rocketMQ消息存储

  • RocketMQ存储文件与内存映射

    概览 RocketMQ的消息存储主要是在${ROCKETMQ_HOME}/store文件夹下,message消息主...

  • RocketMQ 6.核心原理解析

    1. 消息的存储结构 RocketMQ消息存储[https://www.cnblogs.com/fanguangd...

网友评论

      本文标题:RocketMQ的消息存储格式

      本文链接:https://www.haomeiwen.com/subject/kbrkbctx.html