美文网首页
rocketmq发送消息之寻找主题路由信息

rocketmq发送消息之寻找主题路由信息

作者: 黎明_dba5 | 来源:发表于2020-09-10 18:08 被阅读0次

    1、RocketMQ发送消息的整个过程可以概括为:检查消息->寻找主题路由信息->选择消息队列->发送消息,本文主要解析寻找主题路由信息部分。
    2、以RocketMQ源码中的示例开始解读,入口是example模块中的Producer#send()方法

    SendResult sendResult = producer.send(msg);
    

    DefaultMQProducerImpl#sendDefaultImpl方法

    Validators.checkMessage(msg, this.defaultMQProducer);
            final long invokeID = random.nextLong();
            long beginTimestampFirst = System.currentTimeMillis();
            long beginTimestampPrev = beginTimestampFirst;
            long endTimestamp = beginTimestampFirst;
            TopicPublishInfo topicPublishInfo = 
    #寻找主题路由信息入口,如果寻找不到路由信息,直接抛出异常信息
    this.tryToFindTopicPublishInfo(msg.getTopic());
            if (topicPublishInfo != null && topicPublishInfo.ok()) {
                boolean callTimeout = false;
                MessageQueue mq = null;
    

    DefaultMQProducerImpl#tryToFindTopicPublishInfo方法:发送消息时,先从本地缓存中获取主题的路由信息;如果缓存中没有或路由信息不可用,则向NameServer查询该主题的路由信息,并将结果放入缓存;如果仍未获取到可用的路由信息,则向NameServer查询默认主题的路由信息。

    /**
     * Looks up the publish info for {@code topic}, refreshing it from the NameServer when the
     * locally cached entry is missing or not usable.
     *
     * <p>Lookup order: local table, then a NameServer query for the topic itself, then a
     * NameServer query using the default-topic route of {@code defaultMQProducer}.
     *
     * @param topic topic to resolve
     * @return the entry found in {@code topicPublishInfoTable} after the lookups above
     */
    private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
            TopicPublishInfo cached = this.topicPublishInfoTable.get(topic);
            final boolean usable = cached != null && cached.ok();
            if (!usable) {
                // Register a placeholder first, then ask the NameServer for this topic's route.
                this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
                this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
                cached = this.topicPublishInfoTable.get(topic);
            }

            if (!cached.isHaveTopicRouterInfo() && !cached.ok()) {
                // Still nothing usable: retry with the default-topic route.
                this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
                return this.topicPublishInfoTable.get(topic);
            }
            return cached;
        }
    

    MQClientInstance#updateTopicRouteInfoFromNameServer方法从NameServer查询主题信息

    /**
     * Refreshes the route of {@code topic} from the NameServer by delegating to the
     * three-argument overload with the default-topic lookup disabled and no producer.
     *
     * @param topic topic whose route is refreshed
     * @return whatever the three-argument overload returns
     */
    public boolean updateTopicRouteInfoFromNameServer(final String topic) {
            final boolean useDefaultTopicRoute = false;
            return this.updateTopicRouteInfoFromNameServer(topic, useDefaultTopicRoute, null);
        }
    
    public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault,
            DefaultMQProducer defaultMQProducer) {
            try {
                if (this.lockNamesrv.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
                    try {
                        TopicRouteData topicRouteData;
                        #获取默认主题
                        if (isDefault && defaultMQProducer != null) {
                            topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(),
                                1000 * 3);
                            if (topicRouteData != null) {
                                for (QueueData data : topicRouteData.getQueueDatas()) {
                                    int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums());
                                    data.setReadQueueNums(queueNums);
                                    data.setWriteQueueNums(queueNums);
                                }
                            }
                        } else {
                            #从NameSever获取指定的主题路由信息,底层使用Netty
                            topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3);
                        }
                        if (topicRouteData != null) {
                            #因为之前在DefaultMQProducerImpl中已经缓存了主题路由信息,包括空的主题路由信息,因此这里需要检查是否要更新
                            TopicRouteData old = this.topicRouteTable.get(topic);
                            #比较时是将两个目标对象深度克隆后进行比对,防止影响原来的目标对象
                            boolean changed = topicRouteDataIsChange(old, topicRouteData);
                            if (!changed) {
                                changed = this.isNeedUpdateTopicRouteInfo(topic);
                            } else {
                                log.info("the topic[{}] route info changed, old[{}] ,new[{}]", topic, old, topicRouteData);
                            }
                            #如果需要更新,这更新DefaultMQProducerImpl
                            if (changed) {
                                TopicRouteData cloneTopicRouteData = topicRouteData.cloneTopicRouteData();
                                #更新topic的broker地址信息
                                for (BrokerData bd : topicRouteData.getBrokerDatas()) {
                                    this.brokerAddrTable.put(bd.getBrokerName(), bd.getBrokerAddrs());
                                }
    
                                // Update Pub info 更新DefaultMQProducerImpl中的topicPublishInfoTable缓存信息
                                {
                                    TopicPublishInfo publishInfo = topicRouteData2TopicPublishInfo(topic, topicRouteData);
                                    publishInfo.setHaveTopicRouterInfo(true);
                                    Iterator<Entry<String, MQProducerInner>> it = this.producerTable.entrySet().iterator();
                                    while (it.hasNext()) {
                                        Entry<String, MQProducerInner> entry = it.next();
                                        MQProducerInner impl = entry.getValue();
                                        if (impl != null) {
                                            impl.updateTopicPublishInfo(topic, publishInfo);
                                        }
                                    }
                                }
    
                                // Update sub info更新消费者信息
                                {
                                    Set<MessageQueue> subscribeInfo = topicRouteData2TopicSubscribeInfo(topic, topicRouteData);
                                    Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();
                                    while (it.hasNext()) {
                                        Entry<String, MQConsumerInner> entry = it.next();
                                        MQConsumerInner impl = entry.getValue();
                                        if (impl != null) {
                                            impl.updateTopicSubscribeInfo(topic, subscribeInfo);
                                        }
                                    }
                                }
                                log.info("topicRouteTable.put. Topic = {}, TopicRouteData[{}]", topic, cloneTopicRouteData);
                                #将原始的主题路由信息放入缓存
                                this.topicRouteTable.put(topic, cloneTopicRouteData);
                                return true;
                            }
                        } else {
                            log.warn("updateTopicRouteInfoFromNameServer, getTopicRouteInfoFromNameServer return null, Topic: {}", topic);
                        }
                    } catch (MQClientException e) {
                        if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                            log.warn("updateTopicRouteInfoFromNameServer Exception", e);
                        }
                    } catch (RemotingException e) {
                        log.error("updateTopicRouteInfoFromNameServer Exception", e);
                        throw new IllegalStateException(e);
                    } finally {
                        this.lockNamesrv.unlock();
                    }
                } else {
                    log.warn("updateTopicRouteInfoFromNameServer tryLock timeout {}ms", LOCK_TIMEOUT_MILLIS);
                }
            } catch (InterruptedException e) {
                log.warn("updateTopicRouteInfoFromNameServer Exception", e);
            }
    
            return false;
        }
    

    MQClientInstance#topicRouteData2TopicPublishInfo方法从TopicRouteData路由信息中获取主题发布相关信息

    public static TopicPublishInfo topicRouteData2TopicPublishInfo(final String topic, final TopicRouteData route) {
            TopicPublishInfo info = new TopicPublishInfo();
            info.setTopicRouteData(route);
            #设置有序broker信息list
            if (route.getOrderTopicConf() != null && route.getOrderTopicConf().length() > 0) {
                String[] brokers = route.getOrderTopicConf().split(";");
                for (String broker : brokers) {
                    String[] item = broker.split(":");
                    int nums = Integer.parseInt(item[1]);
                    for (int i = 0; i < nums; i++) {
                        MessageQueue mq = new MessageQueue(topic, item[0], i);
                        info.getMessageQueueList().add(mq);
                    }
                }
    
                info.setOrderTopic(true);
            } else {
                List<QueueData> qds = route.getQueueDatas();
                Collections.sort(qds);
                for (QueueData qd : qds) {
                    #只获取有写权限的队列
                    if (PermName.isWriteable(qd.getPerm())) {
                        BrokerData brokerData = null;
                        for (BrokerData bd : route.getBrokerDatas()) {
                            if (bd.getBrokerName().equals(qd.getBrokerName())) {
                                brokerData = bd;
                                break;
                            }
                        }
    
                        if (null == brokerData) {
                            continue;
                        }
                        #只使用master节点
                        if (!brokerData.getBrokerAddrs().containsKey(MixAll.MASTER_ID)) {
                            continue;
                        }
    
                        for (int i = 0; i < qd.getWriteQueueNums(); i++) {
                            MessageQueue mq = new MessageQueue(topic, qd.getBrokerName(), i);
                            info.getMessageQueueList().add(mq);
                        }
                    }
                }
    
                info.setOrderTopic(false);
            }
    
            return info;
        }
    

    向NameServer请求主题路由信息的具体实现此处省略。至此,第一次发送消息时获取主题路由信息的流程就已完成。

    相关文章

      网友评论

          本文标题:rocketmq发送消息之寻找主题路由信息

          本文链接:https://www.haomeiwen.com/subject/xnvpektx.html