美文网首页
rocketmq发送消息之选择消息队列

rocketmq发送消息之选择消息队列

作者: 黎明_dba5 | 来源:发表于2020-09-11 18:22 被阅读0次

    DefaultMQProducerImpl#selectOneMessageQueue选择消息队列

    boolean callTimeout = false;
                MessageQueue mq = null;
                Exception exception = null;
                SendResult sendResult = null;
                #同步发送时共有 1 + retryTimesWhenSendFailed 次发送机会(默认重试2次,即共3次);异步和单向发送只有1次
                int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
                int times = 0;
                String[] brokersSent = new String[timesTotal];
                for (; times < timesTotal; times++) {
                    String lastBrokerName = null == mq ? null : mq.getBrokerName();
                    #选择发送消息队列入口
                    MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
                    if (mqSelected != null) {
                        mq = mqSelected;
                        brokersSent[times] = mq.getBrokerName();
                        try {
                            beginTimestampPrev = System.currentTimeMillis();
                            if (times > 0) {
                                //Reset topic with namespace during resend.
                                msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic()));
                            }
                            long costTime = beginTimestampPrev - beginTimestampFirst;
                            if (timeout < costTime) {
                                callTimeout = true;
                                break;
                            }
    
                            sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
                            endTimestamp = System.currentTimeMillis();
                            #将发送延迟信息更新到延迟机制的缓存中
                            this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                            switch (communicationMode) {
                                case ASYNC:
                                    return null;
                                case ONEWAY:
    

    lastBrokerName 为上一次发送消息的broker信息,第一次发送时为null

    public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
            #如果启用了故障延迟发送机制时使用该逻辑
            if (this.sendLatencyFaultEnable) {
                try {
                    #随机选择一个队列,并且判断是否可用,如果可用则直接返回该队列
                    int index = tpInfo.getSendWhichQueue().getAndIncrement();
                    for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
                        int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
                        if (pos < 0)
                            pos = 0;
                        MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
                        #当前时间和延迟时间比较是否可用
                        if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {
                            if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))
                                return mq;
                        }
                    }
                    #如果没找到可用的队列,这随机至少获取一个队列
                    final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
                    int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
                    if (writeQueueNums > 0) {
                        final MessageQueue mq = tpInfo.selectOneMessageQueue();
                        if (notBestBroker != null) {
                            mq.setBrokerName(notBestBroker);
                            mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);
                        }
                        return mq;
                    } else {
                        #此处不太明白移除机制
                        latencyFaultTolerance.remove(notBestBroker);
                    }
                } catch (Exception e) {
                    log.error("Error occurred when selecting message queue", e);
                }
                #轮询选择
                return tpInfo.selectOneMessageQueue();
            }
            #没有使用故障延迟发送机制时,使用此策略
            return tpInfo.selectOneMessageQueue(lastBrokerName);
        }
    
    public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
            if (lastBrokerName == null) {
                #如果是第一次发送,则按照index+1的次序选择
                return selectOneMessageQueue();
            } else {
                #非第一次选择时,依次向后选择,并且不能选择上次已经发送失败的broker
                int index = this.sendWhichQueue.getAndIncrement();
                for (int i = 0; i < this.messageQueueList.size(); i++) {
                    int pos = Math.abs(index++) % this.messageQueueList.size();
                    if (pos < 0)
                        pos = 0;
                    MessageQueue mq = this.messageQueueList.get(pos);
                    if (!mq.getBrokerName().equals(lastBrokerName)) {
                        return mq;
                    }
                }
                return selectOneMessageQueue();
            }
        }
    
    #更新延迟信息到缓存
    public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {
            if (this.sendLatencyFaultEnable) {
                #isolation 如果为true,这采用30秒故障隔离时长,如果为false这采用实际延迟时间计算故障隔离时长
                long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency);
                this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration);
            }
        }
    
      #根据延迟时间计算具体故障隔离时长
        /**
         * Maps a latency to an unavailable duration using the {@code latencyMax} and
         * {@code notAvailableDuration} tables.
         *
         * @param currentLatency latency to classify
         * @return the {@code notAvailableDuration} entry at the highest index whose
         *         {@code latencyMax} value does not exceed the latency, or 0 if none matches
         */
        private long computeNotAvailableDuration(final long currentLatency) {
            // Scan from the last threshold down to the first.
            int tier = latencyMax.length;
            while (--tier >= 0) {
                if (latencyMax[tier] <= currentLatency) {
                    return this.notAvailableDuration[tier];
                }
            }

            return 0;
        }
    
    #更新延迟时间及故障隔离时长,故障隔离时长会在isAvailable方法中使用
    /**
     * Creates or refreshes the fault record stored under {@code name}.
     *
     * <p>The record's latency is set to {@code currentLatency} and its start timestamp to the
     * current time plus {@code notAvailableDuration}.
     *
     * @param name key of the record in {@code faultItemTable}
     * @param currentLatency latency to store
     * @param notAvailableDuration offset added to the current time for the start timestamp
     */
    public void updateFaultItem(final String name, final long currentLatency, final long notAvailableDuration) {
            FaultItem existing = this.faultItemTable.get(name);
            if (existing == null) {
                // Populate the new record fully before trying to publish it.
                final FaultItem created = new FaultItem(name);
                created.setCurrentLatency(currentLatency);
                created.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);

                existing = this.faultItemTable.putIfAbsent(name, created);
                if (existing == null) {
                    // The new record was stored; nothing else to update.
                    return;
                }
                // Another record was stored under this key in the meantime; update that one below.
            }

            existing.setCurrentLatency(currentLatency);
            existing.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
        }
    
    
    /**
     * Reports whether the entry stored under {@code name} is available.
     *
     * @param name key to look up in {@code faultItemTable}
     * @return {@code true} if no record exists for the name, otherwise the record's own
     *         {@code isAvailable()} result
     */
    public boolean isAvailable(final String name) {
            final FaultItem entry = this.faultItemTable.get(name);
            // A name without a fault record is treated as available.
            return entry == null || entry.isAvailable();
        }
    

    相关文章

      网友评论

          本文标题:rocketmq发送消息之选择消息队列

          本文链接:https://www.haomeiwen.com/subject/sjybektx.html