一个消息中间件最核心的东西就是消息存储结构。
这是kafka的消息存储:
clipboard.png每个topic_partition对应一个日志文件,Producer对该日志文件进行“顺序写”,Consumer对该文件进行“顺序读”。这种存储方式,对于每个文件来说是顺序IO,但是当并发的读写多个partition的时候,对应多个文件的顺序IO,表现在文件系统的磁盘层面,还是随机IO。
因此出现了当partition或者topic个数过多时,Kafka的性能急剧下降。参见http://blog.csdn.net/chunlongyu/article/details/53913758
RocketMq的消息存储:
RocketMQ采用了单一的日志文件,即把同1台机器上面所有topic的所有queue的消息,存放在一个文件里面,从而避免了随机的磁盘写入。
如上图所示,所有消息都存在一个单一的CommitLog文件里面,然后有后台线程异步的同步到ConsumeQueue,再由Consumer进行消费。
这里至所以可以用“异步线程”,也是因为消息队列天生就是用来“缓冲消息”的。只要消息到了CommitLog,发送的消息也就不会丢。只要消息不丢,那就有了“充足的回旋余地”,用一个后台线程慢慢同步到ConsumeQueue,再由Consumer消费。
可以说,这也是在消息队列内部的一个典型的“最终一致性”的案例:Producer发了消息,进了CommitLog,此时Consumer并不可见。但没关系,只要消息不丢,消息最终肯定会进入ConsumeQueue,让Consumer可见。
深入看一下这两个结构。
ConsumeQueue
ConsumeQueue的存储位置
默认的存储位置:${user.home} \store\consumequeue${topicName}${queueId}${fileName}
可以修改:配置文件的
storePathRootDir=/home/haieradmin/mqstore/rocketmqstore
storePathCommitLog=/home/haieradmin/mqstore/rocketmqstore/commitlog
这两个参数。
测试先发送100个消息:
public class SyncProducer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new
DefaultMQProducer("produce1");
producer.setNamesrvAddr("10.135.17.26:9876;10.135.17.27:9876");
producer.start();
for (int i = 0; i < 100; i++) {
Message msg = new Message("DemoTopic", "TagA",
("Hello RocketMQ").getBytes()
);
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
}
producer.shutdown();
}
发送100个完全一样的消息
SendResult [sendStatus=SEND_OK, msgId=0A87111A00002A9F000000128085F6B4, messageQueue=MessageQueue [topic=DemoTopic, brokerName=broker-a, queueId=0], queueOffset=0]
SendResult [sendStatus=SEND_OK, msgId=0A87111A00002A9F000000128085FD31, messageQueue=MessageQueue [topic=DemoTopic, brokerName=broker-a, queueId=1], queueOffset=0]
SendResult [sendStatus=SEND_OK, msgId=0A87111A00002A9F00000012808606D9, messageQueue=MessageQueue [topic=DemoTopic, brokerName=broker-a, queueId=2], queueOffset=0]
SendResult [sendStatus=SEND_OK, msgId=0A87111A00002A9F000000128086075F, messageQueue=MessageQueue [topic=DemoTopic, brokerName=broker-a, queueId=3], queueOffset=0]
SendResult [sendStatus=SEND_OK, msgId=0A87111B00002A9F00000003270BE8E7, messageQueue=MessageQueue [topic=DemoTopic, brokerName=broker-b, queueId=0], queueOffset=0]
SendResult [sendStatus=SEND_OK, msgId=0A87111B00002A9F00000003270BE96D, messageQueue=MessageQueue [topic=DemoTopic, brokerName=broker-b, queueId=1], queueOffset=0]
SendResult [sendStatus=SEND_OK, msgId=0A87111B00002A9F00000003270BEC87, messageQueue=MessageQueue [topic=DemoTopic, brokerName=broker-b, queueId=2], queueOffset=0]
SendResult [sendStatus=SEND_OK, msgId=0A87111B00002A9F00000003270BED0D, messageQueue=MessageQueue [topic=DemoTopic, brokerName=broker-b, queueId=3], queueOffset=0]
SendResult [sendStatus=SEND_OK, msgId=0A87111A00002A9F000000128086CB34, messageQueue=MessageQueue [topic=DemoTopic, brokerName=broker-a, queueId=0], queueOffset=1]
SendResult [sendStatus=SEND_OK, msgId=0A87111A00002A9F000000128086CFB4, messageQueue=MessageQueue [topic=DemoTopic, brokerName=broker-a, queueId=1], queueOffset=1]
SendResult [sendStatus=SEND_OK, msgId=0A87111A00002A9F000000128086D811, messageQueue=MessageQueue [topic=DemoTopic, brokerName=broker-a, queueId=2], queueOffset=1]
SendResult [sendStatus=SEND_OK, msgId=0A87111A00002A9F000000128086DA94, messageQueue=MessageQueue [topic=DemoTopic, brokerName=broker-a, queueId=3], queueOffset=1]
SendResult [sendStatus=SEND_OK, msgId=0A87111B00002A9F00000003270CA577, messageQueue=MessageQueue [topic=DemoTopic, brokerName=broker-b, queueId=0], queueOffset=1]
SendResult [sendStatus=SEND_OK, msgId=0A87111B00002A9F00000003270CD5A9, messageQueue=MessageQueue [topic=DemoTopic, brokerName=broker-b, queueId=1], queueOffset=1]
SendResult [sendStatus=SEND_OK, msgId=0A87111B00002A9F00000003270CD62F, messageQueue=MessageQueue [topic=DemoTopic, brokerName=broker-b, queueId=2], queueOffset=1]
SendResult [sendStatus=SEND_OK, msgId=0A87111B00002A9F00000003270CE170, messageQueue=MessageQueue [topic=DemoTopic, brokerName=broker-b, queueId=3], queueOffset=1]
SendResult [sendStatus=SEND_OK, msgId=0A87111A00002A9F0000001280889405, messageQueue=MessageQueue [topic=DemoTopic, brokerName=broker-a, queueId=0], queueOffset=2]
SendResult [sendStatus=SEND_OK, msgId=0A87111A00002A9F0000001280889FDF, messageQueue=MessageQueue [topic=DemoTopic, brokerName=broker-a, queueId=1], queueOffset=2]
SendResult [sendStatus=SEND_OK, msgId=0A87111A00002A9F000000128088A262, messageQueue=MessageQueue [topic=DemoTopic, brokerName=broker-a, queueId=2], queueOffset=2]
SendResult [sendStatus=SEND_OK, msgId=0A87111A00002A9F000000128088A976, messageQueue=MessageQueue [topic=DemoTopic, brokerName=broker-a, queueId=3], queueOffset=2]
SendResult [sendStatus=SEND_OK, msgId=0A87111B00002A9F00000003270CFEF4, messageQueue=MessageQueue [topic=DemoTopic, brokerName=broker-b, queueId=0], queueOffset=2]
SendResult [sendStatus=SEND_OK, msgId=0A87111B00002A9F00000003270CFF7A, messageQueue=MessageQueue [topic=DemoTopic, brokerName=broker-b, queueId=1], queueOffset=2]
SendResult [sendStatus=SEND_OK, msgId=0A87111B00002A9F00000003270D0000, messageQueue=MessageQueue [topic=DemoTopic, brokerName=broker-b, queueId=2], queueOffset=2]
SendResult [sendStatus=SEND_OK, msgId=0A87111B00002A9F00000003270D031A, messageQueue=MessageQueue [topic=DemoTopic, brokerName=broker-b, queueId=3], queueOffset=2]
SendResult [sendStatus=SEND_OK, msgId=0A87111A00002A9F000000128088AE8D, messageQueue=MessageQueue [topic=DemoTopic, brokerName=broker-a, queueId=0], queueOffset=3]
SendResult [sendStatus=SEND_OK, msgId=0A87111A00002A9F000000128088AF13, messageQueue=MessageQueue [topic=DemoTopic, brokerName=broker-a, queueId=1], queueOffset=3]
SendResult [sendStatus=SEND_OK, msgId=0A87111A00002A9F000000128088AF99, messageQueue=MessageQueue [topic=DemoTopic, brokerName=broker-a, queueId=2], queueOffset=3]
SendResult [sendStatus=SEND_OK, msgId=0A87111A00002A9F000000128088B01F, messageQueue=MessageQueue [topic=DemoTopic, brokerName=broker-a, queueId=3], queueOffset=3]
SendResult [sendStatus=SEND_OK, msgId=0A87111B00002A9F00000003270D064E, messageQueue=MessageQueue [topic=DemoTopic, brokerName=broker-b, queueId=0], queueOffset=3]
SendResult [sendStatus=SEND_OK, msgId=0A87111B00002A9F00000003270D06D4, messageQueue=MessageQueue [topic=DemoTopic, brokerName=broker-b, queueId=1], queueOffset=3]
SendResult [sendStatus=SEND_OK, msgId=0A87111B00002A9F00000003270D075A, messageQueue=MessageQueue [topic=DemoTopic, brokerName=broker-b, queueId=2], queueOffset=3]
SendResult [sendStatus=SEND_OK, msgId=0A87111B00002A9F00000003270D07E0, messageQueue=MessageQueue [topic=DemoTopic, brokerName=broker-b, queueId=3], queueOffset=3]
SendResult [sendStatus=SEND_OK, msgId=0A87111A00002A9F000000128088B354, messageQueue=MessageQueue [topic=DemoTopic, brokerName=broker-a, queueId=0], queueOffset=4]
SendResult [sendStatus=SEND_OK, msgId=0A87111A00002A9F000000128088B3DA, messageQueue=MessageQueue [topic=DemoTopic, brokerName=broker-a, queueId=1], queueOffset=4]
SendResult [sendStatus=SEND_OK, msgId=0A87111A00002A9F000000128088B460, messageQueue=MessageQueue [topic=DemoTopic, brokerName=broker-a, queueId=2], queueOffset=4]
SendResult [sendStatus=SEND_OK, msgId=0A87111A00002A9F000000128088B795, messageQueue=MessageQueue [topic=DemoTopic, brokerName=broker-a, queueId=3], queueOffset=4]
SendResult [sendStatus=SEND_OK, msgId=0A87111B00002A9F00000003270D0DC4, messageQueue=MessageQueue [topic=DemoTopic, brokerName=broker-b, queueId=0], queueOffset=4]
SendResult [sendStatus=SEND_OK, msgId=0A87111B00002A9F00000003270D0E4A, messageQueue=MessageQueue [topic=DemoTopic, brokerName=broker-b, queueId=1], queueOffset=4]
SendResult [sendStatus=SEND_OK, msgId=0A87111B00002A9F00000003270D117F, messageQueue=MessageQueue [topic=DemoTopic, brokerName=broker-b, queueId=2], queueOffset=4]
SendResult [sendStatus=SEND_OK, msgId=0A87111B00002A9F00000003270D14B4, messageQueue=MessageQueue [topic=DemoTopic, brokerName=broker-b, queueId=3], queueOffset=4]
SendResult [sendStatus=SEND_OK, msgId=0A87111A00002A9F000000128088B81B, messageQueue=MessageQueue [topic=DemoTopic, brokerName=broker-a, queueId=0], queueOffset=5]
SendResult [sendStatus=SEND_OK, msgId=0A87111A00002A9F000000128088B8A1, messageQueue=MessageQueue [topic=DemoTopic, brokerName=broker-a, queueId=1], queueOffset=5]
SendResult [sendStatus=SEND_OK, msgId=0A87111A00002A9F000000128088B927, messageQueue=MessageQueue [topic=DemoTopic, brokerName=broker-a, queueId=2], queueOffset=5]
SendResult [sendStatus=SEND_OK, msgId=0A87111A00002A9F000000128088B9AD, messageQueue=MessageQueue [topic=DemoTopic, brokerName=broker-a, queueId=3], queueOffset=5]
SendResult [sendStatus=SEND_OK, msgId=0A87111B00002A9F00000003270D17E9, messageQueue=MessageQueue [topic=DemoTopic, brokerName=broker-b, queueId=0], queueOffset=5]
SendResult [sendStatus=SEND_OK, msgId=0A87111B00002A9F00000003270D186F, messageQueue=MessageQueue [topic=DemoTopic, brokerName=broker-b, queueId=1], queueOffset=5]
SendResult [sendStatus=SEND_OK, msgId=0A87111B00002A9F00000003270D1BA4, messageQueue=MessageQueue [topic=DemoTopic, brokerName=broker-b, queueId=2], queueOffset=5]
SendResult [sendStatus=SEND_OK, msgId=0A87111B00002A9F00000003270D1C2A, messageQueue=MessageQueue [topic=DemoTopic, brokerName=broker-b, queueId=3], queueOffset=5]
SendResult [sendStatus=SEND_OK, msgId=0A87111A00002A9F000000128088BA33, messageQueue=MessageQueue [topic=DemoTopic, brokerName=broker-a, queueId=0], queueOffset=6]
SendResult [sendStatus=SEND_OK, msgId=0A87111A00002A9F000000128088BAB9, messageQueue=MessageQueue [topic=DemoTopic, brokerName=broker-a, queueId=1], queueOffset=6]
SendResult [sendStatus=SEND_OK, msgId=0A87111A00002A9F000000128088BB3F, messageQueue=MessageQueue [topic=DemoTopic, brokerName=broker-a, queueId=2], queueOffset=6]
SendResult [sendStatus=SEND_OK, msgId=0A87111A00002A9F000000128088BBC5, messageQueue=MessageQueue [topic=DemoTopic, brokerName=broker-a, queueId=3], queueOffset=6]
SendResult [sendStatus=SEND_OK, msgId=0A87111B00002A9F00000003270D1CB0, messageQueue=MessageQueue [topic=DemoTopic, brokerName=broker-b, queueId=0], queueOffset=6]
SendResult [sendStatus=SEND_OK, msgId=0A87111B00002A9F00000003270D1D36, messageQueue=MessageQueue [topic=DemoTopic, brokerName=broker-b, queueId=1], queueOffset=6]
SendResult [sendStatus=SEND_OK, msgId=0A87111B00002A9F00000003270D1DBC, messageQueue=MessageQueue [topic=DemoTopic, brokerName=broker-b, queueId=2], queueOffset=6]
SendResult [sendStatus=SEND_OK, msgId=0A87111B00002A9F00000003270D20F1, messageQueue=MessageQueue [topic=DemoTopic, brokerName=broker-b, queueId=3], queueOffset=6]
SendResult [sendStatus=SEND_OK, msgId=0A87111A00002A9F000000128088BC4B, messageQueue=MessageQueue [topic=DemoTopic, brokerName=broker-a, queueId=0], queueOffset=7]
SendResult [sendStatus=SEND_OK, msgId=0A87111A00002A9F000000128088BCD1, messageQueue=MessageQueue [topic=DemoTopic, brokerName=broker-a, queueId=1], queueOffset=7]
SendResult [sendStatus=SEND_OK, msgId=0A87111A00002A9F000000128088BD57, messageQueue=MessageQueue [topic=DemoTopic, brokerName=broker-a, queueId=2], queueOffset=7]
SendResult [sendStatus=SEND_OK, msgId=0A87111A00002A9F000000128088BDDD, messageQueue=MessageQueue [topic=DemoTopic, brokerName=broker-a, queueId=3], queueOffset=7]
SendResult [sendStatus=SEND_OK, msgId=0A87111B00002A9F00000003270D2177, messageQueue=MessageQueue [topic=DemoTopic, brokerName=broker-b, queueId=0], queueOffset=7]
SendResult [sendStatus=SEND_OK, msgId=0A87111B00002A9F00000003270D21FD, messageQueue=MessageQueue [topic=DemoTopic, brokerName=broker-b, queueId=1], queueOffset=7]
SendResult [sendStatus=SEND_OK, msgId=0A87111B00002A9F00000003270D2283, messageQueue=MessageQueue [topic=DemoTopic, brokerName=broker-b, queueId=2], queueOffset=7]
SendResult [sendStatus=SEND_OK, msgId=0A87111B00002A9F00000003270D259D, messageQueue=MessageQueue [topic=DemoTopic, brokerName=broker-b, queueId=3], queueOffset=7]
SendResult [sendStatus=SEND_OK, msgId=0A87111A00002A9F000000128088C7A0, messageQueue=MessageQueue [topic=DemoTopic, brokerName=broker-a, queueId=0], queueOffset=8]
SendResult [sendStatus=SEND_OK, msgId=0A87111A00002A9F000000128088C826, messageQueue=MessageQueue [topic=DemoTopic, brokerName=broker-a, queueId=1], queueOffset=8]
SendResult [sendStatus=SEND_OK, msgId=0A87111A00002A9F000000128088CAA9, messageQueue=MessageQueue [topic=DemoTopic, brokerName=broker-a, queueId=2], queueOffset=8]
SendResult [sendStatus=SEND_OK, msgId=0A87111A00002A9F000000128088CDDE, messageQueue=MessageQueue [topic=DemoTopic, brokerName=broker-a, queueId=3], queueOffset=8]
SendResult [sendStatus=SEND_OK, msgId=0A87111B00002A9F00000003270D2B66, messageQueue=MessageQueue [topic=DemoTopic, brokerName=broker-b, queueId=0], queueOffset=8]
SendResult [sendStatus=SEND_OK, msgId=0A87111B00002A9F00000003270D2BEC, messageQueue=MessageQueue [topic=DemoTopic, brokerName=broker-b, queueId=1], queueOffset=8]
SendResult [sendStatus=SEND_OK, msgId=0A87111B00002A9F00000003270D2F21, messageQueue=MessageQueue [topic=DemoTopic, brokerName=broker-b, queueId=2], queueOffset=8]
SendResult [sendStatus=SEND_OK, msgId=0A87111B00002A9F00000003270D2FA7, messageQueue=MessageQueue [topic=DemoTopic, brokerName=broker-b, queueId=3], queueOffset=8]
SendResult [sendStatus=SEND_OK, msgId=0A87111A00002A9F000000128088DC32, messageQueue=MessageQueue [topic=DemoTopic, brokerName=broker-a, queueId=0], queueOffset=9]
SendResult [sendStatus=SEND_OK, msgId=0A87111A00002A9F000000128088DCB8, messageQueue=MessageQueue [topic=DemoTopic, brokerName=broker-a, queueId=1], queueOffset=9]
SendResult [sendStatus=SEND_OK, msgId=0A87111A00002A9F000000128088E90F, messageQueue=MessageQueue [topic=DemoTopic, brokerName=broker-a, queueId=2], queueOffset=9]
SendResult [sendStatus=SEND_OK, msgId=0A87111A00002A9F000000128088EE26, messageQueue=MessageQueue [topic=DemoTopic, brokerName=broker-a, queueId=3], queueOffset=9]
SendResult [sendStatus=SEND_OK, msgId=0A87111B00002A9F00000003270D3570, messageQueue=MessageQueue [topic=DemoTopic, brokerName=broker-b, queueId=0], queueOffset=9]
SendResult [sendStatus=SEND_OK, msgId=0A87111B00002A9F00000003270D3B38, messageQueue=MessageQueue [topic=DemoTopic, brokerName=broker-b, queueId=1], queueOffset=9]
SendResult [sendStatus=SEND_OK, msgId=0A87111B00002A9F00000003270D3BBE, messageQueue=MessageQueue [topic=DemoTopic, brokerName=broker-b, queueId=2], queueOffset=9]
SendResult [sendStatus=SEND_OK, msgId=0A87111B00002A9F00000003270D3ED8, messageQueue=MessageQueue [topic=DemoTopic, brokerName=broker-b, queueId=3], queueOffset=9]
SendResult [sendStatus=SEND_OK, msgId=0A87111A00002A9F0000001280890074, messageQueue=MessageQueue [topic=DemoTopic, brokerName=broker-a, queueId=0], queueOffset=10]
SendResult [sendStatus=SEND_OK, msgId=0A87111A00002A9F000000128089058B, messageQueue=MessageQueue [topic=DemoTopic, brokerName=broker-a, queueId=1], queueOffset=10]
SendResult [sendStatus=SEND_OK, msgId=0A87111A00002A9F000000128089080E, messageQueue=MessageQueue [topic=DemoTopic, brokerName=broker-a, queueId=2], queueOffset=10]
SendResult [sendStatus=SEND_OK, msgId=0A87111A00002A9F0000001280890D25, messageQueue=MessageQueue [topic=DemoTopic, brokerName=broker-a, queueId=3], queueOffset=10]
SendResult [sendStatus=SEND_OK, msgId=0A87111B00002A9F00000003270D49E4, messageQueue=MessageQueue [topic=DemoTopic, brokerName=broker-b, queueId=0], queueOffset=10]
SendResult [sendStatus=SEND_OK, msgId=0A87111B00002A9F00000003270D4A6A, messageQueue=MessageQueue [topic=DemoTopic, brokerName=broker-b, queueId=1], queueOffset=10]
SendResult [sendStatus=SEND_OK, msgId=0A87111B00002A9F00000003270D4AF0, messageQueue=MessageQueue [topic=DemoTopic, brokerName=broker-b, queueId=2], queueOffset=10]
SendResult [sendStatus=SEND_OK, msgId=0A87111B00002A9F00000003270D4B76, messageQueue=MessageQueue [topic=DemoTopic, brokerName=broker-b, queueId=3], queueOffset=10]
SendResult [sendStatus=SEND_OK, msgId=0A87111A00002A9F0000001280890DAB, messageQueue=MessageQueue [topic=DemoTopic, brokerName=broker-a, queueId=0], queueOffset=11]
SendResult [sendStatus=SEND_OK, msgId=0A87111A00002A9F00000012808912C2, messageQueue=MessageQueue [topic=DemoTopic, brokerName=broker-a, queueId=1], queueOffset=11]
SendResult [sendStatus=SEND_OK, msgId=0A87111A00002A9F0000001280891545, messageQueue=MessageQueue [topic=DemoTopic, brokerName=broker-a, queueId=2], queueOffset=11]
SendResult [sendStatus=SEND_OK, msgId=0A87111A00002A9F00000012808915CB, messageQueue=MessageQueue [topic=DemoTopic, brokerName=broker-a, queueId=3], queueOffset=11]
SendResult [sendStatus=SEND_OK, msgId=0A87111B00002A9F00000003270D4E8F, messageQueue=MessageQueue [topic=DemoTopic, brokerName=broker-b, queueId=0], queueOffset=11]
SendResult [sendStatus=SEND_OK, msgId=0A87111B00002A9F00000003270D4F15, messageQueue=MessageQueue [topic=DemoTopic, brokerName=broker-b, queueId=1], queueOffset=11]
SendResult [sendStatus=SEND_OK, msgId=0A87111B00002A9F00000003270D4F9B, messageQueue=MessageQueue [topic=DemoTopic, brokerName=broker-b, queueId=2], queueOffset=11]
SendResult [sendStatus=SEND_OK, msgId=0A87111B00002A9F00000003270D5021, messageQueue=MessageQueue [topic=DemoTopic, brokerName=broker-b, queueId=3], queueOffset=11]
SendResult [sendStatus=SEND_OK, msgId=0A87111A00002A9F0000001280891651, messageQueue=MessageQueue [topic=DemoTopic, brokerName=broker-a, queueId=0], queueOffset=12]
SendResult [sendStatus=SEND_OK, msgId=0A87111A00002A9F00000012808916D7, messageQueue=MessageQueue [topic=DemoTopic, brokerName=broker-a, queueId=1], queueOffset=12]
SendResult [sendStatus=SEND_OK, msgId=0A87111A00002A9F000000128089175D, messageQueue=MessageQueue [topic=DemoTopic, brokerName=broker-a, queueId=2], queueOffset=12]
SendResult [sendStatus=SEND_OK, msgId=0A87111A00002A9F00000012808917E3, messageQueue=MessageQueue [topic=DemoTopic, brokerName=broker-a, queueId=3], queueOffset=12]
可以看出,这100条消息时分别发送到两个broker上
去看一下
clipboard.png为什么是4个队列。可以看这篇。http://www.jianshu.com/p/ccdf6fc710b0
进去一个队列,发现有一个00000000000000000000的文件。
顺便说一下,消息队列文件名规则:
commitlog文件的默认存储大小1G,不是ConsumeQueue的
文件名以已有存储容量依次递增,类似如下:
00000000000000000000
00000000001073741824
00000000002147483648
。。。
这是个二进制文件,打开看一下。
[haieradmin@IBMMQ03 0]$ od -Ax -tx1 00000000000000000000
000000 00 00 00 12 80 85 f6 b4 00 00 00 86 00 00 00 00
000010 00 27 a8 07 00 00 00 12 80 86 cb 34 00 00 00 86
000020 00 00 00 00 00 27 a8 07 00 00 00 12 80 88 94 05
000030 00 00 00 86 00 00 00 00 00 27 a8 07 00 00 00 12
000040 80 88 ae 8d 00 00 00 86 00 00 00 00 00 27 a8 07
000050 00 00 00 12 80 88 b3 54 00 00 00 86 00 00 00 00
000060 00 27 a8 07 00 00 00 12 80 88 b8 1b 00 00 00 86
000070 00 00 00 00 00 27 a8 07 00 00 00 12 80 88 ba 33
000080 00 00 00 86 00 00 00 00 00 27 a8 07 00 00 00 12
000090 80 88 bc 4b 00 00 00 86 00 00 00 00 00 27 a8 07
0000a0 00 00 00 12 80 88 c7 a0 00 00 00 86 00 00 00 00
0000b0 00 27 a8 07 00 00 00 12 80 88 dc 32 00 00 00 86
0000c0 00 00 00 00 00 27 a8 07 00 00 00 12 80 89 00 74
0000d0 00 00 00 86 00 00 00 00 00 27 a8 07 00 00 00 12
0000e0 80 89 0d ab 00 00 00 86 00 00 00 00 00 27 a8 07
0000f0 00 00 00 12 80 89 16 51 00 00 00 86 00 00 00 00
000100 00 27 a8 07 00 00 00 00 00 00 00 00 00 00 00 00
000110 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00
解释一下:
最左边的是这行数据开始的地址。16进制。比如第一行000000开始于第0个字节。第二行000010开始于第16个字节,这只是为了显示给你看的,方便数字节。
ConsumeQueue中每个消息时20Byte长。结构为
clipboard.pngCommitLog Offset是指这条消息在Commit Log文件中的实际偏移量
Size存储中消息的大小
Message Tag HashCode存储消息的Tag的哈希值:主要用于订阅时消息过滤(订阅时如果指定了Tag,会根据HashCode来快速查找到订阅的消息)
所以
clipboard.png上面蓝框是一条ConsumeQueue消息,红色的00000086是对应真实消息的size。
我之前故意发送了全部相同的消息,所以size都是一样的。因为tag是一样的,所以Message Tag HashCode也是一样的。
可以数一下,这个ConsumeQueue中现在一共13条数据,和前面的日志显示是一样的。
测一下消费
package com.yunsheng.simpleExample;
import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
import com.alibaba.rocketmq.common.message.MessageExt;
import java.util.List;
public class SyncConsumer {
public static void main(String[] args) throws InterruptedException, MQClientException {
//声明并初始化一个consumer
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer1");
consumer.setNamesrvAddr("10.135.17.26:9876;10.135.17.27:9876");
//这里设置的是一个consumer的消费策略
//CONSUME_FROM_LAST_OFFSET 默认策略,从该队列最尾开始消费,即跳过历史消息
//CONSUME_FROM_FIRST_OFFSET 从队列最开始开始消费,即历史消息(还储存在broker的)全部消费一遍
//CONSUME_FROM_TIMESTAMP 从某个时间点开始消费,和setConsumeTimestamp()配合使用,默认是半个小时以前
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
//设置consumer所订阅的Topic和Tag,*代表全部的Tag
consumer.subscribe("DemoTopic", "*");
//设置一个Listener,主要进行消息的逻辑处理
consumer.registerMessageListener(new MessageListenerConcurrently() {
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
//
System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs);
// if (msgs.size() > 0){
// System.out.println(new String(msgs.get(0).getBody()));
// }
//返回消费状态
//CONSUME_SUCCESS 消费成功
//RECONSUME_LATER 消费失败,需要稍后重新消费
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//调用start()方法启动consumer
consumer.start();
System.out.println("Consumer Started.");
}
}
看下输出:
ConsumeMessageThread_4 Receive New Messages: [MessageExt [queueId=3, storeSize=134, queueOffset=2, sysFlag=0, bornTimestamp=1507713810962, bornHost=/192.168.116.77:51970, storeTimestamp=1507713811012, storeHost=/10.135.17.26:10911, msgId=0A87111A00002A9F000000128088A976, commitLogOffset=79465851254, bodyCRC=1774740973, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=DemoTopic, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=13, WAIT=true, TAGS=TagA}, body=14]]]
现在查下ConsumeQueue,恩,都还在的。
不是消费过了吗?怎么还有?
但是现在再运行SyncConsumer ,并不会消费消息。
所以,一定是有某种手段保存了消费的进度。这种手段就是,在DefaultMQPushConsumer实例启动时,会到broker上,拿到本消费组broker已经记录好的消费进度(consumer offset),按照这个进度发起自己的第一次Pull请求。
所以,现在换一个consumerGroup的名字,会发现又消费了。
RocketMq的删除机制。RocketMq并不会立即删除消息,所以消息是可以被重复消费的。 RocketMq的消息时定期清除,默认3天。
网友评论