美文网首页
RocketMQ源码阅读(八)-broker接收消息

RocketMQ源码阅读(八)-broker接收消息

作者: _呆瓜_ | 来源:发表于2018-01-03 16:36 被阅读237次

本文分析broker在接收到消息生产者发送的消息后的处理. broker通过底层的通信框架接收到消息后会交给SendMessageProcessor处理. SendMessageProcessor的处理过程如下:

  • 1.解析请求头
  • 2.构造消息上下文
  • 3.执行消息存储前的hook函数
  • 4.存储消息
  • 5.执行消息存储后的hook函数

解析请求头

这一步比较简单, 解析出消息中的ProducerGroup, Topic, DefaultTopicQueueNums等信息, 相关代码在:AbstractSendMessageProcessor#parseRequestHeader()和RemotingCommand#decodeCommandCustomHeader()中.

构造消息上下文&执行hook函数

RoketMQ的代码里包含了这些代码, 原本用意是在消息存储前和存储后执行一些hook函数(例如把消息详情打印出来), 但是在我看的版本里(先看的4.1.0-incubating-SNAPSHOT, 后来切换到了release-4.0.0-incubating), 并没有真正使用.

存储消息

存储消息主要分为一下几步:
1.创建一个response对象, 用于返回处理结果;
2.设置相关信息, requestId , message body, request queue和topic等;
3.封装信息到MessageExtBrokerInner中;
4.调用MessageStore存储MessageExtBrokerInner;
5.根据MessageStore返回的结果, 给出响应;

详细代码在SendMessageProcessor#sendMessage中, 此处不再贴出.
MessageExtBrokerInner完成后,会调用this.brokerController.getMessageStore().putMessage(msgInner)来存储消息, 关于这一步的详细分析请参考消息存储部分.

下文分析broker处理消息消费的过程.

相关文章

网友评论

      本文标题:RocketMQ源码阅读(八)-broker接收消息

      本文链接:https://www.haomeiwen.com/subject/nxhqnxtx.html