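Introduction
- This series explains how the RocketMQ producer works and how to use it. It covers the producer startup flow, producer route synchronization, and the producer message-sending flow.
- This article covers producer route synchronization: how the producer fetches a topic's route information from the NameServer (namesrv). The message-sending flow described later relies on this route information.
Producer route synchronization
- The producer synchronizes route information in two ways. The first is an on-demand fetch during message sending; the second is a periodic fetch driven by a scheduled task.
Synchronizing route information during message sending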
public class DefaultMQProducerImpl implements MQProducerInner {

    private SendResult sendDefaultImpl(
        Message msg,
        final CommunicationMode communicationMode,
        final SendCallback sendCallback,
        final long timeout
    ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        this.makeSureStateOK();
        Validators.checkMessage(msg, this.defaultMQProducer);
        final long invokeID = random.nextLong();
        long beginTimestampFirst = System.currentTimeMillis();
        long beginTimestampPrev = beginTimestampFirst;
        long endTimestamp = beginTimestampFirst;
        // Try to look up the route information for the topic
        TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
        // (the rest of the send logic is omitted from this excerpt)
    }

    private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
        TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
        if (null == topicPublishInfo || !topicPublishInfo.ok()) {
            this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
            // updateTopicRouteInfoFromNameServer refreshes the route from namesrv
            this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
            topicPublishInfo = this.topicPublishInfoTable.get(topic);
        }
        if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
            return topicPublishInfo;
        } else {
            // Still no usable route: retry with isDefault = true
            this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
            topicPublishInfo = this.topicPublishInfoTable.get(topic);
            return topicPublishInfo;
        }
    }
}
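- DefaultMQProducerImpl#sendDefaultImpl sends the message. Along the way it calls tryToFindTopicPublishInfo to obtain the route information for the topic.
- updateTopicRouteInfoFromNameServer parses the topic information fetched from namesrv and stores it on the producer side.
- If the topic still has no usable route after the first fetch, the lookup is repeated with isDefault set to true, which queries the route of the producer's createTopicKey instead of the topic itself.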
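The lookup order described above (local cache first, then the name server, then the default topic) can be sketched in isolation. This is only an illustration, not RocketMQ code: the class RouteCacheSketch, its findRoute method, and the use of a plain Function as a stand-in for the name server are all assumptions made for the example.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

/** Hypothetical stand-in for the producer-side route cache; not RocketMQ code. */
class RouteCacheSketch {
    private final ConcurrentHashMap<String, List<String>> cache = new ConcurrentHashMap<>();
    // Stand-in for the name server: returns null when the topic is unknown.
    private final Function<String, List<String>> nameServer;
    private final String defaultTopic;

    RouteCacheSketch(Function<String, List<String>> nameServer, String defaultTopic) {
        this.nameServer = nameServer;
        this.defaultTopic = defaultTopic;
    }

    List<String> findRoute(String topic) {
        List<String> route = cache.get(topic);
        if (route == null || route.isEmpty()) {
            // Cache miss: ask the name server for this topic.
            route = nameServer.apply(topic);
            if (route == null || route.isEmpty()) {
                // Still nothing: fall back to the default topic's route.
                route = nameServer.apply(defaultTopic);
            }
            if (route != null && !route.isEmpty()) {
                cache.put(topic, route);
            }
        }
        return route == null ? Collections.<String>emptyList() : route;
    }

    public static void main(String[] args) {
        Map<String, List<String>> remote = new HashMap<>();
        remote.put("TBW102", Arrays.asList("broker-a:0", "broker-a:1"));
        RouteCacheSketch sketch = new RouteCacheSketch(remote::get, "TBW102");
        // "NewTopic" is unknown to the stand-in name server, so the default route is used.
        System.out.println(sketch.findRoute("NewTopic"));
    }
}
```

The second call for the same topic is served from the cache, which is why the send path only pays for the remote lookup on a miss.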
Synchronizing route information with a scheduled task
public class MQClientInstance {

    private void startScheduledTask() {
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    MQClientInstance.this.updateTopicRouteInfoFromNameServer();
                } catch (Exception e) {
                    log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e);
                }
            }
        }, 10, this.clientConfig.getPollNameServerInterval(), TimeUnit.MILLISECONDS);
    }

    public void updateTopicRouteInfoFromNameServer() {
        Set<String> topicList = new HashSet<String>();
        // Consumer
        {
            Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();
            while (it.hasNext()) {
                Entry<String, MQConsumerInner> entry = it.next();
                MQConsumerInner impl = entry.getValue();
                if (impl != null) {
                    Set<SubscriptionData> subList = impl.subscriptions();
                    if (subList != null) {
                        for (SubscriptionData subData : subList) {
                            topicList.add(subData.getTopic());
                        }
                    }
                }
            }
        }
        // Producer
        {
            Iterator<Entry<String, MQProducerInner>> it = this.producerTable.entrySet().iterator();
            while (it.hasNext()) {
                Entry<String, MQProducerInner> entry = it.next();
                MQProducerInner impl = entry.getValue();
                if (impl != null) {
                    Set<String> lst = impl.getPublishTopicList();
                    topicList.addAll(lst);
                }
            }
        }
        for (String topic : topicList) {
            this.updateTopicRouteInfoFromNameServer(topic);
        }
    }
}
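- MQClientInstance runs a scheduled task that fetches topic route information from namesrv. The 10 ms in the code is only the initial delay; after that the task repeats every clientConfig.getPollNameServerInterval() milliseconds (30 seconds by default).
- MQClientInstance.this.updateTopicRouteInfoFromNameServer() does the actual fetching.
- The task collects the topics subscribed to by consumers and the topics published by producers into topicList, then iterates over topicList and synchronizes the route information for each topic.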
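The same polling pattern can be tried out with nothing but the JDK. The sketch below is an illustration under assumptions: RoutePollingSketch and collectTopics are made-up names, the topics are hard-coded, and the 200 ms interval is shortened for the demo and merely stands in for pollNameServerInterval.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Hypothetical demo of periodic route refresh; not RocketMQ code. */
class RoutePollingSketch {

    /** Union of the topics consumers subscribe to and the topics producers publish to. */
    static Set<String> collectTopics(Set<String> subscribed, Set<String> published) {
        Set<String> topics = new HashSet<>(subscribed);
        topics.addAll(published);
        return topics;
    }

    public static void main(String[] args) throws InterruptedException {
        final Set<String> subscribed = new HashSet<>(Arrays.asList("TopicA"));
        final Set<String> published = new HashSet<>(Arrays.asList("TopicA", "TopicB"));
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        final CountDownLatch twoRounds = new CountDownLatch(2);
        long initialDelayMs = 10;   // same initial delay as in the excerpt above
        long pollIntervalMs = 200;  // demo value standing in for pollNameServerInterval
        scheduler.scheduleAtFixedRate(() -> {
            try {
                for (String topic : collectTopics(subscribed, published)) {
                    System.out.println("refresh route for " + topic);
                }
            } catch (Exception e) {
                // An exception escaping the task would cancel all later runs, hence the catch.
                System.err.println("route refresh failed: " + e);
            } finally {
                twoRounds.countDown();
            }
        }, initialDelayMs, pollIntervalMs, TimeUnit.MILLISECONDS);
        twoRounds.await(5, TimeUnit.SECONDS); // wait for two refresh rounds, then stop
        scheduler.shutdownNow();
    }
}
```

Because the topics go into a Set, a topic that is both subscribed to and published is refreshed only once per round.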
Building the route information
Fetching topic route information
public class MQClientInstance {

    public boolean updateTopicRouteInfoFromNameServer(final String topic) {
        return updateTopicRouteInfoFromNameServer(topic, false, null);
    }

    public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault,
        DefaultMQProducer defaultMQProducer) {
        try {
            if (this.lockNamesrv.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
                try {
                    TopicRouteData topicRouteData;
                    if (isDefault && defaultMQProducer != null) {
                        topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(),
                            1000 * 3);
                        if (topicRouteData != null) {
                            for (QueueData data : topicRouteData.getQueueDatas()) {
                                int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums());
                                data.setReadQueueNums(queueNums);
                                data.setWriteQueueNums(queueNums);
                            }
                        }
                    } else {
                        topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3);
                    }
                    // Parse topicRouteData
                    if (topicRouteData != null) {
                        TopicRouteData old = this.topicRouteTable.get(topic);
                        boolean changed = topicRouteDataIsChange(old, topicRouteData);
                        if (!changed) {
                            changed = this.isNeedUpdateTopicRouteInfo(topic);
                        } else {
                            log.info("the topic[{}] route info changed, old[{}] ,new[{}]", topic, old, topicRouteData);
                        }
                        if (changed) {
                            TopicRouteData cloneTopicRouteData = topicRouteData.cloneTopicRouteData();
                            for (BrokerData bd : topicRouteData.getBrokerDatas()) {
                                this.brokerAddrTable.put(bd.getBrokerName(), bd.getBrokerAddrs());
                            }
                            // Update Pub info
                            {
                                TopicPublishInfo publishInfo = topicRouteData2TopicPublishInfo(topic, topicRouteData);
                                publishInfo.setHaveTopicRouterInfo(true);
                                Iterator<Entry<String, MQProducerInner>> it = this.producerTable.entrySet().iterator();
                                while (it.hasNext()) {
                                    Entry<String, MQProducerInner> entry = it.next();
                                    MQProducerInner impl = entry.getValue();
                                    if (impl != null) {
                                        impl.updateTopicPublishInfo(topic, publishInfo);
                                    }
                                }
                            }
                            // Update sub info
                            {
                                Set<MessageQueue> subscribeInfo = topicRouteData2TopicSubscribeInfo(topic, topicRouteData);
                                Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();
                                while (it.hasNext()) {
                                    Entry<String, MQConsumerInner> entry = it.next();
                                    MQConsumerInner impl = entry.getValue();
                                    if (impl != null) {
                                        impl.updateTopicSubscribeInfo(topic, subscribeInfo);
                                    }
                                }
                            }
                            log.info("topicRouteTable.put. Topic = {}, TopicRouteData[{}]", topic, cloneTopicRouteData);
                            this.topicRouteTable.put(topic, cloneTopicRouteData);
                            return true;
                        }
                    } else {
                        log.warn("updateTopicRouteInfoFromNameServer, getTopicRouteInfoFromNameServer return null, Topic: {}", topic);
                    }
                } catch (MQClientException e) {
                    // exception handling omitted in this excerpt
                } catch (RemotingException e) {
                    // exception handling omitted in this excerpt
                } finally {
                    this.lockNamesrv.unlock();
                }
            } else {
                // lock not acquired within the timeout; handling omitted in this excerpt
            }
        } catch (InterruptedException e) {
            // exception handling omitted in this excerpt
        }
        return false;
    }
}
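- updateTopicRouteInfoFromNameServer talks to namesrv to fetch the route information. It does so under lockNamesrv, acquired with a timeout, and only rewrites the local tables when the fetched route differs from the stored one or isNeedUpdateTopicRouteInfo reports that an update is needed.
- The fetch itself is done by mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3), i.e. with a 3-second timeout.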
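The control flow of that method (timed lock, compare with the stored route, update only on change) can be reduced to a small sketch. Everything here is an assumption made for illustration: RouteUpdateSketch and its update method are hypothetical, routes are modelled as plain string lists, the 3-second lock timeout is an example value, and the isNeedUpdateTopicRouteInfo check of the real method is left out.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical reduction of the update flow; not RocketMQ code. */
class RouteUpdateSketch {
    private final ReentrantLock lock = new ReentrantLock();
    private final Map<String, List<String>> routeTable = new ConcurrentHashMap<>();

    /** Returns true only when the stored route was actually replaced. */
    boolean update(String topic, List<String> fetched) {
        try {
            // Timed lock: give up instead of blocking forever behind another updater.
            if (!lock.tryLock(3, TimeUnit.SECONDS)) {
                return false;
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return false;
        }
        try {
            if (fetched == null) {
                return false; // the name server returned nothing
            }
            List<String> old = routeTable.get(topic);
            if (fetched.equals(old)) {
                return false; // unchanged: skip the table rewrite
            }
            routeTable.put(topic, new ArrayList<>(fetched)); // store a copy, like cloneTopicRouteData
            return true;
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        RouteUpdateSketch sketch = new RouteUpdateSketch();
        System.out.println(sketch.update("TopicA", Arrays.asList("broker-a")));
        System.out.println(sketch.update("TopicA", Arrays.asList("broker-a")));
        System.out.println(sketch.update("TopicA", Arrays.asList("broker-a", "broker-b")));
    }
}
```

Skipping the rewrite when nothing changed keeps the periodic task cheap, since most polling rounds see an identical route.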
public class TopicRouteData extends RemotingSerializable {
    private String orderTopicConf;
    private List<QueueData> queueDatas;
    private List<BrokerData> brokerDatas;
    private HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
}

public class QueueData implements Comparable<QueueData> {
    private String brokerName;
    private int readQueueNums;
    private int writeQueueNums;
    private int perm;
    private int topicSynFlag;
}

public class BrokerData implements Comparable<BrokerData> {
    private String cluster;
    private String brokerName;
    private HashMap<Long/* brokerId */, String/* broker address */> brokerAddrs;
    private final Random random = new Random();
}
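- TopicRouteData contains List<QueueData> queueDatas and List<BrokerData> brokerDatas.
- Each QueueData describes the topic's queues on one broker (brokerName, read and write queue counts, permission). Taken together, the list covers every broker that hosts the topic.
- BrokerData maps a brokerName to its addresses: brokerAddrs is keyed by brokerId and holds the ip:port address of each instance.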
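To make the brokerAddrs layout concrete, here is a minimal sketch of resolving a broker name to its master address. It is an illustration only: BrokerAddrSketch and findMasterAddr are hypothetical names, the addresses are invented, and the constant 0L is assumed to mirror MixAll.MASTER_ID.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical lookup over a brokerName -> (brokerId -> address) table; not RocketMQ code. */
class BrokerAddrSketch {
    static final long MASTER_ID = 0L; // assumption: mirrors MixAll.MASTER_ID

    /** Returns the master's ip:port for the broker, or null if the broker or its master is unknown. */
    static String findMasterAddr(Map<String, Map<Long, String>> brokerAddrTable, String brokerName) {
        Map<Long, String> addrs = brokerAddrTable.get(brokerName);
        return addrs == null ? null : addrs.get(MASTER_ID);
    }

    public static void main(String[] args) {
        Map<Long, String> brokerA = new HashMap<>();
        brokerA.put(0L, "10.0.0.1:10911"); // master (made-up address)
        brokerA.put(1L, "10.0.0.2:10911"); // slave (made-up address)
        Map<String, Map<Long, String>> table = new HashMap<>();
        table.put("broker-a", brokerA);
        System.out.println(findMasterAddr(table, "broker-a"));
        System.out.println(findMasterAddr(table, "broker-x"));
    }
}
```

A broker whose address map has no master entry yields null here, which matches the check in the parsing step that skips brokers without a MASTER_ID address.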
Parsing topic route information
public class MQClientInstance {

    public static TopicPublishInfo topicRouteData2TopicPublishInfo(final String topic, final TopicRouteData route) {
        TopicPublishInfo info = new TopicPublishInfo();
        info.setTopicRouteData(route);
        if (route.getOrderTopicConf() != null && route.getOrderTopicConf().length() > 0) {
            // Ordered-topic handling omitted
        } else {
            List<QueueData> qds = route.getQueueDatas();
            // Sort by brokerName
            Collections.sort(qds);
            // Walk every broker entry and build queue-level information
            for (QueueData qd : qds) {
                // Only writable QueueData entries produce queues
                if (PermName.isWriteable(qd.getPerm())) {
                    BrokerData brokerData = null;
                    // Find the BrokerData that matches this QueueData's brokerName
                    for (BrokerData bd : route.getBrokerDatas()) {
                        if (bd.getBrokerName().equals(qd.getBrokerName())) {
                            brokerData = bd;
                            break;
                        }
                    }
                    if (null == brokerData) {
                        continue;
                    }
                    // Skip brokers that have no master address
                    if (!brokerData.getBrokerAddrs().containsKey(MixAll.MASTER_ID)) {
                        continue;
                    }
                    // Create one MessageQueue per write queue and
                    // add it to the topic's TopicPublishInfo
                    for (int i = 0; i < qd.getWriteQueueNums(); i++) {
                        MessageQueue mq = new MessageQueue(topic, qd.getBrokerName(), i);
                        info.getMessageQueueList().add(mq);
                    }
                }
            }
            info.setOrderTopic(false);
        }
        return info;
    }
}
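- Parsing proceeds in two steps: sort the QueueData entries by brokerName, then build MessageQueue objects for each sorted entry according to its write queue count. Entries that are not writable, or whose broker has no master address, are skipped.
- Collections.sort(qds) sorts by brokerName.
- new MessageQueue(topic, qd.getBrokerName(), i) builds one MessageQueue per write queue under each brokerName.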
public class TopicPublishInfo {
    private boolean orderTopic = false;
    private boolean haveTopicRouterInfo = false;
    private List<MessageQueue> messageQueueList = new ArrayList<MessageQueue>();
    private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex();
    private TopicRouteData topicRouteData;
}

public class MessageQueue implements Comparable<MessageQueue>, Serializable {
    private static final long serialVersionUID = 6191200464116433425L;
    private String topic;
    private String brokerName;
    private int queueId;
}
- TopicPublishInfo stores the finest-grained queue objects in messageQueueList.
- MessageQueue is the concrete, finest-grained queue that the producer has to pick when it sends a message.
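How a queue gets picked from messageQueueList is not shown in this article. As a rough illustration, the sketch below assumes plain round-robin over the list with a shared counter; the real producer logic may differ, and QueueSelectSketch and selectOne are hypothetical names.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical round-robin picker over a queue list; not RocketMQ code. */
class QueueSelectSketch {
    private final List<String> queues;
    private final AtomicInteger counter = new AtomicInteger();

    QueueSelectSketch(List<String> queues) {
        this.queues = queues;
    }

    String selectOne() {
        // floorMod keeps the index non-negative even after the counter overflows.
        int index = Math.floorMod(counter.getAndIncrement(), queues.size());
        return queues.get(index);
    }

    public static void main(String[] args) {
        QueueSelectSketch picker = new QueueSelectSketch(
            Arrays.asList("broker-a#0", "broker-a#1", "broker-b#0"));
        for (int i = 0; i < 4; i++) {
            System.out.println(picker.selectOne());
        }
    }
}
```

With three queues the fourth call wraps around to the first queue again, which spreads consecutive messages evenly across brokers and queues.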
TopicPublishInfo example
{
    "TBW102": [{
        "brokerName": "broker-a",
        "perm": 7,
        "readQueueNums": 8,
        "topicSynFlag": 0,
        "writeQueueNums": 8
    }, {
        "brokerName": "broker-b",
        "perm": 7,
        "readQueueNums": 8,
        "topicSynFlag": 0,
        "writeQueueNums": 8
    }]
}
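- The JSON above shows the QueueData entries of the topic named TBW102, which has queues on both broker-a and broker-b.
- The broker entries are first sorted by brokerName, giving the order broker-a, broker-b.
- For broker-a, 8 MessageQueue objects are created, each with topic TBW102, brokerName broker-a, and queueId 0 through 7.
- For broker-b, 8 MessageQueue objects are created, each with topic TBW102, brokerName broker-b, and queueId 0 through 7.
- The TopicPublishInfo for TBW102 therefore contains 16 MessageQueue objects in total: 8 for broker-a and 8 for broker-b.
- A MessageQueue is a queue object identified by topic + brokerName + queueId.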
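The walkthrough above can be reproduced with a few lines of standalone Java. This is a simplified sketch under assumptions: PublishInfoSketch, its nested Qd class, and the topic@broker#queueId string format are invented for the example, the write-permission bit is assumed to be 2, and the broker/master lookup of the real method is left out.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

/** Hypothetical re-creation of the queue expansion step; not RocketMQ code. */
class PublishInfoSketch {
    static final int PERM_WRITE = 2; // assumption: the write bit checked by PermName.isWriteable

    /** Minimal stand-in for QueueData. */
    static final class Qd {
        final String brokerName;
        final int writeQueueNums;
        final int perm;

        Qd(String brokerName, int writeQueueNums, int perm) {
            this.brokerName = brokerName;
            this.writeQueueNums = writeQueueNums;
            this.perm = perm;
        }
    }

    /** Expands each writable entry into one "topic@broker#queueId" string per write queue. */
    static List<String> build(String topic, List<Qd> queueDatas) {
        List<Qd> sorted = new ArrayList<>(queueDatas);
        sorted.sort(Comparator.comparing((Qd q) -> q.brokerName)); // sort by brokerName
        List<String> queues = new ArrayList<>();
        for (Qd qd : sorted) {
            if ((qd.perm & PERM_WRITE) != PERM_WRITE) {
                continue; // not writable: contributes no queues
            }
            for (int i = 0; i < qd.writeQueueNums; i++) {
                queues.add(topic + "@" + qd.brokerName + "#" + i);
            }
        }
        return queues;
    }

    public static void main(String[] args) {
        // Same data as the JSON above, deliberately listed out of order.
        List<String> queues = build("TBW102",
            Arrays.asList(new Qd("broker-b", 8, 7), new Qd("broker-a", 8, 7)));
        System.out.println(queues.size());
        System.out.println(queues.get(0) + " ... " + queues.get(queues.size() - 1));
    }
}
```

Listing broker-b first in the input shows that the sort, not the input order, decides where each broker's queues land in the final list.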