RocketMQ Producer Route Synchronization

Author: 晴天哥_王志 | Published: 2020-05-02 19:39

Introduction

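  • This series explains how the RocketMQ producer works and how to use it. It covers the producer startup flow, producer route synchronization, and the producer message sending flow.

  • This article focuses on producer route synchronization: how the producer pulls a topic's route info from namesrv. The message sending path depends on this route info.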

Producer route synchronization

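  • The producer synchronizes route info in two ways. The first is an on-demand fetch during message sending; the second is a scheduled task that refreshes route info periodically.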

Synchronizing route info during message sending

public class DefaultMQProducerImpl implements MQProducerInner {

    private SendResult sendDefaultImpl(
        Message msg,
        final CommunicationMode communicationMode,
        final SendCallback sendCallback,
        final long timeout
    ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {

        this.makeSureStateOK();
        Validators.checkMessage(msg, this.defaultMQProducer);
        final long invokeID = random.nextLong();
        long beginTimestampFirst = System.currentTimeMillis();
        long beginTimestampPrev = beginTimestampFirst;
        long endTimestamp = beginTimestampFirst;
        // Try to obtain the route info for the topic
        TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
        // ... the rest of the send logic is omitted in this excerpt
    }


    private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
        TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
        if (null == topicPublishInfo || !topicPublishInfo.ok()) {

            this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
            // updateTopicRouteInfoFromNameServer refreshes the route info
            this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
            topicPublishInfo = this.topicPublishInfoTable.get(topic);
        }

        if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
            return topicPublishInfo;
        } else {
            this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
            topicPublishInfo = this.topicPublishInfoTable.get(topic);
            return topicPublishInfo;
        }
    }
}
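  • DefaultMQProducerImpl#sendDefaultImpl sends the message and, along the way, calls tryToFindTopicPublishInfo to obtain the route info for the topic.
  • tryToFindTopicPublishInfo first checks the local topicPublishInfoTable. On a miss it calls updateTopicRouteInfoFromNameServer, which fetches the topic info from namesrv, parses it, and stores it on the producer side.
  • If the route is still unusable after that, it calls updateTopicRouteInfoFromNameServer again with isDefault set to true, which fetches the route of the default topic (createTopicKey) instead.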
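
The lookup order above can be sketched in plain Java. This is a simplified stand-in, not RocketMQ code: the class RouteLookupSketch, the method findQueueCount, and the Function-based name server are hypothetical, and a route is reduced to a queue count where 0 plays the role of a TopicPublishInfo that is not ok().

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

/** Simplified stand-in for the lookup order in tryToFindTopicPublishInfo (hypothetical names). */
final class RouteLookupSketch {
    // topic -> number of writable queues; 0 means "no usable route yet"
    private final Map<String, Integer> publishInfoTable = new ConcurrentHashMap<>();
    // Hypothetical name server lookup: an empty Optional means "topic unknown"
    private final Function<String, Optional<Integer>> nameServer;
    private final String defaultTopic;

    RouteLookupSketch(Function<String, Optional<Integer>> nameServer, String defaultTopic) {
        this.nameServer = nameServer;
        this.defaultTopic = defaultTopic;
    }

    int findQueueCount(String topic) {
        Integer cached = publishInfoTable.get(topic);
        if (cached == null || cached == 0) {
            // Cache miss: register a placeholder, then ask for this topic's own route.
            publishInfoTable.putIfAbsent(topic, 0);
            nameServer.apply(topic).ifPresent(n -> publishInfoTable.put(topic, n));
            cached = publishInfoTable.get(topic);
        }
        if (cached > 0) {
            return cached;
        }
        // Still unusable: borrow the default topic's route for this topic.
        nameServer.apply(defaultTopic).ifPresent(n -> publishInfoTable.put(topic, n));
        return publishInfoTable.get(topic);
    }

    public static void main(String[] args) {
        Map<String, Integer> remote = new HashMap<>();
        remote.put("TBW102", 4);
        remote.put("orders", 8);
        RouteLookupSketch sketch =
            new RouteLookupSketch(t -> Optional.ofNullable(remote.get(t)), "TBW102");
        System.out.println(sketch.findQueueCount("orders"));    // 8
        System.out.println(sketch.findQueueCount("new-topic")); // 4
    }
}
```

A second call for "orders" is served from the local table without contacting the name server, which is the point of caching the route on the producer side.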

Synchronizing route info with a scheduled task

public class MQClientInstance {

    private void startScheduledTask() {
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

            @Override
            public void run() {
                try {
                    MQClientInstance.this.updateTopicRouteInfoFromNameServer();
                } catch (Exception e) {
                    log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e);
                }
            }
        }, 10, this.clientConfig.getPollNameServerInterval(), TimeUnit.MILLISECONDS);
    }


    public void updateTopicRouteInfoFromNameServer() {
        Set<String> topicList = new HashSet<String>();

        // Consumer
        {
            Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();
            while (it.hasNext()) {
                Entry<String, MQConsumerInner> entry = it.next();
                MQConsumerInner impl = entry.getValue();
                if (impl != null) {
                    Set<SubscriptionData> subList = impl.subscriptions();
                    if (subList != null) {
                        for (SubscriptionData subData : subList) {
                            topicList.add(subData.getTopic());
                        }
                    }
                }
            }
        }

        // Producer
        {
            Iterator<Entry<String, MQProducerInner>> it = this.producerTable.entrySet().iterator();
            while (it.hasNext()) {
                Entry<String, MQProducerInner> entry = it.next();
                MQProducerInner impl = entry.getValue();
                if (impl != null) {
                    Set<String> lst = impl.getPublishTopicList();
                    topicList.addAll(lst);
                }
            }
        }

        for (String topic : topicList) {
            this.updateTopicRouteInfoFromNameServer(topic);
        }
    }
}
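  • MQClientInstance registers a scheduled task that pulls topic route info from namesrv. The 10 passed to scheduleAtFixedRate is the initial delay in milliseconds; the repeat period is clientConfig.getPollNameServerInterval(), which is 30 seconds by default.
  • MQClientInstance.this.updateTopicRouteInfoFromNameServer() performs the refresh.
  • The task collects the topics subscribed by consumers and the topics published by producers into topicList, then refreshes the route info of each topic in turn.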
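
The scheduling pattern can be tried out with the JDK alone. The sketch below is an illustration under assumptions, not the RocketMQ implementation: RoutePollSketch, collectTopics, and startPolling are hypothetical names, and the refresh action is an arbitrary Runnable. It shows which argument is the initial delay and which is the period, and why the task body catches exceptions: scheduleAtFixedRate suppresses all later runs once one run throws.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch of "collect topics, then refresh them on a fixed schedule". */
final class RoutePollSketch {
    // Union of consumer-side and producer-side topics; the Set removes duplicates.
    static Set<String> collectTopics(List<Set<String>> consumerSubscriptions,
                                     List<Set<String>> producerTopics) {
        Set<String> topics = new HashSet<>();
        for (Set<String> subs : consumerSubscriptions) {
            topics.addAll(subs);
        }
        for (Set<String> pubs : producerTopics) {
            topics.addAll(pubs);
        }
        return topics;
    }

    // First run after initialDelayMs, then one run every periodMs.
    static ScheduledFuture<?> startPolling(ScheduledExecutorService executor,
                                           long initialDelayMs, long periodMs,
                                           Runnable refresh) {
        return executor.scheduleAtFixedRate(() -> {
            try {
                refresh.run();
            } catch (RuntimeException e) {
                // Swallow the failure so that later runs are not cancelled.
                System.err.println("route refresh failed: " + e);
            }
        }, initialDelayMs, periodMs, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        Set<String> consumerTopics = new HashSet<>(Arrays.asList("orders", "payments"));
        Set<String> producerTopics = new HashSet<>(Arrays.asList("orders", "audit"));
        Set<String> topics = collectTopics(Collections.singletonList(consumerTopics),
                                           Collections.singletonList(producerTopics));

        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        CountDownLatch threeRuns = new CountDownLatch(3);
        // 10 ms initial delay, 50 ms period (a short period chosen only for the demo).
        startPolling(executor, 10, 50, () -> {
            System.out.println("refreshing " + topics.size() + " topics");
            threeRuns.countDown();
        });
        threeRuns.await(5, TimeUnit.SECONDS);
        executor.shutdownNow();
    }
}
```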

Building the route info

Fetching topic route info

public class MQClientInstance {

    public boolean updateTopicRouteInfoFromNameServer(final String topic) {
        return updateTopicRouteInfoFromNameServer(topic, false, null);
    }


    public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault,
        DefaultMQProducer defaultMQProducer) {
        try {
            if (this.lockNamesrv.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
                try {
                    TopicRouteData topicRouteData;
                    if (isDefault && defaultMQProducer != null) {
                        topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(),
                            1000 * 3);
                        if (topicRouteData != null) {
                            for (QueueData data : topicRouteData.getQueueDatas()) {
                                int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums());
                                data.setReadQueueNums(queueNums);
                                data.setWriteQueueNums(queueNums);
                            }
                        }
                    } else {
                        topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3);
                    }

                    // Parse topicRouteData
                    if (topicRouteData != null) {
                        TopicRouteData old = this.topicRouteTable.get(topic);
                        boolean changed = topicRouteDataIsChange(old, topicRouteData);
                        if (!changed) {
                            changed = this.isNeedUpdateTopicRouteInfo(topic);
                        } else {
                            log.info("the topic[{}] route info changed, old[{}] ,new[{}]", topic, old, topicRouteData);
                        }

                        if (changed) {
                            TopicRouteData cloneTopicRouteData = topicRouteData.cloneTopicRouteData();

                            for (BrokerData bd : topicRouteData.getBrokerDatas()) {
                                this.brokerAddrTable.put(bd.getBrokerName(), bd.getBrokerAddrs());
                            }

                            // Update Pub info
                            {
                                TopicPublishInfo publishInfo = topicRouteData2TopicPublishInfo(topic, topicRouteData);
                                publishInfo.setHaveTopicRouterInfo(true);
                                Iterator<Entry<String, MQProducerInner>> it = this.producerTable.entrySet().iterator();
                                while (it.hasNext()) {
                                    Entry<String, MQProducerInner> entry = it.next();
                                    MQProducerInner impl = entry.getValue();
                                    if (impl != null) {
                                        impl.updateTopicPublishInfo(topic, publishInfo);
                                    }
                                }
                            }

                            // Update sub info
                            {
                                Set<MessageQueue> subscribeInfo = topicRouteData2TopicSubscribeInfo(topic, topicRouteData);
                                Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();
                                while (it.hasNext()) {
                                    Entry<String, MQConsumerInner> entry = it.next();
                                    MQConsumerInner impl = entry.getValue();
                                    if (impl != null) {
                                        impl.updateTopicSubscribeInfo(topic, subscribeInfo);
                                    }
                                }
                            }
                            log.info("topicRouteTable.put. Topic = {}, TopicRouteData[{}]", topic, cloneTopicRouteData);
                            this.topicRouteTable.put(topic, cloneTopicRouteData);
                            return true;
                        }
                    } else {
                        log.warn("updateTopicRouteInfoFromNameServer, getTopicRouteInfoFromNameServer return null, Topic: {}", topic);
                    }
                } catch (MQClientException e) {

                } catch (RemotingException e) {

                } finally {
                    this.lockNamesrv.unlock();
                }
            } else {
            }
        } catch (InterruptedException e) {
        }

        return false;
    }
}
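  • updateTopicRouteInfoFromNameServer talks to namesrv to fetch the route info, guarded by lockNamesrv with a tryLock timeout.
  • The fetch itself is done by mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000*3), that is, with a 3-second timeout.
  • The local tables are updated only when the fetched route differs from the cached one (topicRouteDataIsChange) or when isNeedUpdateTopicRouteInfo reports that a refresh is needed.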
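
The control flow of that method (bounded lock wait, fetch, compare, update only on change) can be reduced to a short sketch. Everything here is an assumption made for illustration: GuardedRouteUpdateSketch and its methods are hypothetical, a route is represented as a plain String, and the extra isNeedUpdateTopicRouteInfo check is left out.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;

/** Hypothetical sketch: update a cached route only when the fetched value differs. */
final class GuardedRouteUpdateSketch {
    private static final long LOCK_TIMEOUT_MILLIS = 3000; // assumed value for the sketch
    private final Lock lock = new ReentrantLock();
    private final Map<String, String> routeTable = new ConcurrentHashMap<>();
    // Hypothetical remote fetch; returns null when the topic is unknown.
    private final Function<String, String> fetch;

    GuardedRouteUpdateSketch(Function<String, String> fetch) {
        this.fetch = fetch;
    }

    /** Returns true only when the cached route was actually replaced. */
    boolean update(String topic) {
        try {
            // Bounded wait: give up instead of blocking forever behind another refresh.
            if (!lock.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
                return false;
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        try {
            String fresh = fetch.apply(topic);
            if (fresh == null) {
                return false; // nothing to parse
            }
            if (Objects.equals(routeTable.get(topic), fresh)) {
                return false; // unchanged, so leave the tables alone
            }
            routeTable.put(topic, fresh);
            return true;
        } finally {
            lock.unlock();
        }
    }

    String current(String topic) {
        return routeTable.get(topic);
    }

    public static void main(String[] args) {
        Map<String, String> remote = new HashMap<>();
        remote.put("orders", "broker-a:8");
        GuardedRouteUpdateSketch sketch = new GuardedRouteUpdateSketch(remote::get);
        System.out.println(sketch.update("orders")); // true
        System.out.println(sketch.update("orders")); // false
        remote.put("orders", "broker-a:8,broker-b:8");
        System.out.println(sketch.update("orders")); // true
    }
}
```

Restoring the interrupt flag in the sketch is a deliberate difference from the excerpt above, whose catch blocks are empty.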
public class TopicRouteData extends RemotingSerializable {
    private String orderTopicConf;
    private List<QueueData> queueDatas;
    private List<BrokerData> brokerDatas;
    private HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
}


public class QueueData implements Comparable<QueueData> {
    private String brokerName;
    private int readQueueNums;
    private int writeQueueNums;
    private int perm;
    private int topicSynFlag;
}


public class BrokerData implements Comparable<BrokerData> {
    private String cluster;
    private String brokerName;
    private HashMap<Long/* brokerId */, String/* broker address */> brokerAddrs;
    private final Random random = new Random();
}
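  • TopicRouteData contains List<QueueData> queueDatas and List<BrokerData> brokerDatas.
  • QueueData describes the topic's queues on one broker: the brokerName, the read and write queue counts, and the permission bits. A topic has one QueueData per broker that hosts it.
  • BrokerData maps a brokerName to its addresses: brokerAddrs is keyed by brokerId and each value is an ip:port address.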
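
To make the brokerAddrs shape concrete, here is a small sketch that resolves a brokerName to its master address, using the same nested map layout that brokerAddrTable holds in the code above. BrokerAddrSketch and findMasterAddr are hypothetical names, the addresses are made-up documentation addresses, and MASTER_ID is assumed to be 0, mirroring MixAll.MASTER_ID.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch of a brokerName -> (brokerId -> address) lookup. */
final class BrokerAddrSketch {
    static final long MASTER_ID = 0L; // assumption: mirrors MixAll.MASTER_ID

    // Returns the master's ip:port, or null if the broker or its master is unknown.
    static String findMasterAddr(Map<String, Map<Long, String>> brokerAddrTable,
                                 String brokerName) {
        Map<Long, String> addrs = brokerAddrTable.get(brokerName);
        return addrs == null ? null : addrs.get(MASTER_ID);
    }

    public static void main(String[] args) {
        Map<Long, String> brokerA = new HashMap<>();
        brokerA.put(0L, "192.0.2.10:10911"); // master (made-up address)
        brokerA.put(1L, "192.0.2.11:10911"); // slave (made-up address)

        Map<String, Map<Long, String>> brokerAddrTable = new HashMap<>();
        brokerAddrTable.put("broker-a", brokerA);

        System.out.println(findMasterAddr(brokerAddrTable, "broker-a")); // 192.0.2.10:10911
        System.out.println(findMasterAddr(brokerAddrTable, "broker-x")); // null
    }
}
```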

Parsing topic route info

public class MQClientInstance {

    public static TopicPublishInfo topicRouteData2TopicPublishInfo(final String topic, final TopicRouteData route) {

        TopicPublishInfo info = new TopicPublishInfo();
        info.setTopicRouteData(route);
        if (route.getOrderTopicConf() != null && route.getOrderTopicConf().length() > 0) {
          // Related code omitted

        } else {
            List<QueueData> qds = route.getQueueDatas();
            // Sort by brokerName
            Collections.sort(qds);
            // Walk every broker's QueueData to build per-queue entries
            for (QueueData qd : qds) {
                // Only a writable QueueData contributes queues
                if (PermName.isWriteable(qd.getPerm())) {
                    BrokerData brokerData = null;
                    // Look up the BrokerData that matches this QueueData's brokerName
                    for (BrokerData bd : route.getBrokerDatas()) {
                        if (bd.getBrokerName().equals(qd.getBrokerName())) {
                            brokerData = bd;
                            break;
                        }
                    }

                    if (null == brokerData) {
                        continue;
                    }

                    if (!brokerData.getBrokerAddrs().containsKey(MixAll.MASTER_ID)) {
                        continue;
                    }
                    // For each write queue of this QueueData, create a MessageQueue
                    // and add it to the topic's TopicPublishInfo
                    for (int i = 0; i < qd.getWriteQueueNums(); i++) {
                        MessageQueue mq = new MessageQueue(topic, qd.getBrokerName(), i);
                        info.getMessageQueueList().add(mq);
                    }
                }
            }

            info.setOrderTopic(false);
        }

        return info;
    }
}
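  • Parsing proceeds in two steps: sort the QueueData list by brokerName, then build one MessageQueue per write queue of each QueueData. A QueueData that is not writable, or whose broker has no master address, is skipped.
  • Collections.sort(qds) sorts by brokerName.
  • new MessageQueue(topic, qd.getBrokerName(), i) creates one MessageQueue for each write queue under that brokerName.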
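
The expansion can be reproduced without RocketMQ on the classpath. In this sketch PublishInfoSketch, QueueCfg, and expand are hypothetical names, a queue is rendered as a "topic@brokerName#queueId" string, the set of brokers that have a master is passed in directly, and PERM_WRITE is assumed to be the bit 0x1 << 1, mirroring PermName.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical sketch of expanding per-broker queue configs into individual queues. */
final class PublishInfoSketch {
    static final int PERM_WRITE = 0x1 << 1; // assumption: mirrors PermName.PERM_WRITE

    static final class QueueCfg {
        final String brokerName;
        final int writeQueueNums;
        final int perm;

        QueueCfg(String brokerName, int writeQueueNums, int perm) {
            this.brokerName = brokerName;
            this.writeQueueNums = writeQueueNums;
            this.perm = perm;
        }
    }

    static List<String> expand(String topic, List<QueueCfg> cfgs, Set<String> brokersWithMaster) {
        List<QueueCfg> sorted = new ArrayList<>(cfgs);
        sorted.sort(Comparator.comparing(c -> c.brokerName)); // sort by brokerName
        List<String> queues = new ArrayList<>();
        for (QueueCfg cfg : sorted) {
            if ((cfg.perm & PERM_WRITE) != PERM_WRITE) {
                continue; // not writable
            }
            if (!brokersWithMaster.contains(cfg.brokerName)) {
                continue; // no master to send to
            }
            for (int i = 0; i < cfg.writeQueueNums; i++) {
                queues.add(topic + "@" + cfg.brokerName + "#" + i);
            }
        }
        return queues;
    }

    public static void main(String[] args) {
        List<QueueCfg> cfgs = Arrays.asList(
            new QueueCfg("broker-b", 8, 7),
            new QueueCfg("broker-a", 8, 7),
            new QueueCfg("broker-c", 8, 4)); // read-only, skipped
        Set<String> masters = new HashSet<>(Arrays.asList("broker-a", "broker-b", "broker-c"));
        List<String> queues = expand("TBW102", cfgs, masters);
        System.out.println(queues.size());                 // 16
        System.out.println(queues.get(0));                 // TBW102@broker-a#0
        System.out.println(queues.get(queues.size() - 1)); // TBW102@broker-b#7
    }
}
```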
public class TopicPublishInfo {
    private boolean orderTopic = false;
    private boolean haveTopicRouterInfo = false;
    private List<MessageQueue> messageQueueList = new ArrayList<MessageQueue>();
    private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex();
    private TopicRouteData topicRouteData;
}

public class MessageQueue implements Comparable<MessageQueue>, Serializable {
    private static final long serialVersionUID = 6191200464116433425L;
    private String topic;
    private String brokerName;
    private int queueId;
}
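  • TopicPublishInfo holds the finest-grained queue objects of a topic in messageQueueList.
  • A MessageQueue is the finest-grained unit of selection: when sending a message, the producer picks one specific MessageQueue.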

TopicPublishInfo example

{
    "TBW102": [{
        "brokerName": "broker-a",
        "perm": 7,
        "readQueueNums": 8,
        "topicSynFlag": 0,
        "writeQueueNums": 8
    }, {
        "brokerName": "broker-b",
        "perm": 7,
        "readQueueNums": 8,
        "topicSynFlag": 0,
        "writeQueueNums": 8
    }]
}
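  • The JSON above is the QueueData list from which the TopicPublishInfo is built. The topic (named TBW102) has queues on broker-a and broker-b.

  • First, the broker entries are sorted by brokerName, giving the order broker-a, broker-b.

  • For broker-a, 8 MessageQueue objects are created, with topic TBW102, brokerName broker-a, and queueId 0 through 7.

  • For broker-b, 8 MessageQueue objects are created, with topic TBW102, brokerName broker-b, and queueId 0 through 7.

  • The TopicPublishInfo of topic TBW102 therefore contains 16 MessageQueue objects in total: 8 for broker-a and 8 for broker-b.

  • A MessageQueue is identified by the combination topic + brokerName + queueId.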
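
Since the three fields together identify a queue, a key type built on them needs equals, hashCode, and an ordering over exactly those fields. The sketch below shows one way to write such a key; QueueKeySketch and QueueKey are hypothetical names, and the comparison order (topic, then brokerName, then queueId) is an assumption of the sketch.

```java
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;

/** Hypothetical sketch of a queue key identified by topic + brokerName + queueId. */
final class QueueKeySketch {
    static final class QueueKey implements Comparable<QueueKey> {
        final String topic;
        final String brokerName;
        final int queueId;

        QueueKey(String topic, String brokerName, int queueId) {
            this.topic = topic;
            this.brokerName = brokerName;
            this.queueId = queueId;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (!(o instanceof QueueKey)) {
                return false;
            }
            QueueKey other = (QueueKey) o;
            return queueId == other.queueId
                && topic.equals(other.topic)
                && brokerName.equals(other.brokerName);
        }

        @Override
        public int hashCode() {
            return Objects.hash(topic, brokerName, queueId);
        }

        @Override
        public int compareTo(QueueKey o) {
            int c = topic.compareTo(o.topic);
            if (c != 0) {
                return c;
            }
            c = brokerName.compareTo(o.brokerName);
            if (c != 0) {
                return c;
            }
            return Integer.compare(queueId, o.queueId);
        }
    }

    public static void main(String[] args) {
        Set<QueueKey> seen = new HashSet<>();
        seen.add(new QueueKey("TBW102", "broker-a", 0));
        seen.add(new QueueKey("TBW102", "broker-a", 0)); // same triple, ignored by the set
        seen.add(new QueueKey("TBW102", "broker-b", 0)); // same queueId, different broker
        System.out.println(seen.size()); // 2
    }
}
```

The same queueId on two brokers yields two distinct keys, which is why the example above ends up with 16 queues rather than 8.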

    Article link: https://www.haomeiwen.com/subject/ocmkghtx.html