rocketmq将消息存储在commitlog中,为了快速的读取当前topic以及对应队列的内容信息,rocketmq有个consumerqueue,根据每个topic为一个目录,下面每个queueId也有自己的文件,记录了当前queue在commitlog中的位置是什么。
通过brokerController初始化时,同时初始化MessageStore,其中的ReputMessageService线程不断的产生consumequeue,以及index文件。产生的buffer数据,通过FlushConsumeQueueService线程,刷盘到磁盘中。
因为commitlog是所有的topic混合存储的,所以根据conusmequeue去读取是随机读取的。但是因为MappedByteBuffer这种内存映射,可以直接将用户进程私有地址空间中的一块区域与文件对象建立映射关系,就差不多是直接从内存中完成对文件进行读写操作。而这种映射的大小有所限制,所以commitlog设置了1个g一个文件。而consumequeue其实整体而言也是一直往后消费的,所以大部分情况也是从内存中读取,所以性能也很好。
consumerqueue生成好后,consumer就根据consumerqueue来进行拉取数据,在代码PullMessageProcessor类中的processRequest方法拉取broker中的数据,在MessageStore#getMessage中获取消息。
PullRequestHoldService这个线程,执行长轮询的逻辑,如果获取不到消息,则hold住请求一阵子。rocketmq中有很多这种继承了serviceThread的线程。都做了很多重要的事情。
consumer如果消费失败的话,会调用SendMessageProcessor#consumerSendMsgBack来回写commitlog
网友评论