美文网首页
RocketMQ 存储和索引

RocketMQ 存储和索引

作者: 黄靠谱 | 来源:发表于2019-02-07 21:08 被阅读23次

    参考

    有思考的大佬博客
    https://www.jianshu.com/p/b73fdd893f98

    概述

    1. CommitLog:RocketMQ把所有Topic所有Queue的消息都持久化到同一个物理文件CommitLog中。
    2. ConsumerQueue(逻辑消费队列):本身并不存储任何消息,但是Consumer通过监听这个逻辑队列,来获取到待消费的消息
    3. IndexFile:因为所有的消息都存在CommitLog中,如果要实现根据 key 查询 消息的方法,就会变得非常困难,所以为了解决这种业务需求,有了IndexFile的存在。

    Consumer即可根据ConsumerQueue来查找待消费的消息了。
    ConsumeQueue(逻辑消费队列)作为消费消息的索引,保存了指定Topic下的队列消息在CommitLog中的起始物理偏移量offset,消息大小size和消息Tag的HashCode值。
    而IndexFile(索引文件)则只是为了消息查询提供了一种通过key或时间区间来查询消息的方法(ps:这种通过IndexFile来查找消息的方法不影响发送与消费消息的主流程)

    image

    CommitLog

    物理存储。CommitLog消息存放物理文件,每台broker上的commitLog被本机器所有queue共享不做区分。
    commitLog -> 1个mappedFileQueue -> N个mappedFile--> Message

    public class CommitLog {
        private final MappedFileQueue mappedFileQueue;
    
     MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
     result = mappedFile.appendMessage(msg, this.appendMessageCallback);
    }
    

    ConsumerQueue(逻辑消费队列)

    Consumer通过监听ConsumerQueue来感知是否有新消息。相当于是一个CommitLog的Queue级别的逻辑消费队列(CommitLog是不分Topic和Queue的)。
    事务消息就是基于这个原理:日志的prepare就是把消息写在CommitLog里面,但是不更新到ConsumerQueue里面,事务消息的Commit就是把消息更新到ConsumerQueue里面。

    1. 每个topic的每个queue都有一个唯一的ConsumerQueue,Consumequeue类文件的存储路径默认为$HOME/store/consumequeue/{topic}/{queueId}/{fileName},每个文件由30W条数据组成
    public class ConsumeQueue {
        public static final int CQ_STORE_UNIT_SIZE = 20;
        private final MappedFileQueue mappedFileQueue;
        private final String topic;
        private final int queueId;
        private final ByteBuffer byteBufferIndex;
    
    1. consume queue中存储单元是一个20字节定长的数据MESSAGE_POSITION_INFO,是顺序写顺序读,每个Queue对应一个ConsumeQueue
      MESSAGE_POSITION_INFO:
    • offset: CommitLog中的物理位移(long 8字节)
    • size: CommitLog中的日志大小(该消息可能被压缩过)(int 4字节)
    • tagsCode:和storeTimestamp相关tagsCode(long 8字节)
    1. 存储消息第一步会触发CommitLog的物理IO写消息,然后,再写CQueue,
    2. 直接面向消费者,因为每个Queue有一个单独的Consume Queue,而一个Topic下有多个Queue,从而支持并发的消费
    3. 当一个Msg从ConsumerQueue里面删除时,有一个BLANK顶上,也是20个定长字节

    IndexFile

    最终提供根据topic,key等参数,通过IndexFile的索引功能,在CommitLog中,找到offset结果的方法
    QueryResult queryMessage(String topic, String key, int maxNum, long begin, long end)

    相关文章

      网友评论

          本文标题:RocketMQ 存储和索引

          本文链接:https://www.haomeiwen.com/subject/bnvgsqtx.html