美文网首页RocketMQ源码解析
RocketMQ源码-普通消息发送

RocketMQ源码-普通消息发送

作者: pigcoffee | 来源:发表于2019-08-14 10:54 被阅读0次

一、问题思考

    1、DefaultMQProducerImpl如何发送多个topic消息?

    2、如何选取MessageQueue?

    3、发送失败是如何进行重试的?

    4、超时时间怎么判断?

二、消息发送流程

消息发送流程

1、获取TopicPublishInfo

    根据msg.topic从topicPublishInfoTable中获取TopicPublishInfo;

    未取到则从NameServer拉取topic信息,超时时间为3000ms;

    更新topicPublishInfoTable;

详细源码如下:

TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());

public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault,

        DefaultMQProducer defaultMQProducer) {

        try {

            if (this.lockNamesrv.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {

                try {

                    TopicRouteData topicRouteData;

                    if (isDefault && defaultMQProducer != null) {

                        topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(),

                            1000 * 3);

                        if (topicRouteData != null) {

                            for (QueueData data : topicRouteData.getQueueDatas()) {

                                int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums());

                                data.setReadQueueNums(queueNums);

                                data.setWriteQueueNums(queueNums);

                            }

                        }

                    } else {

                        topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3);

                    }

                    if (topicRouteData != null) {

                        TopicRouteData old = this.topicRouteTable.get(topic);

                        boolean changed = topicRouteDataIsChange(old, topicRouteData);

                        if (!changed) {

                            changed = this.isNeedUpdateTopicRouteInfo(topic);

                        } else {

                            log.info("the topic[{}] route info changed, old[{}] ,new[{}]", topic, old, topicRouteData);

                        }

                        if (changed) {

                            TopicRouteData cloneTopicRouteData = topicRouteData.cloneTopicRouteData();

                            for (BrokerData bd : topicRouteData.getBrokerDatas()) {

                                this.brokerAddrTable.put(bd.getBrokerName(), bd.getBrokerAddrs());

                            }

                            // Update Pub info

                            {

                                TopicPublishInfo publishInfo = topicRouteData2TopicPublishInfo(topic, topicRouteData);

                                publishInfo.setHaveTopicRouterInfo(true);

                                Iterator<Entry<String, MQProducerInner>> it = this.producerTable.entrySet().iterator();

                                while (it.hasNext()) {

                                    Entry<String, MQProducerInner> entry = it.next();

                                    MQProducerInner impl = entry.getValue();

                                    if (impl != null) {

                                        impl.updateTopicPublishInfo(topic, publishInfo);

                                    }

                                }

                            }

                            // Update sub info

                            {

                                Set<MessageQueue> subscribeInfo = topicRouteData2TopicSubscribeInfo(topic, topicRouteData);

                                Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();

                                while (it.hasNext()) {

                                    Entry<String, MQConsumerInner> entry = it.next();

                                    MQConsumerInner impl = entry.getValue();

                                    if (impl != null) {

                                        impl.updateTopicSubscribeInfo(topic, subscribeInfo);

                                    }

                                }

                            }

                            log.info("topicRouteTable.put. Topic = {}, TopicRouteData[{}]", topic, cloneTopicRouteData);

                            this.topicRouteTable.put(topic, cloneTopicRouteData);

                            return true;

                        }

                    } else {

                        log.warn("updateTopicRouteInfoFromNameServer, getTopicRouteInfoFromNameServer return null, Topic: {}", topic);

                    }

                } catch (Exception e) {

                    if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX) && !topic.equals(MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC)) {

                        log.warn("updateTopicRouteInfoFromNameServer Exception", e);

                    }

                } finally {

                    this.lockNamesrv.unlock();

                }

            } else {

                log.warn("updateTopicRouteInfoFromNameServer tryLock timeout {}ms", LOCK_TIMEOUT_MILLIS);

            }

        } catch (InterruptedException e) {

            log.warn("updateTopicRouteInfoFromNameServer Exception", e);

        }

        return false;

    }

2、选取MessageQueue

MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);

public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {

        if (this.sendLatencyFaultEnable) {

            try {

                int index = tpInfo.getSendWhichQueue().getAndIncrement();

                for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {

                    int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();

                    if (pos < 0)

                        pos = 0;

                    MessageQueue mq = tpInfo.getMessageQueueList().get(pos);

                    if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {

                        if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))

                            return mq;

                    }

                }

                final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();

                int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);

                if (writeQueueNums > 0) {

                    final MessageQueue mq = tpInfo.selectOneMessageQueue();

                    if (notBestBroker != null) {

                        mq.setBrokerName(notBestBroker);

                        mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);

                    }

                    return mq;

                } else {

                    latencyFaultTolerance.remove(notBestBroker);

                }

            } catch (Exception e) {

                log.error("Error occurred when selecting message queue", e);

            }

            return tpInfo.selectOneMessageQueue();

        }

        return tpInfo.selectOneMessageQueue(lastBrokerName);

    }

3、发送失败重试

    发送失败后选取其他Broker进行消息发送;

4、超时判断

long costTime = beginTimestampPrev - beginTimestampFirst;

if (timeout < costTime) {

    callTimeout = true;

    break;

}

相关文章

网友评论

    本文标题:RocketMQ源码-普通消息发送

    本文链接:https://www.haomeiwen.com/subject/jklmrftx.html