(6) How the producer sends messages: fetching route information by topic

Author: guessguess | Published: 2021-06-21 20:11

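    Let's first look at the structure of DefaultMQProducer.


    Figure: structure of DefaultMQProducer

    Let's go through the basic role of each of these classes in turn.

    MQAdmin

    Here is the definition of this interface.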

    public interface MQAdmin {
        void createTopic(final String key, final String newTopic, final int queueNum)
            throws MQClientException;
        void createTopic(String key, String newTopic, int queueNum, int topicSysFlag)
            throws MQClientException;
        long searchOffset(final MessageQueue mq, final long timestamp) throws MQClientException;
        long maxOffset(final MessageQueue mq) throws MQClientException;
        long minOffset(final MessageQueue mq) throws MQClientException;
        long earliestMsgStoreTime(final MessageQueue mq) throws MQClientException;
        MessageExt viewMessage(final String offsetMsgId) throws RemotingException, MQBrokerException,
            InterruptedException, MQClientException;
        QueryResult queryMessage(final String topic, final String key, final int maxNum, final long begin,
            final long end) throws MQClientException, InterruptedException;
        MessageExt viewMessage(String topic,
            String msgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException;
    }
    

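    The interface above defines a number of methods, which fall into these groups:
    Creating a topic
    Querying queue offsets
    Looking up a message by message id
    Querying messages in a topic by key (bounded by maxNum and a begin/end time range)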

    MQProducer

        void start() throws MQClientException;
        void shutdown();
        List<MessageQueue> fetchPublishMessageQueues(final String topic) throws MQClientException;
        SendResult send(final Message msg) throws MQClientException, RemotingException, MQBrokerException,
            InterruptedException;
        SendResult send(final Message msg, final long timeout) throws MQClientException,
            RemotingException, MQBrokerException, InterruptedException;
        void send(final Message msg, final SendCallback sendCallback) throws MQClientException,
            RemotingException, InterruptedException;
        void send(final Message msg, final SendCallback sendCallback, final long timeout)
            throws MQClientException, RemotingException, InterruptedException;
        void sendOneway(final Message msg) throws MQClientException, RemotingException,
            InterruptedException;
        SendResult send(final Message msg, final MessageQueue mq) throws MQClientException,
            RemotingException, MQBrokerException, InterruptedException;
        SendResult send(final Message msg, final MessageQueue mq, final long timeout)
            throws MQClientException, RemotingException, MQBrokerException, InterruptedException;
        void send(final Message msg, final MessageQueue mq, final SendCallback sendCallback)
            throws MQClientException, RemotingException, InterruptedException;
        void send(final Message msg, final MessageQueue mq, final SendCallback sendCallback, long timeout)
            throws MQClientException, RemotingException, InterruptedException;
        void sendOneway(final Message msg, final MessageQueue mq) throws MQClientException,
            RemotingException, InterruptedException;
        SendResult send(final Message msg, final MessageQueueSelector selector, final Object arg)
            throws MQClientException, RemotingException, MQBrokerException, InterruptedException;
        SendResult send(final Message msg, final MessageQueueSelector selector, final Object arg,
            final long timeout) throws MQClientException, RemotingException, MQBrokerException,
            InterruptedException;
        void send(final Message msg, final MessageQueueSelector selector, final Object arg,
            final SendCallback sendCallback) throws MQClientException, RemotingException,
            InterruptedException;
        void send(final Message msg, final MessageQueueSelector selector, final Object arg,
            final SendCallback sendCallback, final long timeout) throws MQClientException, RemotingException,
            InterruptedException;
        void sendOneway(final Message msg, final MessageQueueSelector selector, final Object arg)
            throws MQClientException, RemotingException, InterruptedException;
    
        TransactionSendResult sendMessageInTransaction(final Message msg,
            final LocalTransactionExecuter tranExecuter, final Object arg) throws MQClientException;
        TransactionSendResult sendMessageInTransaction(final Message msg,
            final Object arg) throws MQClientException;
        SendResult send(final Collection<Message> msgs) throws MQClientException, RemotingException, MQBrokerException,
            InterruptedException;
        SendResult send(final Collection<Message> msgs, final long timeout) throws MQClientException,
            RemotingException, MQBrokerException, InterruptedException;
        SendResult send(final Collection<Message> msgs, final MessageQueue mq) throws MQClientException,
            RemotingException, MQBrokerException, InterruptedException;
        SendResult send(final Collection<Message> msgs, final MessageQueue mq, final long timeout)
            throws MQClientException, RemotingException, MQBrokerException, InterruptedException;
        void send(final Collection<Message> msgs, final SendCallback sendCallback) throws MQClientException, RemotingException, MQBrokerException,
            InterruptedException;
        void send(final Collection<Message> msgs, final SendCallback sendCallback, final long timeout) throws MQClientException, RemotingException,
            MQBrokerException, InterruptedException;
        void send(final Collection<Message> msgs, final MessageQueue mq, final SendCallback sendCallback) throws MQClientException, RemotingException,
            MQBrokerException, InterruptedException;
        void send(final Collection<Message> msgs, final MessageQueue mq, final SendCallback sendCallback, final long timeout) throws MQClientException,
            RemotingException, MQBrokerException, InterruptedException;
        Message request(final Message msg, final long timeout) throws RequestTimeoutException, MQClientException,
            RemotingException, MQBrokerException, InterruptedException;
        void request(final Message msg, final RequestCallback requestCallback, final long timeout)
            throws MQClientException, RemotingException, InterruptedException, MQBrokerException;
        Message request(final Message msg, final MessageQueueSelector selector, final Object arg,
            final long timeout) throws RequestTimeoutException, MQClientException, RemotingException, MQBrokerException,
            InterruptedException;
        void request(final Message msg, final MessageQueueSelector selector, final Object arg,
            final RequestCallback requestCallback,
            final long timeout) throws MQClientException, RemotingException,
            InterruptedException, MQBrokerException;
        Message request(final Message msg, final MessageQueue mq, final long timeout)
            throws RequestTimeoutException, MQClientException, RemotingException, MQBrokerException, InterruptedException;
        void request(final Message msg, final MessageQueue mq, final RequestCallback requestCallback, long timeout)
            throws MQClientException, RemotingException, MQBrokerException, InterruptedException;
    
    

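    There are many methods here, but they boil down to sending messages (synchronous, callback-based, one-way, batch, transactional and request variants) plus starting and shutting down the producer.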

    ClientConfig

    This is mostly a configuration class. The code is as follows.

    public class ClientConfig {
        // NameServer (registry) address
        private String namesrvAddr = NameServerAddressUtils.getNameServerAddresses();
        // Local IP address of the client
        private String clientIP = RemotingUtil.getLocalAddress();
        // Instance name of the client
        private String instanceName = System.getProperty("rocketmq.client.name", "DEFAULT");
        /**
         * Pulling topic information interval from the named server
         */
        private int pollNameServerInterval = 1000 * 30;
        /**
         * Heartbeat interval in microseconds with message broker
         */
        private int heartbeatBrokerInterval = 1000 * 30;
    }
    

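    This class mainly serves as configuration.
    It holds the client's own information, plus the interval for polling the NameServer and the heartbeat interval to the Broker. Both default to 1000 * 30, which the client treats as milliseconds (30 seconds), despite the "microseconds" wording in the quoted comment.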

    DefaultMQProducer

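    DefaultMQProducer implements the MQProducer interface and, through it, MQAdmin (MQProducer extends MQAdmin).
    So it can manage topics (create a topic, query information from a topic) and send messages.
    DefaultMQProducer also extends ClientConfig.
    Managing topics and sending messages naturally require some basic information, such as the NameServer address and broker addresses.

    Let's start directly from the source code.
    Look at the send method of DefaultMQProducer. The code is as follows.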

    public class DefaultMQProducer extends ClientConfig implements MQProducer {
        protected final transient DefaultMQProducerImpl defaultMQProducerImpl;
        @Override
        public SendResult send(
            Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
            Validators.checkMessage(msg, this);
            msg.setTopic(withNamespace(msg.getTopic()));
            return this.defaultMQProducerImpl.send(msg);
        }
    }
    

    Following the call chain, the code that actually implements sending is as follows.

    public class DefaultMQProducerImpl implements MQProducerInner {
        private SendResult sendDefaultImpl(
            Message msg,
            final CommunicationMode communicationMode,
            final SendCallback sendCallback,
            final long timeout
        ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
            this.makeSureStateOK();
            Validators.checkMessage(msg, this.defaultMQProducer);
            final long invokeID = random.nextLong();
            long beginTimestampFirst = System.currentTimeMillis();
            long beginTimestampPrev = beginTimestampFirst;
            long endTimestamp = beginTimestampFirst;
            // Look up the route information for the message's topic
            TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
            if (topicPublishInfo != null && topicPublishInfo.ok()) {
                boolean callTimeout = false;
                MessageQueue mq = null;
                Exception exception = null;
                SendResult sendResult = null;
                int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
                int times = 0;
                String[] brokersSent = new String[timesTotal];
                // A failed send may be retried, hence the loop
                for (; times < timesTotal; times++) {
                    String lastBrokerName = null == mq ? null : mq.getBrokerName();
                    // Select a suitable queue from the route information
                    MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
                    if (mqSelected != null) {
                        mq = mqSelected;
                        brokersSent[times] = mq.getBrokerName();
                        try {
                            beginTimestampPrev = System.currentTimeMillis();
                            if (times > 0) {
                                //Reset topic with namespace during resend.
                                msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic()));
                            }
                            long costTime = beginTimestampPrev - beginTimestampFirst;
                            if (timeout < costTime) {
                                callTimeout = true;
                                break;
                            }
                            // Send the message
                            sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
                            endTimestamp = System.currentTimeMillis();
                            // If latency fault tolerance is enabled, this decides whether the broker should be avoided for a while
                            this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                            switch (communicationMode) {
                                case ASYNC:
                                    return null;
                                case ONEWAY:
                                    return null;
                                case SYNC:
                                    if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
                                        if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
                                            continue;
                                        }
                                    }
    
                                    return sendResult;
                                default:
                                    break;
                            }
                        } catch (RemotingException e) {
                            // Remoting (network) failure: mark the broker as faulty and move on to the next attempt
                            endTimestamp = System.currentTimeMillis();
                            this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                            continue;
                        } catch (MQClientException e) {
                            // Client-side failure: mark the broker as faulty, remember the exception and move on to the next attempt
                            endTimestamp = System.currentTimeMillis();
                            this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                            exception = e;
                            continue;
                        } catch (MQBrokerException e) {
                            // Broker returned an error: mark the broker as faulty; retry only for the response codes listed below
                            endTimestamp = System.currentTimeMillis();
                            this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                            exception = e;
                            switch (e.getResponseCode()) {
                                case ResponseCode.TOPIC_NOT_EXIST:
                                case ResponseCode.SERVICE_NOT_AVAILABLE:
                                case ResponseCode.SYSTEM_ERROR:
                                case ResponseCode.NO_PERMISSION:
                                case ResponseCode.NO_BUYER_ID:
                                case ResponseCode.NOT_IN_CURRENT_UNIT:
                                    continue;
                                default:
                                    if (sendResult != null) {
                                        return sendResult;
                                    }
    
                                    throw e;
                            }
                        } catch (InterruptedException e) {
                            // Interrupted: record the elapsed time (isolation flag is false) and rethrow without retrying
                            endTimestamp = System.currentTimeMillis();
                            this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                            throw e;
                        }
                    } else {
                        break;
                    }
                }
    
                if (sendResult != null) {
                    return sendResult;
                }
                // ... irrelevant code omitted
                throw mqClientException;
            }
            // No route information found for the topic: throw an exception
            throw new MQClientException("No route info of this topic: " + msg.getTopic() + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO),
                null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION);
        }
    }
    

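    The implementation above is long, but it comes down to a few steps.
    1. Look up the route information for the topic
    2. Select a suitable queue from the route information
    3. Send the message to the broker that owns that queue
    4. If the send fails, retry a limited number of times (only in SYNC mode; ASYNC and ONEWAY get a single attempt here)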
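    The four steps can be condensed into a small standalone sketch. Everything in it is hypothetical and simplified: RetrySketch, Queue, selectQueue and sendWithRetry are illustration-only names, the "route info" is just a list of queues, and the queue selection shown (prefer a broker other than the one that just failed) is an assumption about the general idea, not RocketMQ's actual selection code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical, simplified model of the four steps; these are not RocketMQ's real classes.
class RetrySketch {
    // Stand-in for MessageQueue: a queue id on a named broker.
    static final class Queue {
        final String brokerName;
        final int queueId;

        Queue(String brokerName, int queueId) {
            this.brokerName = brokerName;
            this.queueId = queueId;
        }
    }

    // Step 2: walk the queues from startIndex, preferring a broker other than the one that just failed.
    static Queue selectQueue(List<Queue> queues, int startIndex, String lastBrokerName) {
        for (int i = 0; i < queues.size(); i++) {
            Queue q = queues.get((startIndex + i) % queues.size());
            if (lastBrokerName == null || !q.brokerName.equals(lastBrokerName)) {
                return q;
            }
        }
        // Every queue lives on the failed broker, so reuse it.
        return queues.get(startIndex % queues.size());
    }

    // Steps 1, 3 and 4: 'queues' plays the role of the route info, 'sender' returns true on success.
    // Returns the broker tried on each attempt, in order.
    static List<String> sendWithRetry(List<Queue> queues, int retries, Predicate<Queue> sender) {
        if (queues.isEmpty()) {
            throw new IllegalStateException("No route info of this topic");
        }
        List<String> brokersSent = new ArrayList<>();
        String lastBrokerName = null;
        int timesTotal = 1 + retries;
        for (int times = 0; times < timesTotal; times++) {
            Queue q = selectQueue(queues, times, lastBrokerName);
            brokersSent.add(q.brokerName);
            if (sender.test(q)) {
                break; // sent successfully, stop retrying
            }
            lastBrokerName = q.brokerName;
        }
        return brokersSent;
    }

    public static void main(String[] args) {
        List<Queue> queues = Arrays.asList(
            new Queue("broker-a", 0), new Queue("broker-a", 1), new Queue("broker-b", 0));
        // Pretend broker-a is down: only sends to broker-b succeed.
        List<String> tried = sendWithRetry(queues, 2, q -> q.brokerName.equals("broker-b"));
        System.out.println(tried); // prints [broker-a, broker-b]
    }
}
```

    The first attempt lands on broker-a and fails, so the second attempt skips the remaining broker-a queue and goes to broker-b.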

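    How route data is obtained for a topic

    Before describing how the route is obtained, a word about TopicPublishInfo, the data structure in which the producer stores routes.

    The structure of TopicPublishInfo is as follows.

    Figure: TopicPublishInfo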
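    As a rough textual stand-in for that structure, here is a minimal sketch inferred only from the accessors that appear in the code quoted in this article (ok(), isHaveTopicRouterInfo(), getMessageQueueList(), setOrderTopic(), setTopicRouteData()). The class name TopicPublishInfoSketch is hypothetical, the field types are simplified (String for MessageQueue, Object for TopicRouteData), and the meaning given to ok() is an assumption; the real class may hold more state.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for TopicPublishInfo, inferred only from the accessors
// used in this article; the real class may hold more state.
class TopicPublishInfoSketch {
    private boolean orderTopic = false;
    private boolean haveTopicRouterInfo = false;
    // Queues the producer may write to; a String stands in for MessageQueue.
    private final List<String> messageQueueList = new ArrayList<>();
    // Raw route data the queues were derived from; Object stands in for TopicRouteData.
    private Object topicRouteData;

    // Assumed meaning of ok(): the entry is usable once it holds at least one queue.
    boolean ok() {
        return !messageQueueList.isEmpty();
    }

    boolean isOrderTopic() { return orderTopic; }
    void setOrderTopic(boolean orderTopic) { this.orderTopic = orderTopic; }

    boolean isHaveTopicRouterInfo() { return haveTopicRouterInfo; }
    void setHaveTopicRouterInfo(boolean haveTopicRouterInfo) { this.haveTopicRouterInfo = haveTopicRouterInfo; }

    List<String> getMessageQueueList() { return messageQueueList; }

    Object getTopicRouteData() { return topicRouteData; }
    void setTopicRouteData(Object topicRouteData) { this.topicRouteData = topicRouteData; }
}
```

    Under this reading, a freshly created, empty entry is not ok(), which would explain why placing an empty entry in the cache still triggers a lookup.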

    Implementation of the route lookup

    The method is as follows.

    public class DefaultMQProducerImpl implements MQProducerInner {
        // Cache mapping each topic to its route information
        private final ConcurrentMap<String/* topic */, TopicPublishInfo> topicPublishInfoTable =
            new ConcurrentHashMap<String, TopicPublishInfo>();
    
        private SendResult sendDefaultImpl(
            Message msg,
            final CommunicationMode communicationMode,
            final SendCallback sendCallback,
            final long timeout
        ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
            // ... code omitted
            TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
            // ... code omitted
        }
    
        private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
            // Check the cache first
            TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
            if (null == topicPublishInfo || !topicPublishInfo.ok()) {
                this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
                // Not cached (or not usable): ask the NameServer for the topic's route information
                this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
                topicPublishInfo = this.topicPublishInfoTable.get(topic);
            }
            // If the TopicPublishInfo now carries route information or is usable, return it
            if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
                return topicPublishInfo;
            } else {
                // Nothing found under the topic itself: fall back to the default topic's route data
                this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
                topicPublishInfo = this.topicPublishInfoTable.get(topic);
                return topicPublishInfo;
            }
        }
    }
    

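    From the code above, the key question is how TopicPublishInfo is obtained: the local cache is checked first, then the NameServer is queried for the topic, and finally the default topic's route is used as a fallback.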
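    That lookup order can be illustrated with a small standalone sketch. All names here (RouteCacheSketch, find, remoteCalls, the "DEFAULT" topic) are hypothetical; the NameServer is replaced by a plain function from topic to a list of queue names, and the real client's locking and change detection are left out.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;

// Hypothetical sketch of "cache first, then the topic, then the default topic".
class RouteCacheSketch {
    private final ConcurrentMap<String, List<String>> cache = new ConcurrentHashMap<>();
    // Stand-in for the NameServer: returns the queues of a topic, or null if unknown.
    private final Function<String, List<String>> nameServer;
    private final String defaultTopic;
    // Number of simulated remote lookups (plain int: this demo is single-threaded).
    int remoteCalls = 0;

    RouteCacheSketch(Function<String, List<String>> nameServer, String defaultTopic) {
        this.nameServer = nameServer;
        this.defaultTopic = defaultTopic;
    }

    List<String> find(String topic) {
        // 1. Check the local cache first.
        List<String> cached = cache.get(topic);
        if (cached != null && !cached.isEmpty()) {
            return cached;
        }
        // 2. Ask the (simulated) NameServer for the topic itself.
        remoteCalls++;
        List<String> fetched = nameServer.apply(topic);
        if (fetched == null || fetched.isEmpty()) {
            // 3. Fall back to the route of the default topic.
            remoteCalls++;
            fetched = nameServer.apply(defaultTopic);
        }
        if (fetched == null || fetched.isEmpty()) {
            return Collections.emptyList();
        }
        cache.put(topic, fetched);
        return fetched;
    }

    public static void main(String[] args) {
        Map<String, List<String>> routes = new HashMap<>();
        routes.put("orders", Arrays.asList("broker-a#0", "broker-a#1"));
        routes.put("DEFAULT", Arrays.asList("broker-a#0"));
        RouteCacheSketch client = new RouteCacheSketch(routes::get, "DEFAULT");
        System.out.println(client.find("orders"));  // fetched remotely, then cached
        System.out.println(client.find("orders"));  // served from the cache
        System.out.println(client.find("unknown")); // served by the default-topic fallback
        System.out.println(client.remoteCalls);
    }
}
```

    The second lookup of "orders" does not touch the simulated NameServer, while the unknown topic costs two remote calls, one for the topic itself and one for the default topic.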

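    How TopicPublishInfo is obtained

    The code above calls two methods to fetch a topic's route information,
    but both end up in the same implementation.
    Its logic is fairly simple: it fetches route data either for the default topic (when isDefault is true and a producer is supplied) or for the topic that was passed in.
    The code is as follows.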

    public class MQClientInstance {
        // Route data recorded for each topic
        private final ConcurrentMap<String/* Topic */, TopicRouteData> topicRouteTable = new ConcurrentHashMap<String, TopicRouteData>();
        public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault,
            DefaultMQProducer defaultMQProducer) {
            try {
                if (this.lockNamesrv.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
                    try {
                        TopicRouteData topicRouteData;
                        if (isDefault && defaultMQProducer != null) {
                            // Fetch the route data of the default topic
                            topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(),
                                1000 * 3);
                            if (topicRouteData != null) {
                                for (QueueData data : topicRouteData.getQueueDatas()) {
                                    int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums());
                                    data.setReadQueueNums(queueNums);
                                    data.setWriteQueueNums(queueNums);
                                }
                            }
                        } else {
                            // Fetch the route data of the given topic
                            topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3);
                        }
                        if (topicRouteData != null) {
                            TopicRouteData old = this.topicRouteTable.get(topic);
                            // Check whether the route data has changed
                            boolean changed = topicRouteDataIsChange(old, topicRouteData);
                            if (!changed) {
                                // Unchanged: check whether any producer still needs an update
                                changed = this.isNeedUpdateTopicRouteInfo(topic);
                            } else {
                                log.info("the topic[{}] route info changed, old[{}] ,new[{}]", topic, old, topicRouteData);
                            }
                            // The route changed, or an update is needed
                            if (changed) {
                                TopicRouteData cloneTopicRouteData = topicRouteData.cloneTopicRouteData();
                                // Record the addresses of each broker name (brokerId -> address)
                                for (BrokerData bd : topicRouteData.getBrokerDatas()) {
                                    this.brokerAddrTable.put(bd.getBrokerName(), bd.getBrokerAddrs());
                                }
    
                                // Convert the route data into a TopicPublishInfo
                                {
                                    TopicPublishInfo publishInfo = topicRouteData2TopicPublishInfo(topic, topicRouteData);
                                    publishInfo.setHaveTopicRouterInfo(true);
                                    Iterator<Entry<String, MQProducerInner>> it = this.producerTable.entrySet().iterator();
                                    while (it.hasNext()) {
                                        Entry<String, MQProducerInner> entry = it.next();
                                        MQProducerInner impl = entry.getValue();
                                        if (impl != null) {
                                            // Store it in every producer's topicPublishInfoTable, so each producer holds the topic -> TopicPublishInfo mapping
                                            impl.updateTopicPublishInfo(topic, publishInfo);
                                        }
                                    }
                                }
    
                                // ... code omitted
                                this.topicRouteTable.put(topic, cloneTopicRouteData);
                                return true;
                            }
                        }
                    } catch (MQClientException e) {
                        // ... code omitted
                    } catch (RemotingException e) {
                        // ... code omitted
                    } finally {
                        this.lockNamesrv.unlock();
                    }
                } else {
                    // ... code omitted
                }
            } catch (InterruptedException e) {
                // ... code omitted
            }
            return false;
        }
        
        // Check whether the route data has changed
        private boolean topicRouteDataIsChange(TopicRouteData olddata, TopicRouteData nowdata) {
            if (olddata == null || nowdata == null)
                return true;
            TopicRouteData old = olddata.cloneTopicRouteData();
            TopicRouteData now = nowdata.cloneTopicRouteData();
            Collections.sort(old.getQueueDatas());
            Collections.sort(old.getBrokerDatas());
            Collections.sort(now.getQueueDatas());
            Collections.sort(now.getBrokerDatas());
            return !old.equals(now);
        }
    
        private boolean isNeedUpdateTopicRouteInfo(final String topic) {
            boolean result = false;
            {
                Iterator<Entry<String, MQProducerInner>> it = this.producerTable.entrySet().iterator();
                while (it.hasNext() && !result) {
                    Entry<String, MQProducerInner> entry = it.next();
                    MQProducerInner impl = entry.getValue();
                    if (impl != null) {
                        // isPublishTopicNeedUpdate checks DefaultMQProducerImpl's topicPublishInfoTable:
                        //   TopicPublishInfo prev = this.topicPublishInfoTable.get(topic);
                        //   return null == prev || !prev.ok();
                        // i.e. an update is needed when the entry is missing or not usable
                        result = impl.isPublishTopicNeedUpdate(topic);
                    }
                }
            }
            // ... some code omitted
            return result;
        }
    
        // Convert TopicRouteData into TopicPublishInfo
        public static TopicPublishInfo topicRouteData2TopicPublishInfo(final String topic, final TopicRouteData route) {
            TopicPublishInfo info = new TopicPublishInfo();
            info.setTopicRouteData(route);
            if (route.getOrderTopicConf() != null && route.getOrderTopicConf().length() > 0) {
                // ... omitted
            } else {
                List<QueueData> qds = route.getQueueDatas();
                // QueueData implements Comparable and sorts by brokerName
                Collections.sort(qds);
                for (QueueData qd : qds) {
                    // Keep only writable queues
                    if (PermName.isWriteable(qd.getPerm())) {
                        BrokerData brokerData = null;
                        for (BrokerData bd : route.getBrokerDatas()) {
                            // Find the matching broker in the route's broker data
                            if (bd.getBrokerName().equals(qd.getBrokerName())) {
                                brokerData = bd;
                                break;
                            }
                        }
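                        // Skip if no matching broker was found
                        if (null == brokerData) {
                            continue;
                        }
                        // Skip if the broker has no master address
                        if (!brokerData.getBrokerAddrs().containsKey(MixAll.MASTER_ID)) {
                            continue;
                        }
                        // Create one MessageQueue per write queue, using the loop index as the queue id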
                        for (int i = 0; i < qd.getWriteQueueNums(); i++) {
                            MessageQueue mq = new MessageQueue(topic, qd.getBrokerName(), i);
                            info.getMessageQueueList().add(mq);
                        }
                    }
                }
                info.setOrderTopic(false);
            }
            return info;
        }
    }
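    The conversion in topicRouteData2TopicPublishInfo can be tried out in isolation with a small sketch. The types below are hypothetical stand-ins (RouteToQueuesSketch, its QueueData and BrokerData, and string queue names instead of MessageQueue), and the values of MASTER_ID and PERM_WRITE are assumptions made so the example runs on its own.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified version of the route -> queue list conversion.
class RouteToQueuesSketch {
    static final long MASTER_ID = 0L;     // assumed value, standing in for MixAll.MASTER_ID
    static final int PERM_WRITE = 1 << 1; // assumed bit, standing in for PermName's write flag

    static final class QueueData {
        final String brokerName;
        final int writeQueueNums;
        final int perm;

        QueueData(String brokerName, int writeQueueNums, int perm) {
            this.brokerName = brokerName;
            this.writeQueueNums = writeQueueNums;
            this.perm = perm;
        }
    }

    static final class BrokerData {
        final String brokerName;
        final Map<Long, String> brokerAddrs; // brokerId -> address

        BrokerData(String brokerName, Map<Long, String> brokerAddrs) {
            this.brokerName = brokerName;
            this.brokerAddrs = brokerAddrs;
        }
    }

    static List<String> toQueues(String topic, List<QueueData> queueDatas, List<BrokerData> brokerDatas) {
        // Sort a copy by broker name so the resulting queue order is stable.
        List<QueueData> sorted = new ArrayList<>(queueDatas);
        sorted.sort(Comparator.comparing((QueueData qd) -> qd.brokerName));
        List<String> queues = new ArrayList<>();
        for (QueueData qd : sorted) {
            if ((qd.perm & PERM_WRITE) == 0) {
                continue; // not writable
            }
            BrokerData broker = null;
            for (BrokerData bd : brokerDatas) {
                if (bd.brokerName.equals(qd.brokerName)) {
                    broker = bd;
                    break;
                }
            }
            if (broker == null || !broker.brokerAddrs.containsKey(MASTER_ID)) {
                continue; // unknown broker, or no master to write to
            }
            // One queue per write queue, numbered from 0.
            for (int i = 0; i < qd.writeQueueNums; i++) {
                queues.add(topic + "@" + qd.brokerName + "#" + i);
            }
        }
        return queues;
    }

    public static void main(String[] args) {
        List<QueueData> qds = Arrays.asList(
            new QueueData("broker-b", 2, 6),  // read + write
            new QueueData("broker-a", 1, 6),  // read + write
            new QueueData("broker-c", 3, 4)); // read only, skipped
        List<BrokerData> bds = Arrays.asList(
            new BrokerData("broker-a", Collections.singletonMap(0L, "10.0.0.1:10911")),
            new BrokerData("broker-b", Collections.singletonMap(0L, "10.0.0.2:10911")),
            new BrokerData("broker-c", Collections.singletonMap(0L, "10.0.0.3:10911")));
        System.out.println(toQueues("T", qds, bds)); // prints [T@broker-a#0, T@broker-b#0, T@broker-b#1]
    }
}
```

    Read-only queue data and brokers without a master contribute no queues, which mirrors the two continue branches in the method above.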
    

    This concludes how the producer obtains the route information for a topic.


    Figure: flow chart of obtaining the route data for a topic


          Article link: https://www.haomeiwen.com/subject/qaikyltx.html