RocketMQ producer route synchronization

Author: 晴天哥_王志 | Published: 2020-05-02 19:39

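    Introduction

    • This series explains how the RocketMQ producer works and how to use it. It covers the producer's startup flow, its route synchronization, and its message-sending flow.

    • This article covers producer route synchronization: how the producer pulls a topic's route information from the name server (namesrv). Later message sends rely on this route information.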

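    Producer route synchronization

    • The producer synchronizes routes in two ways: on demand while sending a message, and periodically through a scheduled task.

    Synchronizing routes while sending a message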

    public class DefaultMQProducerImpl implements MQProducerInner {
    
        private SendResult sendDefaultImpl(
            Message msg,
            final CommunicationMode communicationMode,
            final SendCallback sendCallback,
            final long timeout
        ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    
            this.makeSureStateOK();
            Validators.checkMessage(msg, this.defaultMQProducer);
            final long invokeID = random.nextLong();
            long beginTimestampFirst = System.currentTimeMillis();
            long beginTimestampPrev = beginTimestampFirst;
            long endTimestamp = beginTimestampFirst;
            // Try to look up the route information for the topic
            TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
            // ... remaining send logic omitted in this excerpt
        }
    
    
        private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
            TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
            if (null == topicPublishInfo || !topicPublishInfo.ok()) {
    
                this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
                // updateTopicRouteInfoFromNameServer refreshes the route from the name server
                this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
                topicPublishInfo = this.topicPublishInfoTable.get(topic);
            }
    
            if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
                return topicPublishInfo;
            } else {
                this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
                topicPublishInfo = this.topicPublishInfoTable.get(topic);
                return topicPublishInfo;
            }
        }
    }
    
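    • DefaultMQProducerImpl#sendDefaultImpl sends the message and, along the way, calls tryToFindTopicPublishInfo to get the route information for the topic.
    • updateTopicRouteInfoFromNameServer fetches the topic's route from namesrv, parses it, and stores it on the producer side.
    • If the topic still has no usable route after that, tryToFindTopicPublishInfo calls updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer), which queries the route of the producer's createTopicKey instead.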
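
    The lookup order in tryToFindTopicPublishInfo (local cache, then the name server, then the default topic) can be sketched roughly as follows. This is a simplified model, not RocketMQ code: RouteLookupSketch, PublishInfo, the nameServer function, and the remoteCalls counter are all hypothetical names introduced for illustration.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Simplified model of the on-demand lookup. Every name here is illustrative,
// not a RocketMQ class.
class RouteLookupSketch {
    // Stand-in for TopicPublishInfo: usable only if it lists at least one queue.
    static final class PublishInfo {
        final List<String> queues;
        PublishInfo(List<String> queues) { this.queues = queues; }
        boolean ok() { return !queues.isEmpty(); }
    }

    private final Map<String, PublishInfo> cache = new ConcurrentHashMap<>();
    private final Function<String, PublishInfo> nameServer; // hypothetical remote lookup
    private final String defaultTopic;                      // plays the role of createTopicKey
    int remoteCalls = 0;                                    // counts simulated remote lookups

    RouteLookupSketch(Function<String, PublishInfo> nameServer, String defaultTopic) {
        this.nameServer = nameServer;
        this.defaultTopic = defaultTopic;
    }

    PublishInfo find(String topic) {
        // Step 1: serve from the local cache when the cached route is usable.
        PublishInfo info = cache.get(topic);
        if (info != null && info.ok()) {
            return info;
        }
        // Step 2: ask the name server for this topic's route.
        info = fetch(topic);
        // Step 3: fall back to the route of the default topic.
        if (info == null || !info.ok()) {
            info = fetch(defaultTopic);
        }
        if (info != null && info.ok()) {
            cache.put(topic, info);
        }
        return info; // may be null or unusable if both lookups failed
    }

    private PublishInfo fetch(String topic) {
        remoteCalls++;
        return nameServer.apply(topic);
    }

    public static void main(String[] args) {
        // The fake name server only knows the default topic.
        RouteLookupSketch lookup = new RouteLookupSketch(
            t -> t.equals("TBW102") ? new PublishInfo(List.of("broker-a:0", "broker-a:1")) : null,
            "TBW102");
        System.out.println(lookup.find("new-topic").queues); // served via the default topic
        System.out.println("remote lookups: " + lookup.remoteCalls);
    }
}
```

    Unlike the real method, the sketch caches only usable results and models the remote call as a plain function; it is meant to show the order of the three steps, nothing more.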

    Synchronizing routes with a scheduled task

    public class MQClientInstance {
    
        private void startScheduledTask() {
            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    
                @Override
                public void run() {
                    try {
                        MQClientInstance.this.updateTopicRouteInfoFromNameServer();
                    } catch (Exception e) {
                        log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e);
                    }
                }
            }, 10, this.clientConfig.getPollNameServerInterval(), TimeUnit.MILLISECONDS);
        }
    
    
        public void updateTopicRouteInfoFromNameServer() {
            Set<String> topicList = new HashSet<String>();
    
            // Consumer
            {
                Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();
                while (it.hasNext()) {
                    Entry<String, MQConsumerInner> entry = it.next();
                    MQConsumerInner impl = entry.getValue();
                    if (impl != null) {
                        Set<SubscriptionData> subList = impl.subscriptions();
                        if (subList != null) {
                            for (SubscriptionData subData : subList) {
                                topicList.add(subData.getTopic());
                            }
                        }
                    }
                }
            }
    
            // Producer
            {
                Iterator<Entry<String, MQProducerInner>> it = this.producerTable.entrySet().iterator();
                while (it.hasNext()) {
                    Entry<String, MQProducerInner> entry = it.next();
                    MQProducerInner impl = entry.getValue();
                    if (impl != null) {
                        Set<String> lst = impl.getPublishTopicList();
                        topicList.addAll(lst);
                    }
                }
            }
    
            for (String topic : topicList) {
                this.updateTopicRouteInfoFromNameServer(topic);
            }
        }
    }
    
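    • MQClientInstance starts a scheduled task that first runs after a 10 ms initial delay and then repeats every clientConfig.getPollNameServerInterval() milliseconds (30 seconds by default), pulling the route information for each topic from namesrv. The 10 in scheduleAtFixedRate is the initial delay, not the period.
    • MQClientInstance.this.updateTopicRouteInfoFromNameServer() performs the refresh.
    • The task collects the topics subscribed to by consumers and the topics published by producers into topicList, then iterates over topicList and synchronizes the route of each topic.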
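
    The scheduling pattern above can be tried in isolation with the sketch below. It is an illustration under assumptions: ScheduledRefreshSketch and collectTopics are hypothetical names, the period is shortened to 200 ms so the demo finishes quickly, and the "refresh" only prints the topic name.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Illustrative only: mirrors the shape of startScheduledTask, not its real code.
class ScheduledRefreshSketch {
    // Union of the topics consumers subscribe to and the topics producers publish to.
    static Set<String> collectTopics(List<Set<String>> consumerTopics,
                                     List<Set<String>> producerTopics) {
        Set<String> topics = new HashSet<>();
        for (Set<String> subscribed : consumerTopics) {
            topics.addAll(subscribed);
        }
        for (Set<String> published : producerTopics) {
            topics.addAll(published);
        }
        return topics;
    }

    public static void main(String[] args) throws InterruptedException {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        long initialDelayMillis = 10; // same initial delay as in startScheduledTask
        long periodMillis = 200;      // demo value standing in for pollNameServerInterval

        scheduler.scheduleAtFixedRate(() -> {
            try {
                Set<String> topics = collectTopics(
                    List.of(Set.of("TopicA")),
                    List.of(Set.of("TopicA", "TopicB")));
                for (String topic : topics) {
                    System.out.println("refresh route for " + topic);
                }
            } catch (Exception e) {
                // An exception escaping run() would suppress all later executions,
                // which is why the task catches and logs instead of rethrowing.
                System.err.println("refresh failed: " + e);
            }
        }, initialDelayMillis, periodMillis, TimeUnit.MILLISECONDS);

        Thread.sleep(500);       // let the task run a few times
        scheduler.shutdownNow(); // stop the demo
    }
}
```

    Because the topics go into a set, a topic that is both produced and consumed by the same client is refreshed once per cycle.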

    Building the route information

    Fetching topic route information

    public class MQClientInstance {
    
        public boolean updateTopicRouteInfoFromNameServer(final String topic) {
            return updateTopicRouteInfoFromNameServer(topic, false, null);
        }
    
    
        public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault,
            DefaultMQProducer defaultMQProducer) {
            try {
                if (this.lockNamesrv.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
                    try {
                        TopicRouteData topicRouteData;
                        if (isDefault && defaultMQProducer != null) {
                            topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(),
                                1000 * 3);
                            if (topicRouteData != null) {
                                for (QueueData data : topicRouteData.getQueueDatas()) {
                                    int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums());
                                    data.setReadQueueNums(queueNums);
                                    data.setWriteQueueNums(queueNums);
                                }
                            }
                        } else {
                            topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3);
                        }
    
                        // Process topicRouteData
                        if (topicRouteData != null) {
                            TopicRouteData old = this.topicRouteTable.get(topic);
                            boolean changed = topicRouteDataIsChange(old, topicRouteData);
                            if (!changed) {
                                changed = this.isNeedUpdateTopicRouteInfo(topic);
                            } else {
                                log.info("the topic[{}] route info changed, old[{}] ,new[{}]", topic, old, topicRouteData);
                            }
    
                            if (changed) {
                                TopicRouteData cloneTopicRouteData = topicRouteData.cloneTopicRouteData();
    
                                for (BrokerData bd : topicRouteData.getBrokerDatas()) {
                                    this.brokerAddrTable.put(bd.getBrokerName(), bd.getBrokerAddrs());
                                }
    
                                // Update Pub info
                                {
                                    TopicPublishInfo publishInfo = topicRouteData2TopicPublishInfo(topic, topicRouteData);
                                    publishInfo.setHaveTopicRouterInfo(true);
                                    Iterator<Entry<String, MQProducerInner>> it = this.producerTable.entrySet().iterator();
                                    while (it.hasNext()) {
                                        Entry<String, MQProducerInner> entry = it.next();
                                        MQProducerInner impl = entry.getValue();
                                        if (impl != null) {
                                            impl.updateTopicPublishInfo(topic, publishInfo);
                                        }
                                    }
                                }
    
                                // Update sub info
                                {
                                    Set<MessageQueue> subscribeInfo = topicRouteData2TopicSubscribeInfo(topic, topicRouteData);
                                    Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();
                                    while (it.hasNext()) {
                                        Entry<String, MQConsumerInner> entry = it.next();
                                        MQConsumerInner impl = entry.getValue();
                                        if (impl != null) {
                                            impl.updateTopicSubscribeInfo(topic, subscribeInfo);
                                        }
                                    }
                                }
                                log.info("topicRouteTable.put. Topic = {}, TopicRouteData[{}]", topic, cloneTopicRouteData);
                                this.topicRouteTable.put(topic, cloneTopicRouteData);
                                return true;
                            }
                        } else {
                            log.warn("updateTopicRouteInfoFromNameServer, getTopicRouteInfoFromNameServer return null, Topic: {}", topic);
                        }
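                    } catch (MQClientException e) {
                        // handling omitted in this excerpt
                    } catch (RemotingException e) {
                        // handling omitted in this excerpt
                    } finally {
                        this.lockNamesrv.unlock();
                    }
                } else {
                    // tryLock timed out; handling omitted in this excerpt
                }
            } catch (InterruptedException e) {
                // interrupted while waiting for the lock; handling omitted in this excerpt
            }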
    
            return false;
        }
    }
    
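    • updateTopicRouteInfoFromNameServer talks to namesrv to fetch the route information.
    • The request itself is made through mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3), that is, with a 3-second timeout. It returns a TopicRouteData, whose structure is shown here:
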
    public class TopicRouteData extends RemotingSerializable {
        private String orderTopicConf;
        private List<QueueData> queueDatas;
        private List<BrokerData> brokerDatas;
        private HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
    }
    
    
    public class QueueData implements Comparable<QueueData> {
        private String brokerName;
        private int readQueueNums;
        private int writeQueueNums;
        private int perm;
        private int topicSynFlag;
    }
    
    
    public class BrokerData implements Comparable<BrokerData> {
        private String cluster;
        private String brokerName;
        private HashMap<Long/* brokerId */, String/* broker address */> brokerAddrs;
        private final Random random = new Random();
    }
    
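    • TopicRouteData contains List<QueueData> queueDatas and List<BrokerData> brokerDatas.
    • Each QueueData describes the topic's queues on one broker: the brokerName, the read and write queue counts, and the permissions.
    • Each BrokerData maps a brokerName to its addresses (brokerId to ip:port).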
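
    The control flow of updateTopicRouteInfoFromNameServer (take the lock with a timeout, fetch, compare with the cached route, store only on change) can be reduced to the sketch below. It rests on assumptions: RouteUpdateSketch and fetchRoute are hypothetical, the route is modelled as a plain String, the 3000 ms lock timeout is an assumed value for the sketch, and the extra isNeedUpdateTopicRouteInfo check is left out.

```java
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;

// Illustrative reduction of the update flow; not RocketMQ code.
class RouteUpdateSketch {
    private static final long LOCK_TIMEOUT_MILLIS = 3000; // assumed value for this sketch

    private final Lock lock = new ReentrantLock();
    private final Map<String, String> routeTable = new ConcurrentHashMap<>();
    private final Function<String, String> fetchRoute; // hypothetical name server call

    RouteUpdateSketch(Function<String, String> fetchRoute) {
        this.fetchRoute = fetchRoute;
    }

    // Returns true only when the cached route was actually replaced.
    boolean update(String topic) {
        try {
            if (!lock.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
                return false; // another thread is updating; skip this round
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return false;
        }
        try {
            String fresh = fetchRoute.apply(topic);
            if (fresh == null) {
                return false; // name server returned nothing for this topic
            }
            String old = routeTable.get(topic);
            if (Objects.equals(old, fresh)) {
                return false; // unchanged: keep the cached route
            }
            routeTable.put(topic, fresh);
            return true;
        } finally {
            lock.unlock(); // always release, as the original does in finally
        }
    }

    public static void main(String[] args) {
        RouteUpdateSketch sketch = new RouteUpdateSketch(topic -> "broker-a:8");
        System.out.println(sketch.update("TBW102")); // first fetch stores the route
        System.out.println(sketch.update("TBW102")); // same route again: nothing to do
    }
}
```

    Returning false for an unchanged route is what keeps the periodic task cheap: producers and consumers are only notified when something actually differs.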

    Parsing topic route information

    public class MQClientInstance {
    
        public static TopicPublishInfo topicRouteData2TopicPublishInfo(final String topic, final TopicRouteData route) {
    
            TopicPublishInfo info = new TopicPublishInfo();
            info.setTopicRouteData(route);
            if (route.getOrderTopicConf() != null && route.getOrderTopicConf().length() > 0) {
                // Order-topic handling omitted
    
            } else {
                List<QueueData> qds = route.getQueueDatas();
                // Sort by brokerName
                Collections.sort(qds);
                // Walk every broker entry and generate per-queue information
                for (QueueData qd : qds) {
                    // Only a writable QueueData is used to generate queues
                    if (PermName.isWriteable(qd.getPerm())) {
                        BrokerData brokerData = null;
                        // Search brokerDatas for the BrokerData that matches this QueueData
                        for (BrokerData bd : route.getBrokerDatas()) {
                            if (bd.getBrokerName().equals(qd.getBrokerName())) {
                                brokerData = bd;
                                break;
                            }
                        }
    
                        if (null == brokerData) {
                            continue;
                        }
    
                        if (!brokerData.getBrokerAddrs().containsKey(MixAll.MASTER_ID)) {
                            continue;
                        }
                        // Create one MessageQueue per write queue of this QueueData
                        // and add it to the TopicPublishInfo being built for the topic
                        for (int i = 0; i < qd.getWriteQueueNums(); i++) {
                            MessageQueue mq = new MessageQueue(topic, qd.getBrokerName(), i);
                            info.getMessageQueueList().add(mq);
                        }
                    }
                }
    
                info.setOrderTopic(false);
            }
    
            return info;
        }
    }
    
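    • Parsing proceeds as follows: sort the QueueData entries by brokerName, skip entries that are not writable or whose broker has no master address, then build one MessageQueue per write queue of each remaining QueueData.
    • Collections.sort(qds) sorts by brokerName.
    • new MessageQueue(topic, qd.getBrokerName(), i) builds a MessageQueue for each write queue index under a brokerName.
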
    public class TopicPublishInfo {
        private boolean orderTopic = false;
        private boolean haveTopicRouterInfo = false;
        private List<MessageQueue> messageQueueList = new ArrayList<MessageQueue>();
        private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex();
        private TopicRouteData topicRouteData;
    }
    
    public class MessageQueue implements Comparable<MessageQueue>, Serializable {
        private static final long serialVersionUID = 6191200464116433425L;
        private String topic;
        private String brokerName;
        private int queueId;
    }
    
    • TopicPublishInfo holds the finest-grained queue objects in messageQueueList.
    • MessageQueue is the finest-grained queue, the unit the producer picks when it sends a message.
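
    To give a feel for how messageQueueList and sendWhichQueue fit together, here is a minimal round-robin sketch. It is an assumption-laden illustration, not the producer's actual selection logic: QueueSelectionSketch is a hypothetical class, queues are plain strings, and an AtomicInteger stands in for ThreadLocalIndex.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative round-robin over a queue list; not RocketMQ code.
class QueueSelectionSketch {
    private final List<String> messageQueueList;
    private final AtomicInteger sendWhichQueue = new AtomicInteger(); // stand-in for ThreadLocalIndex

    QueueSelectionSketch(List<String> messageQueueList) {
        this.messageQueueList = messageQueueList;
    }

    String selectOne() {
        int index = sendWhichQueue.getAndIncrement();
        // floorMod keeps the position non-negative even after the counter overflows.
        return messageQueueList.get(Math.floorMod(index, messageQueueList.size()));
    }

    public static void main(String[] args) {
        QueueSelectionSketch selector =
            new QueueSelectionSketch(List.of("broker-a:0", "broker-a:1", "broker-b:0"));
        for (int i = 0; i < 4; i++) {
            System.out.println(selector.selectOne()); // cycles through the list in order
        }
    }
}
```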

    TopicPublishInfo example

    {
        "TBW102": [{
            "brokerName": "broker-a",
            "perm": 7,
            "readQueueNums": 8,
            "topicSynFlag": 0,
            "writeQueueNums": 8
        }, {
            "brokerName": "broker-b",
            "perm": 7,
            "readQueueNums": 8,
            "topicSynFlag": 0,
            "writeQueueNums": 8
        }]
    }
    
    • The topic (named TBW102) has queues on broker-a and broker-b. The JSON above lists the QueueData entries from which its TopicPublishInfo is built.

    • First, the broker entries are sorted by brokerName: broker-a, then broker-b.

    • broker-a yields 8 MessageQueue objects with topic TBW102, brokerName broker-a, and queueId 0 through 7.

    • broker-b yields 8 MessageQueue objects with topic TBW102, brokerName broker-b, and queueId 0 through 7.

    • The TopicPublishInfo of topic TBW102 therefore contains 16 MessageQueue objects in total: 8 for broker-a and 8 for broker-b.

    • A MessageQueue is a queue object identified by topic + brokerName + queueId.
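
    The TBW102 walkthrough can be reproduced with a small self-contained program. The classes below are simplified stand-ins for QueueData and MessageQueue, the PERM_WRITE constant is assumed to mirror the write bit checked by PermName.isWriteable, and the brokersWithMaster set replaces the BrokerData lookup; none of this is the library's own code.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Set;

// Illustrative rebuild of the TBW102 example; simplified stand-in types.
class PublishInfoExample {
    static final int PERM_WRITE = 1 << 1; // assumed to mirror PermName's write bit

    static final class QueueData {
        final String brokerName;
        final int writeQueueNums;
        final int perm;
        QueueData(String brokerName, int writeQueueNums, int perm) {
            this.brokerName = brokerName;
            this.writeQueueNums = writeQueueNums;
            this.perm = perm;
        }
    }

    static final class MessageQueue {
        final String topic;
        final String brokerName;
        final int queueId;
        MessageQueue(String topic, String brokerName, int queueId) {
            this.topic = topic;
            this.brokerName = brokerName;
            this.queueId = queueId;
        }
        @Override
        public String toString() {
            return topic + "/" + brokerName + "/" + queueId;
        }
    }

    static List<MessageQueue> build(String topic, List<QueueData> queueDatas,
                                    Set<String> brokersWithMaster) {
        // Sort a copy by brokerName, as Collections.sort(qds) does in the original.
        List<QueueData> sorted = new ArrayList<>(queueDatas);
        sorted.sort(Comparator.comparing((QueueData qd) -> qd.brokerName));

        List<MessageQueue> result = new ArrayList<>();
        for (QueueData qd : sorted) {
            boolean writable = (qd.perm & PERM_WRITE) == PERM_WRITE;
            if (!writable || !brokersWithMaster.contains(qd.brokerName)) {
                continue; // skip read-only entries and brokers without a master
            }
            for (int i = 0; i < qd.writeQueueNums; i++) {
                result.add(new MessageQueue(topic, qd.brokerName, i));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<MessageQueue> queues = build("TBW102",
            List.of(new QueueData("broker-b", 8, 7), new QueueData("broker-a", 8, 7)),
            Set.of("broker-a", "broker-b"));
        System.out.println(queues.size()); // 16
        System.out.println(queues.get(0)); // TBW102/broker-a/0
        System.out.println(queues.get(8)); // TBW102/broker-b/0
    }
}
```

    The input lists broker-b first on purpose: the sort is what guarantees that the broker-a queues come first in the resulting list.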


    Article link: https://www.haomeiwen.com/subject/ocmkghtx.html