by shihang.mai
1. 流程图
![](https://img.haomeiwen.com/i16110947/58a56e418e037bc2.png)
2. 路由发现过程
- broker向所有的nameServer注册,然后定时发送心跳给所有的nameServer
- 心跳包有两个属性,header+crc32的body,
- 发心跳包给nameServer,nameServer通过核心类DefaultRequestProcessor处理,RequestCode=REGISTER_BROKER,将心跳包信息分别放到brokerAddrTable和topicQueueTable
- producer发送TopicName给nameServer,一样用核心类DefaultRequestProcessor处理,但是RequestCode=GET_ROUTEINFO_BY_TOPIC,然后通过pickupTopicRouteData()方法,
- 拿TopicName去匹配topicQueueTable,放入List<QueueData>
- 有了List<QueueData>,拿brokerName去匹配brokerAddrTable,放入List<BrokerData>
- 将List<QueueData>和List<BrokerData>封装为TopicRouteData返回给producer
- 有了BrokerData定位ip,有了queueData定位queue,这样就可以向特定的broker特定的queue发msg了
BrokerData
private String cluster;
private String brokerName;
private HashMap<Long/* brokerId */, String/* broker address */> brokerAddrs;
QueueData
private String brokerName;
private int readQueueNums;
private int writeQueueNums;
private int perm;
private int topicSynFlag;
consumer同理
3. 题外话
-
rocketmq4.5前并不支持容灾,即master挂了,salve并不会自动补上。
-
在4.5后,利用DLedger支持容灾,DLedger对commitlog包装。
-
DLedger作用: 选举、数据复制(用的commitlog)
网友评论