美文网首页Java学习资料收集
RocketMQ之producer发送消息源码分析

RocketMQ之producer发送消息源码分析

作者: nhhnhh | 来源:发表于2019-07-04 13:55 被阅读0次

    RocketMQ主要有NameServer,producer的发送,borker端的消息存储,consumer端的消费。首先我们先来看一下producer的发送
    以下是producer的发送逻辑:
    1.DefaultMQProducerImpl#sendDefaultImpl(入口)
    2.判断一下topic是否可用,为空或者不可用则不处理DefaultMQProducerImpl#tryToFindTopicPublishInfo
    3.如果topick可用,则判断一下他的发送模式来计算他的发送总次数
    4.for循环发送次数,选择发送队列,如果队列为空,则直接跳出循环DefaultMQProducerImpl#selectOneMessageQueue
    5.如果选择队列的耗时太长,那也不再进行处理。
    6.调用发送主逻辑DefaultMQProducerImpl#sendKernelImpl
    7.更新一下broker的可用时间,对broker的可用性进行更新DefaultMQProducerImpl#updateFaultItem
    8.根据发送结果,来判断是否继续循环,如果是同步模式的话,就继续for循环。

    private SendResult sendDefaultImpl(
           Message msg,
           final CommunicationMode communicationMode,
           final SendCallback sendCallback,
           final long timeout
       ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
           this.makeSureStateOK();
           Validators.checkMessage(msg, this.defaultMQProducer);
    
           final long invokeID = random.nextLong();
           long beginTimestampFirst = System.currentTimeMillis();
           long beginTimestampPrev = beginTimestampFirst;
           long endTimestamp = beginTimestampFirst;
           //获取一下topic信息
           TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
           if (topicPublishInfo != null && topicPublishInfo.ok()) {
               boolean callTimeout = false;
               MessageQueue mq = null;
               Exception exception = null;
               SendResult sendResult = null;
               //判断一下发送的模式,如果是同步的,就为重试次数+1
               int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
               int times = 0;
               String[] brokersSent = new String[timesTotal];
               for (; times < timesTotal; times++) {
                   String lastBrokerName = null == mq ? null : mq.getBrokerName();
                   //选择可用的队列
                   MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
                   if (mqSelected != null) {
                       mq = mqSelected;
                       brokersSent[times] = mq.getBrokerName();
                       try {
                           //判断一下获取队列的耗时,如果超时了就认为不可用,直接跳出循环
                           beginTimestampPrev = System.currentTimeMillis();
                           long costTime = beginTimestampPrev - beginTimestampFirst;
                           if (timeout < costTime) {
                               callTimeout = true;
                               break;
                           }
                           //发送的主逻辑
                           sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
                           endTimestamp = System.currentTimeMillis();
                           //更新一下broker的可用时间,对broker的可用性进行判断
                           this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                           switch (communicationMode) {
                               case ASYNC:
                                   return null;
                               case ONEWAY:
                                   return null;
                               case SYNC:
                                   //如果发送状态不为ok的话,判断是否需要重试,判断落盘失败,是否需要重试,需要的话就继续循环。默认是false。
                                   if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
                                       if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
                                           continue;
                                       }
                                   }
    
                                   return sendResult;
                               default:
                                   break;
                           }
                       } catch (RemotingException e) {
                           endTimestamp = System.currentTimeMillis();
                           this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                           log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                           log.warn(msg.toString());
                           exception = e;
                           continue;
                       } catch (MQClientException e) {
                           endTimestamp = System.currentTimeMillis();
                           this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                           log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                           log.warn(msg.toString());
                           exception = e;
                           continue;
                       } catch (MQBrokerException e) {
                           endTimestamp = System.currentTimeMillis();
                           this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                           log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                           log.warn(msg.toString());
                           exception = e;
                           switch (e.getResponseCode()) {
                               case ResponseCode.TOPIC_NOT_EXIST:
                               case ResponseCode.SERVICE_NOT_AVAILABLE:
                               case ResponseCode.SYSTEM_ERROR:
                               case ResponseCode.NO_PERMISSION:
                               case ResponseCode.NO_BUYER_ID:
                               case ResponseCode.NOT_IN_CURRENT_UNIT:
                                   continue;
                               default:
                                   if (sendResult != null) {
                                       return sendResult;
                                   }
    
                                   throw e;
                           }
                       } catch (InterruptedException e) {
                           endTimestamp = System.currentTimeMillis();
                           this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                           log.warn(String.format("sendKernelImpl exception, throw exception, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                           log.warn(msg.toString());
    
                           log.warn("sendKernelImpl exception", e);
                           log.warn(msg.toString());
                           throw e;
                       }
                   } else {
                       break;
                   }
               }
    
               if (sendResult != null) {
                   return sendResult;
               }
    
               String info = String.format("Send [%d] times, still failed, cost [%d]ms, Topic: %s, BrokersSent: %s",
                   times,
                   System.currentTimeMillis() - beginTimestampFirst,
                   msg.getTopic(),
                   Arrays.toString(brokersSent));
    
               info += FAQUrl.suggestTodo(FAQUrl.SEND_MSG_FAILED);
    
               MQClientException mqClientException = new MQClientException(info, exception);
               if (callTimeout) {
                   throw new RemotingTooMuchRequestException("sendDefaultImpl call timeout");
               }
    
               if (exception instanceof MQBrokerException) {
                   mqClientException.setResponseCode(((MQBrokerException) exception).getResponseCode());
               } else if (exception instanceof RemotingConnectException) {
                   mqClientException.setResponseCode(ClientErrorCode.CONNECT_BROKER_EXCEPTION);
               } else if (exception instanceof RemotingTimeoutException) {
                   mqClientException.setResponseCode(ClientErrorCode.ACCESS_BROKER_TIMEOUT);
               } else if (exception instanceof MQClientException) {
                   mqClientException.setResponseCode(ClientErrorCode.BROKER_NOT_EXIST_EXCEPTION);
               }
    
               throw mqClientException;
           }
    
           List<String> nsList = this.getmQClientFactory().getMQClientAPIImpl().getNameServerAddressList();
           if (null == nsList || nsList.isEmpty()) {
               throw new MQClientException(
                   "No name server address, please set it." + FAQUrl.suggestTodo(FAQUrl.NAME_SERVER_ADDR_NOT_EXIST_URL), null).setResponseCode(ClientErrorCode.NO_NAME_SERVER_EXCEPTION);
           }
    
           throw new MQClientException("No route info of this topic, " + msg.getTopic() + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO),
               null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION);
       }
    

    那么我们来看一下tryToFindTopicPublishInfo这个方法

        private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
            //从本地缓存中读取是否有该topic信息
            TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
            //如果topic信息为空或者不可用则再从nameServer获取topic信息
            if (null == topicPublishInfo || !topicPublishInfo.ok()) {
                this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
                this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
                topicPublishInfo = this.topicPublishInfoTable.get(topic);
            }
    
            if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
                return topicPublishInfo;
            } else {
                //再从nameServer获取一下topic信息,此时获取的topic为默认的topic
                this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
                topicPublishInfo = this.topicPublishInfoTable.get(topic);
                return topicPublishInfo;
            }
        }
    

    updateTopicRouteInfoFromNameServer方法

    public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault,
            DefaultMQProducer defaultMQProducer) {
            try {
                //从nameServer获取数据的时候锁一下
                if (this.lockNamesrv.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
                    try {
                        TopicRouteData topicRouteData;
                        if (isDefault && defaultMQProducer != null) {
                            //获取topic为AUTO_CREATE_TOPIC_KEY的topic
                            topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(),
                                1000 * 3);
                            if (topicRouteData != null) {
                                //赋值队列的读写队列数量
                                for (QueueData data : topicRouteData.getQueueDatas()) {
                                    int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums());
                                    data.setReadQueueNums(queueNums);
                                    data.setWriteQueueNums(queueNums);
                                }
                            }
                        } else {
                            //获取对应的topic
                            topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3);
                        }
                        if (topicRouteData != null) {
                            TopicRouteData old = this.topicRouteTable.get(topic);
                            //判断一下topic的信息,如读写队列数量,broker信息有没有变
                            boolean changed = topicRouteDataIsChange(old, topicRouteData);
                            if (!changed) {
                                //判断一下是否需要更新topic信息
                                changed = this.isNeedUpdateTopicRouteInfo(topic);
                            } else {
                                log.info("the topic[{}] route info changed, old[{}] ,new[{}]", topic, old, topicRouteData);
                            }
    
                            if (changed) {
                                //赋值一份topic信息
                                TopicRouteData cloneTopicRouteData = topicRouteData.cloneTopicRouteData();
                                //重新往brokerAddrTable缓存里面塞入当前topic已经注册过的broker以及对应的地址
                                for (BrokerData bd : topicRouteData.getBrokerDatas()) {
                                    this.brokerAddrTable.put(bd.getBrokerName(), bd.getBrokerAddrs());
                                }
    
                                // Update Pub info
                                {
                                    //更新一下写的topic信息
                                    TopicPublishInfo publishInfo = topicRouteData2TopicPublishInfo(topic, topicRouteData);
                                    publishInfo.setHaveTopicRouterInfo(true);
                                    Iterator<Entry<String, MQProducerInner>> it = this.producerTable.entrySet().iterator();
                                    while (it.hasNext()) {
                                        Entry<String, MQProducerInner> entry = it.next();
                                        MQProducerInner impl = entry.getValue();
                                        if (impl != null) {
                                            impl.updateTopicPublishInfo(topic, publishInfo);
                                        }
                                    }
                                }
    
                                // Update sub info
                                {
                                    //更新一下读的topic信息
                                    Set<MessageQueue> subscribeInfo = topicRouteData2TopicSubscribeInfo(topic, topicRouteData);
                                    Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();
                                    while (it.hasNext()) {
                                        Entry<String, MQConsumerInner> entry = it.next();
                                        MQConsumerInner impl = entry.getValue();
                                        if (impl != null) {
                                            impl.updateTopicSubscribeInfo(topic, subscribeInfo);
                                        }
                                    }
                                }
                                log.info("topicRouteTable.put. Topic = {}, TopicRouteData[{}]", topic, cloneTopicRouteData);
                                this.topicRouteTable.put(topic, cloneTopicRouteData);
                                return true;
                            }
                        } else {
                            log.warn("updateTopicRouteInfoFromNameServer, getTopicRouteInfoFromNameServer return null, Topic: {}", topic);
                        }
                    } catch (Exception e) {
                        if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX) && !topic.equals(MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC)) {
                            log.warn("updateTopicRouteInfoFromNameServer Exception", e);
                        }
                    } finally {
                        this.lockNamesrv.unlock();
                    }
                } else {
                    log.warn("updateTopicRouteInfoFromNameServer tryLock timeout {}ms", LOCK_TIMEOUT_MILLIS);
                }
            } catch (InterruptedException e) {
                log.warn("updateTopicRouteInfoFromNameServer Exception", e);
            }
    
            return false;
        }
    

    选择队列selectOneMessageQueue

    public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
           //是否支持容错,默认是false
           if (this.sendLatencyFaultEnable) {
               try {
                   //获取一下ThreadLocal里定义的index
                   int index = tpInfo.getSendWhichQueue().getAndIncrement();
                   //循环队列长度
                   for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
                       int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
                       if (pos < 0)
                           pos = 0;
                       //获取相对应的队列
                       MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
                       //判断当前的borker是否可用,如果可用的话,并且lastBrokerName为null(当for循环第一次发送时候lastBrokerName为null)
                       if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {
                           if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))
                               return mq;
                       }
                   }
                   //走到这一步,说明没有符合条件的broker,那么再取一个broker
                   final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
                   //获取当前broker的写队列数量
                   int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
                   if (writeQueueNums > 0) {
                       //获取下一个mq队列,然后将对应的brokername还有队列id赋值
                       final MessageQueue mq = tpInfo.selectOneMessageQueue();
                       if (notBestBroker != null) {
                           mq.setBrokerName(notBestBroker);
                           mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);
                       }
                       return mq;
                   } else {
                       latencyFaultTolerance.remove(notBestBroker);
                   }
               } catch (Exception e) {
                   log.error("Error occurred when selecting message queue", e);
               }
               //重新获取队列
               return tpInfo.selectOneMessageQueue();
           }
    
           return tpInfo.selectOneMessageQueue(lastBrokerName);
       }
    

    发送主逻辑

     private SendResult sendKernelImpl(final Message msg,
                                          final MessageQueue mq,
                                          final CommunicationMode communicationMode,
                                          final SendCallback sendCallback,
                                          final TopicPublishInfo topicPublishInfo,
                                          final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
            long beginStartTime = System.currentTimeMillis();
            //获取broker为master角色的broker地址
            String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
            if (null == brokerAddr) {
                //r如果地址为空,重新获取一下topic信息
                tryToFindTopicPublishInfo(mq.getTopic());
                brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
            }
    
            SendMessageContext context = null;
            if (brokerAddr != null) {
                //是否开启vip通道,如果是的话,端口号减2为vip通道
                brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr);
    
                byte[] prevBody = msg.getBody();
                try {
                    //for MessageBatch,ID has been set in the generating process
                    if (!(msg instanceof MessageBatch)) {
                        MessageClientIDSetter.setUniqID(msg);
                    }
    
                    int sysFlag = 0;
                    boolean msgBodyCompressed = false;
                    //压缩一下msg信息
                    if (this.tryToCompressMessage(msg)) {
                        sysFlag |= MessageSysFlag.COMPRESSED_FLAG;
                        msgBodyCompressed = true;
                    }
                    //看一下是不是需要支持事务
                    final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
                    if (tranMsg != null && Boolean.parseBoolean(tranMsg)) {
                        sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE;
                    }
                    //发送信息校验一下
                    if (hasCheckForbiddenHook()) {
                        CheckForbiddenContext checkForbiddenContext = new CheckForbiddenContext();
                        checkForbiddenContext.setNameSrvAddr(this.defaultMQProducer.getNamesrvAddr());
                        checkForbiddenContext.setGroup(this.defaultMQProducer.getProducerGroup());
                        checkForbiddenContext.setCommunicationMode(communicationMode);
                        checkForbiddenContext.setBrokerAddr(brokerAddr);
                        checkForbiddenContext.setMessage(msg);
                        checkForbiddenContext.setMq(mq);
                        checkForbiddenContext.setUnitMode(this.isUnitMode());
                        this.executeCheckForbiddenHook(checkForbiddenContext);
                    }
                    //发送消息的前置逻辑
                    if (this.hasSendMessageHook()) {
                        context = new SendMessageContext();
                        context.setProducer(this);
                        context.setProducerGroup(this.defaultMQProducer.getProducerGroup());
                        context.setCommunicationMode(communicationMode);
                        context.setBornHost(this.defaultMQProducer.getClientIP());
                        context.setBrokerAddr(brokerAddr);
                        context.setMessage(msg);
                        context.setMq(mq);
                        String isTrans = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
                        if (isTrans != null && isTrans.equals("true")) {
                            context.setMsgType(MessageType.Trans_Msg_Half);
                        }
    
                        if (msg.getProperty("__STARTDELIVERTIME") != null || msg.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null) {
                            context.setMsgType(MessageType.Delay_Msg);
                        }
                        this.executeSendMessageHookBefore(context);
                    }
                    //构建发送消息体
                    SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
                    requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
                    requestHeader.setTopic(msg.getTopic());
                    requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());
                    requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());
                    requestHeader.setQueueId(mq.getQueueId());
                    requestHeader.setSysFlag(sysFlag);
                    requestHeader.setBornTimestamp(System.currentTimeMillis());
                    requestHeader.setFlag(msg.getFlag());
                    requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));
                    requestHeader.setReconsumeTimes(0);
                    requestHeader.setUnitMode(this.isUnitMode());
                    requestHeader.setBatch(msg instanceof MessageBatch);
                    if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                        String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);
                        if (reconsumeTimes != null) {
                            requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes));
                            MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME);
                        }
    
                        String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg);
                        if (maxReconsumeTimes != null) {
                            requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes));
                            MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_MAX_RECONSUME_TIMES);
                        }
                    }
    
                    SendResult sendResult = null;
                    //根据发送模式,进行发送message
                    switch (communicationMode) {
                        case ASYNC:
                            Message tmpMessage = msg;
                            if (msgBodyCompressed) {
                                //If msg body was compressed, msgbody should be reset using prevBody.
                                //Clone new message using commpressed message body and recover origin massage.
                                //Fix bug:https://github.com/apache/rocketmq-externals/issues/66
                                tmpMessage = MessageAccessor.cloneMessage(msg);
                                msg.setBody(prevBody);
                            }
                            long costTimeAsync = System.currentTimeMillis() - beginStartTime;
                            if (timeout < costTimeAsync) {
                                throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
                            }
                            sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
                                brokerAddr,
                                mq.getBrokerName(),
                                tmpMessage,
                                requestHeader,
                                timeout - costTimeAsync,
                                communicationMode,
                                sendCallback,
                                topicPublishInfo,
                                this.mQClientFactory,
                                this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(),
                                context,
                                this);
                            break;
                        case ONEWAY:
                        case SYNC:
                            long costTimeSync = System.currentTimeMillis() - beginStartTime;
                            if (timeout < costTimeSync) {
                                throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
                            }
                            sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
                                brokerAddr,
                                mq.getBrokerName(),
                                msg,
                                requestHeader,
                                timeout - costTimeSync,
                                communicationMode,
                                context,
                                this);
                            break;
                        default:
                            assert false;
                            break;
                    }
    
                    if (this.hasSendMessageHook()) {
                        context.setSendResult(sendResult);
                        this.executeSendMessageHookAfter(context);
                    }
    
                    return sendResult;
                } catch (RemotingException e) {
                    if (this.hasSendMessageHook()) {
                        context.setException(e);
                        this.executeSendMessageHookAfter(context);
                    }
                    throw e;
                } catch (MQBrokerException e) {
                    if (this.hasSendMessageHook()) {
                        context.setException(e);
                        this.executeSendMessageHookAfter(context);
                    }
                    throw e;
                } catch (InterruptedException e) {
                    if (this.hasSendMessageHook()) {
                        context.setException(e);
                        this.executeSendMessageHookAfter(context);
                    }
                    throw e;
                } finally {
                    msg.setBody(prevBody);
                }
            }
    
            throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
        }
    

    以上就是RocketMQ的producer发送消息的源码分析

    相关文章

      网友评论

        本文标题:RocketMQ之producer发送消息源码分析

        本文链接:https://www.haomeiwen.com/subject/ltefhctx.html