在broker中是会把消息存放在commitLog中的,在后台还会把消息的逻辑位置(类似索引)存放到consumeQueue中。所以在获取消息的时候,broker端是先读取consumeQueue中的消息逻辑位置,拿到offset后再去commitLog中获取消息返回给client端。
在broker启动初始化的时候回去注册PullMessageProcessor来处理获取消息的请求,接下来从这个类来查看对拉取消息请求的处理:
final GetMessageResult getMessageResult =
this.brokerController.getMessageStore().getMessage(requestHeader.getConsumerGroup(), requestHeader.getTopic(),
requestHeader.getQueueId(), requestHeader.getQueueOffset(), requestHeader.getMaxMsgNums(), messageFilter);
在PullMessageProcessor中的processRequest可到,获取消息的请求处理也是交给了MessageStore来进行获取消息:
//根据offset去ConsumeQueue中获取消息在commitLog中的逻辑位置
SelectMappedBufferResult bufferConsumeQueue = consumeQueue.getIndexBuffer(offset);
//从commitLog中获取消息
SelectMappedBufferResult selectResult = this.commitLog.getMessage(offsetPy, sizePy);
从DefaultMessageStore中很简单地就可以看出获取消息的大致逻辑,线程consumeQueue中获取逻辑消息位置然后再去commitLog中获取实际消息。
ConsumeQueue的维护:在rocketMq中,每个MessageQueue都对应有一个consumeQueue,他类似于是消息的逻辑队列。
public DefaultMessageStore(final MessageStoreConfig messageStoreConfig, final BrokerStatsManager brokerStatsManager,
final MessageArrivingListener messageArrivingListener, final BrokerConfig brokerConfig) throws IOException {
······
//这个是一个后台线程,轮询把新写入的消息写入到consumeQueue中
this.reputMessageService = new ReputMessageService();
}
class ReputMessageService extends ServiceThread {
@Override
public void run() {
DefaultMessageStore.log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {
try {
Thread.sleep(1);
//新写入的消息添加到consumeQueue
this.doReput();
} catch (Exception e) {
DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e);
}
}
DefaultMessageStore.log.info(this.getServiceName() + " service end");
}
}
private void doReput() {
//从mappedFile中获取到新写入的消息
SelectMappedBufferResult result = DefaultMessageStore.this.commitLog.getData(reputFromOffset);
DefaultMessageStore.this.doDispatch(dispatchRequest);
}
class CommitLogDispatcherBuildConsumeQueue implements CommitLogDispatcher {
@Override
public void dispatch(DispatchRequest request) {
final int tranType = MessageSysFlag.getTransactionValue(request.getSysFlag());
//prepared消息和rollback消息是不会往consumeQueue中存放的
switch (tranType) {
case MessageSysFlag.TRANSACTION_NOT_TYPE:
case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
DefaultMessageStore.this.putMessagePositionInfo(request);
break;
case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
break;
}
}
}
public void putMessagePositionInfo(DispatchRequest dispatchRequest) {
//通过topic和queueId获取到对应的consumeQueue,如果没有就创建对应queueId的consumeQueue
ConsumeQueue cq = this.findConsumeQueue(dispatchRequest.getTopic(), dispatchRequest.getQueueId());
cq.putMessagePositionInfoWrapper(dispatchRequest);
}
- 在DefaultMessageStore初始化的时候实例化了ReputMessageService,这是一个后台线程,主要靠它来进行后台轮询吧新写入到mappedFile中的消息添加到consumeQueue中。
- 后台线程不停滴从mappedFile中获取到消息,然后再把消息添加到consumeQueue中,但是也不是所有的消息都会添加到consumeQueue中,事务消息的prepared消息和rollback消息是不会被写入到consumeQueue中,所以事务消息再没有被commit的时候是不会被consumer消费的。
- 存放消息到consumeQueue中时,也是先去查看当前内存是否已经有对应的consumeQueue,没有就进行新建messageQueue对应的consumeQueue,用messageQueue的id作为key,consumeQueue作为value放到map中,然后把这个map添加到对应的topic下。所以这里可以看出MessageQueue和ConsumeQueue是一一对应的。
//这个是DefaultMessageStore中的一个属性
private final ConcurrentMap<String/* topic */, ConcurrentMap<Integer/* queueId */, ConsumeQueue>> consumeQueueTable;
网友评论