总体代码
我们可以通过阅读RocketMQ的消息存储代码来了解RocketMQ的消息存储格式,消息的存储入口是DefaultMessageStore,我们可以通过DefaultMessageStore.putMessage方法来查看消息的存储,整个存储过程示例图如下:
image- NettyRemotingServer是broker用于处理收发消息请求的入口,是通过netty处理的tcp请求以及rpc过程
- SendMessageProcessor是处理发消息请求的类,发送消息的处理都是由此类完成
- DefaultMessageStore则是消息存取的服务,其中有putMessage和getMessage方法用于处理消息的存储和读取
- CommitLog是对消息存储文件的抽象,消息经过编码后存入CommitLog
- MappedFile是对内存映射文件操作的封装,可以认为就是MappedByteBuffer
- DefaultAppendMessageCallback中处理消息的格式编码,是一个内部类
- GroupCommitLogService实现了将内存中的数据刷盘,分有同步和异步两种方式
- HAService封装了对消息的replication的处理,支持同步和异步两种方式
详细格式
根据前文的描述,想要知道消息存储的编码格式,我们可以进入到DefaultMessageAppendCallback中,阅读一下doAppend方法的逻辑:
image其中包含一个msgStoreItemMemory,它是个ByteBuffer,用于存储编码后的消息,消息的编码实际上就是将消息转换成字节的方式,但是这个过程又与对象的二进制序列化不一样,消息中的消息体是对象经过了序列化之后的数据,由生产者发送给了broker,而这里的二进制转换是通过编码的方式实现的,其中不仅需要有消息体的内容,还需要包括很多额外的数据,DefaultAppendMessageCallback的doAppend方法中,我们可以看到具体的消息编码代码:
image根据这段代码能够清晰的看到消息编码的过程,我们可以看到每个消息在存储时所需要携带的附加信息,在Java中,int占4字节,long占8字节,char占2字节,由此可以得到RocketMQ的消息在编码后的结构如下图:
image- 4字节表示消息的长度,消息的长度是整个消息体所占用的字节数的大小
- 4字节的魔数,是固定值,有MESSAGE_MAGIC_CODE和BLANK_MAGIC_CODE
- 4字节的CRC,是消息体的校验码,用于防止网络、硬件等故障导致数据与发送时不一样带来的问题
- 4字节的queueId,表示消息发到了哪个MessageQueue(逻辑上相当于kakka的partition)
- 4字节的flag,flag是创建Message对象时由生产者通过构造器设定的flag值
- 8字节的queueOffset,表示在queue中的偏移量
- 8字节的physicalPosition,表示在存储文件中的偏移量
- 4字节sysFlag,是生产者相关的信息标识,具体生产逻辑可以看相关代码
- 8字节消息创建时间
- 8字节消息生产者的host
- 8字节消息存储时间
- 8字节消息存储的机器的host
- 4字节表示重复消费次数
- 8字节消息事务相关偏移量
- 4字节表示消息体的长度
- 消息休,不是固定长度,和前面的4字节的消息体长度值相等
- 1字节表示topic的长度,因此topc的长度最多不能超过127个字节,超过的话存储会出错(有前置校验)
- Topic,存储topic,因为topic不是固定长度,所以这里所占的字节是不固定的,和前一个表示topic长度的字节的值相等
- 2字节properties的长度,properties是创建消息时添加到消息中的,因此,添加在消息中的poperties不能太多太大,所有的properties的kv对在拼接成string后,所占的字节数不能超过2^15-1
- Properties的内容,也不是固定长度,和前面的2字节properties长度的值相同
网友评论