本文分析broker在接收到消息生产者发送的消息后的处理. broker通过底层的通信框架接收到消息后会交给SendMessageProcessor处理. SendMessageProcessor的处理过程如下:
- 1.解析请求头
- 2.构造消息上下文
- 3.执行消息存储前的hook函数
- 4.存储消息
- 5.执行消息存储后的hook函数
解析请求头
这一步比较简单, 解析出消息中的ProducerGroup, Topic, DefaultTopicQueueNums等信息, 相关代码在:AbstractSendMessageProcessor#parseRequestHeader()和RemotingCommand#decodeCommandCustomHeader()中.
构造消息上下文&执行hook函数
RoketMQ的代码里包含了这些代码, 原本用意是在消息存储前和存储后执行一些hook函数(例如把消息详情打印出来), 但是在我看的版本里(先看的4.1.0-incubating-SNAPSHOT, 后来切换到了release-4.0.0-incubating), 并没有真正使用.
存储消息
存储消息主要分为一下几步:
1.创建一个response对象, 用于返回处理结果;
2.设置相关信息, requestId , message body, request queue和topic等;
3.封装信息到MessageExtBrokerInner中;
4.调用MessageStore存储MessageExtBrokerInner;
5.根据MessageStore返回的结果, 给出响应;
详细代码在SendMessageProcessor#sendMessage中, 此处不再贴出.
MessageExtBrokerInner完成后,会调用this.brokerController.getMessageStore().putMessage(msgInner)来存储消息, 关于这一步的详细分析请参考消息存储部分.
下文分析broker处理消息消费的过程.
网友评论