美文网首页
RocketMQ源码解析(三)-Producer

RocketMQ源码解析(三)-Producer

作者: 空挡 | 来源:发表于2018-12-30 19:25 被阅读0次

    发送方式

    producer发送消息支持3种方式,同步、异步和Oneway。

    • 同步发送:客户端提交消息到broker后会等待返回结果,相对来说是最常用的方式。
    • 异步发送:调用发送接口时会注册一个callback类,发送线程继续其它业务逻辑,producer在收到broker结果后回调。比较适合不想发送结果影响正常业务逻辑的情况。
    • Oneway:Producer提交消息后,无论broker是否正常接收消息都不关心。适合于追求高吞吐、能容忍消息丢失的场景,比如日志收集。

    发送实例

    public class SyncProducer {
        public static void main(String[] args) throws Exception {
            //Instantiate with a producer group name.
            DefaultMQProducer producer = new
                DefaultMQProducer("please_rename_unique_group_name");
            // Specify name server addresses.
            producer.setNamesrvAddr("localhost:9876");
            //Launch the instance.
            producer.start();
            for (int i = 0; i < 100; i++) {
                //Create a message instance, specifying topic, tag and message body.
                Message msg = new Message("TopicTest" /* Topic */,
                    "TagA" /* Tag */,
                    ("Hello RocketMQ " +
                        i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
                );
                //Call send message to deliver message to one of brokers.
                SendResult sendResult = producer.send(msg);
                System.out.printf("%s%n", sendResult);
            }
            //Shut down once the producer instance is not longer in use.
            producer.shutdown();
        }
    }
    

    发送消息首先需要初始化一个DefaultMQProducer,设置group name和nameserv的地址。Producer启动后就可以往指定的topic发送消息。

    MQProducer初始化

    Producer的调用关系是
    MQProducer -> DefaultMQProducer ->DefaultMQProducerImpl
    DefaultMQProducer是一个Facade类,封装了DefaultMQProducerImpl内部实现。我们来看下Producer的启动过程,DefaultMQProducerImpl.start()

    public void start(final boolean startFactory) throws MQClientException {
            switch (this.serviceState) {
                case CREATE_JUST:
                    this.serviceState = ServiceState.START_FAILED;
                    //参数检查,不能使用系统默认的GroupName
                    this.checkConfig();
                    //设置clientInstanceName,使用进程ID (PID)
                    if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
                        this.defaultMQProducer.changeInstanceNameToPID();
                    }
                   // 初始化MQClientInstance,一个进程只会存在一个MQClientInstance, 设置clientId (IP@PID)
                    this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQProducer, rpcHook);
                   //将当前Producer注册进MQClientInsance,保证一个producerName值对应一个Producer
                    boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
                    if (!registerOK) {
                        this.serviceState = ServiceState.CREATE_JUST;
                        throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()
                            + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
                            null);
                    }
    
                    this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());
                    //启动MQClientInstance
                    if (startFactory) {
                        mQClientFactory.start();
                    }
    
                    log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(),
                        this.defaultMQProducer.isSendMessageWithVIPChannel());
                    //设置状态为RUNNING
                    this.serviceState = ServiceState.RUNNING;
                    break;
                case RUNNING:
                case START_FAILED:
                case SHUTDOWN_ALREADY:
                    throw new MQClientException("The producer service state not OK, maybe started once, "
                        + this.serviceState
                        + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
                        null);
                default:
                    break;
            }
           //向所有broker发送一次心跳 
           this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
        }
    

    从上面的代码可以看出,start的过程主要就是初始化和启动一个MQClientInstance,将producer注册到instance中。我们来看下MQClientInstance的启动过程。
    MQClientInstance启动过程

    public void start() throws MQClientException {
    
            synchronized (this) {
                switch (this.serviceState) {
                    case CREATE_JUST:
                        this.serviceState = ServiceState.START_FAILED;
                        // 1、如果NameservAddr为空,尝试从http server获取nameserv的地址
                        if (null == this.clientConfig.getNamesrvAddr()) {
                            this.mQClientAPIImpl.fetchNameServerAddr();
                        }
                        // Start request-response channel
                        // 2、启动MQClientAPIImpl,初始化NettyClient
                        this.mQClientAPIImpl.start();
                        // 3、开启Client的定时任务
                        this.startScheduledTask();
                        // 4、Start pull service,开始处理PullRequest
                        this.pullMessageService.start();
                        // 5、Start rebalance service
                        this.rebalanceService.start();
                        // Start push service
                        //6、启动Client内置的producer
                        this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
                        log.info("the client factory [{}] start OK", this.clientId);
                        this.serviceState = ServiceState.RUNNING;
                        break;
                    case RUNNING:
                        break;
                    case SHUTDOWN_ALREADY:
                        break;
                    case START_FAILED:
                        throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
                    default:
                        break;
                }
            }
        }
    

    1、如果producer在初始化的时候没有设置nameserv的地址,则会尝试从一个http server获取nameserv。这个httpserver是可以配置的,这种方式非常适合于有统一配置中心的系统
    3、这里开启的定时任务有以下几个:
    1)获取nameserv地址,就是重复的做第1步,这样就可以动态切换nameserv的地址
    2)从nameserv更新topicRouteInfo,对于producer来说topic的路由信息是最重要的
    3)将缓存的broker信息和最新的topicRouteInfo做对比,清除已经下线的broker
    4)向broker发送心跳
    4 ~ 6,producer和consumer公用一个MQClientInstance的实现。这几步初始化是给consumer用的,后面讲consumer的时候再讲。
    Producer启动完成以后,就可以发送消息了,下面我们来看下一条普通的message的发送过程

    消息发送

    Producer默认采用SYNC方式提交消息,消息提交给broker收到response后返回。方法是DefaultMQProducerImpl.send( Message msg)

    /**
         * DEFAULT SYNC -------------------------------------------------------
         */
        public SendResult send(
            Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
            return send(msg, this.defaultMQProducer.getSendMsgTimeout());
        }
        public SendResult send(Message msg,
            long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
            return this.sendDefaultImpl(msg, CommunicationMode.SYNC, null, timeout);
        }
    
        private SendResult sendDefaultImpl(
            Message msg,
            final CommunicationMode communicationMode,
            final SendCallback sendCallback,
            final long timeout
        ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
            //1、参数检查,消息不能发给系统预留的topic,消息体是否超过最大长度
            this.makeSureStateOK();
            Validators.checkMessage(msg, this.defaultMQProducer);
    
            final long invokeID = random.nextLong();
            long beginTimestampFirst = System.currentTimeMillis();
            long beginTimestampPrev = beginTimestampFirst;
            long endTimestamp = beginTimestampFirst;
            //2、根据消息的topic,获取该topic的路由信息
            TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
            if (topicPublishInfo != null && topicPublishInfo.ok()) {
                boolean callTimeout = false;
                ....
                //3、发送重试次数
                int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
                int times = 0; 
                //用来缓存发送和重试中已经用过的broker
                String[] brokersSent = new String[timesTotal];
                for (; times < timesTotal; times++) {
                    String lastBrokerName = null == mq ? null : mq.getBrokerName();
                    //4、从所有topic可用queue中选择一个queue
                    MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
                    if (mqSelected != null) {//获取Queue成功
                        mq = mqSelected;
                        brokersSent[times] = mq.getBrokerName();
                        try {
                            ...
                            //5、提交消息到mq
                            sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
                            endTimestamp = System.currentTimeMillis();
                            //6、成功,更新本次调用时间到MQFaultStrategy中
                            this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                            switch (communicationMode) {
                                //异步和ONEWAY调用后就直接返回了
                                case ASYNC:
                                    return null;
                                case ONEWAY:
                                    return null;
                                //7、如果broker存储失败,判断是否要重试
                                case SYNC:
                                    if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
                                        if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
                                            continue;
                                        }
                                    }
    
                                    return sendResult;
                                default:
                                    break;
                            }
                        // 8、调用接口异常,更新状态到MQFaultStrategy中
                        } catch (RemotingException e) {
                            endTimestamp = System.currentTimeMillis();
                            this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                            ...
                            exception = e;
                            continue;
                        } catch (MQClientException e) {
                            endTimestamp = System.currentTimeMillis();
                            this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                            ...
                            exception = e;
                            continue;
                        } catch (MQBrokerException e) {
                            endTimestamp = System.currentTimeMillis();
                            this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                            ...
                            exception = e;
                            switch (e.getResponseCode()) {
                                case ResponseCode.TOPIC_NOT_EXIST:
                                case ResponseCode.SERVICE_NOT_AVAILABLE:
                                case ResponseCode.SYSTEM_ERROR:
                                case ResponseCode.NO_PERMISSION:
                                case ResponseCode.NO_BUYER_ID:
                                case ResponseCode.NOT_IN_CURRENT_UNIT:
                                    continue;
                                default:
                                    if (sendResult != null) {
                                        return sendResult;
                                    }
    
                                    throw e;
                            }
                        } catch (InterruptedException e) {
                            endTimestamp = System.currentTimeMillis();
                            this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                            ...
                            throw e;
                        }
                    } else {
                        break;
                    }
                }
                //9、成功则返回结果
                if (sendResult != null) {
                    return sendResult;
                }
    
                ...
                MQClientException mqClientException = new MQClientException(info, exception);
                if (callTimeout) {
                    throw new RemotingTooMuchRequestException("sendDefaultImpl call timeout");
                }
                //超过重试次数后,根据不同的错误设置抛出异常类型
                if (exception instanceof MQBrokerException) {
                    mqClientException.setResponseCode(((MQBrokerException) exception).getResponseCode());
                } else if (exception instanceof RemotingConnectException) {
                    mqClientException.setResponseCode(ClientErrorCode.CONNECT_BROKER_EXCEPTION);
                } else if (exception instanceof RemotingTimeoutException) {
                    mqClientException.setResponseCode(ClientErrorCode.ACCESS_BROKER_TIMEOUT);
                } else if (exception instanceof MQClientException) {
                    mqClientException.setResponseCode(ClientErrorCode.BROKER_NOT_EXIST_EXCEPTION);
                }
    
                throw mqClientException;
            }
            ...
        }
    

    从上面的发送逻辑可以看出,无论哪种发送方式,最终都是调用的sendDefaultImpl来提交消息。
    第2步:获取topic的所有路由信息,详细逻辑后面讲
    第3步:SYNC发送可以设置失败重试次数
    第4步:因为每个topic会在集群的多个broker上存在多个queue,所以这里会选择一个合适的queue,也就是在producer端实现负载均衡的功能,详细逻辑后面讲
    第6和8步:无论提交消息成功或者失败,都会更新结果到MQFaultStrategy中,也就是第4中选取queue时采用的策略
    第7步:对于消息提交成功,不止有SUCCESS一种状态,还有别的情况下也会认为成功的,比如broker接收和处理消息成功了,但是写给slave失败了,或者数据落盘失败了等。针对于存储失败的情况,客户端可以选择是否要重新发送。
    以上就是消息发送的整个流程,下面分解下每一步的实现

    获取topic路由

    private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
            TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
            if (null == topicPublishInfo || !topicPublishInfo.ok()) {
                this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
                //如果TopicPushlishInfo不存在,则会尝试从Nameserv更新topic路由信息
                this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
                topicPublishInfo = this.topicPublishInfoTable.get(topic);
            }
    
            if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
                return topicPublishInfo;
            } else {
                this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
                topicPublishInfo = this.topicPublishInfoTable.get(topic);
                return topicPublishInfo;
            }
     }
    

    根据topic直接从内存的缓存中获取路由信息,缓存的更新在前面的定时任务已经讲过。
    如果TopicPushlishInfo不存在,则会尝试从Nameserv更新信息。更新策略是:
    1)按topicName去nameserv找指定topic的route信息;
    2)如果第一步没获取到则尝试获取默认创建topic(TBW102)的route信息,前提是broker支持默认创建。
    最终,如果没有获取到topic的route信息,则报错中止消息发送

    Queue选取策略

    选择Queue
    Queue的选取是发送端实现负责均衡的核心,根据client是否开启了延时容错,实现轮询和加可用性轮询的选取策略。

        public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
            //直接调用MQFaultStrategy的方法
            return this.mqFaultStrategy.selectOneMessageQueue(tpInfo, lastBrokerName);
        }
         //MQFaultStrategy的方法实现
        public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
            //开启了延时容错
            if (this.sendLatencyFaultEnable) {
                try {
                   //1、首先获取上次使用的Queue index+1
                    int index = tpInfo.getSendWhichQueue().getAndIncrement();
                    for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
                        int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
                        if (pos < 0)
                            pos = 0;
                        //2、找到index对应的queue
                        MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
                        //3、如果queue对应的broker可用,则使用该broker
                        if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {
                            if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))
                                return mq;
                        }
                    }
                    //4、如果上一步没找个合适的broker,则从所有的broker中选择一个相对合适的,并且broker是可写的。
                    final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
                    int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
                    if (writeQueueNums > 0) {
                        final MessageQueue mq = tpInfo.selectOneMessageQueue();
                        if (notBestBroker != null) {
                            mq.setBrokerName(notBestBroker);
                            mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);
                        }
                        return mq;
                    } else {
                        latencyFaultTolerance.remove(notBestBroker);
                    }
                } catch (Exception e) {
                    log.error("Error occurred when selecting message queue", e);
                }
               //5、如果以上都没找到,则直接按顺序选择下一个
                return tpInfo.selectOneMessageQueue();
            }
            //6、未开启延时容错,直接按顺序选下一个
            return tpInfo.selectOneMessageQueue(lastBrokerName);
        }
    

    Producer为每个topic缓存了一个全局index,每次发送之后+1,然后从所有queue列表中选择index位置上的queue,这样就实现了轮询的效果。
    如果开启了延时容错,则会考虑broker的可用性:
    第1) 2)步:根据全局index找到queue
    第3)步:如果根据延时容错判断queue所在的broker当前可用,并且是第一次发送,或者是重试并且和上次用的broker是同一个,则使用这个queue。这里面有两个逻辑,一个是broker的可用性是如何判断的,这个我们下面说;第二个是为什么重试的时候要选上次的broker,下面说下我的理解。

    由前面的发送逻辑中的第6和8步知道,有两种情况会重试,一种是broker返回处理成功但是store失败,一种是broker返回失败。
    对于返回失败的情况,其实会直接更新broker为短时不可用状态,这个在第一个if条件就已经通不过了;而对于store失败的情况,说明broker当前是正常的,重发还是发给同一个broker有利于防止消息重复。

    第4)步:如果将所有queue按照第3)步的情况过一遍,发现都不符合条件,则从所有broker中选择一个相对好的。
    第5)步:如果第4不中的broker不支持写入,则跟未开启延时容错一样的逻辑,直接选下一个queue
    Broker延时控制逻辑
    由上面的queue的选择策略可以知道,queue的选择除了轮询以外,就是根据Broker的可用性。回看下消息发送的第6步和第8步,在消息发送后会更新时间和发送状态到MQFaultStrategy中,代码如下:

        public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {
            if (this.sendLatencyFaultEnable) {
                //1、根据发送结果,计算broker不可用时长
                long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency);
                //2、更新Broker不可用时长
                this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration);
            }
        }
    

    第1步:根据上次消息发送时长和结果,计算Broker应该多长时间不可用,如果上次发送失败的话,发送时长按30秒计算。
    MQFaultStrategy维护了一个broker延时列表,如下:

    private long[] latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};
    private long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};
    

    以上两个列表是一一对应的,当发送时长低于100ms时,设置broker不可用时长为0,之后依次增加,如果超过15秒,则有10分钟不可用。可以看到如果上次发送失败的话,也是10分钟不可用,如果重试肯定不会选择相同的broker。

    消息提交sendKernelImpl()

    Producer发送消息最终是调用sendKernelImpl()完成提交的,代码如下:

    private SendResult sendKernelImpl(final Message msg,
                                          final MessageQueue mq,
                                          final CommunicationMode communicationMode,
                                          final SendCallback sendCallback,
                                          final TopicPublishInfo topicPublishInfo,
                                          final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
            long beginStartTime = System.currentTimeMillis();
            //根据brokerName从缓存中获取broker的地址
            String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
            //Double check,如果地址为空,则从nameserv中再获取一次
            if (null == brokerAddr) {
                tryToFindTopicPublishInfo(mq.getTopic());
                brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
            }
    
            SendMessageContext context = null;
            if (brokerAddr != null) {
               //切换到VIP channel
               //Broker启动时会开启2个端口接收客户端数据,其中一个端口只接收producer的消息,
               //不接受consumer的拉取请求,被称为VIP channel
                brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr);
    
                byte[] prevBody = msg.getBody();
                try {
                    //for MessageBatch,ID has been set in the generating process
                    //客户端设置的id
                    if (!(msg instanceof MessageBatch)) {
                        MessageClientIDSetter.setUniqID(msg);
                    }
                    //如果消息body过长,则压缩并设置标记位
                    int sysFlag = 0;
                    boolean msgBodyCompressed = false;
                    if (this.tryToCompressMessage(msg)) {
                        sysFlag |= MessageSysFlag.COMPRESSED_FLAG;
                        msgBodyCompressed = true;
                    }
    
                    final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
                    if (tranMsg != null && Boolean.parseBoolean(tranMsg)) {
                        sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE;
                    }
                   //回调Forbidden Hook
                    if (hasCheckForbiddenHook()) {
                        ...
                        ...
                    }
                    // 回调SendMessage Hook
                    if (this.hasSendMessageHook()) {
                        ...
                        ...
                    }
                    //设置消息头
                    SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
                    requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
                    requestHeader.setTopic(msg.getTopic());
                    requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());
                    requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());
                    requestHeader.setQueueId(mq.getQueueId());
                    requestHeader.setSysFlag(sysFlag);
                    requestHeader.setBornTimestamp(System.currentTimeMillis());
                    requestHeader.setFlag(msg.getFlag());
                    requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));
                    requestHeader.setReconsumeTimes(0);
                    requestHeader.setUnitMode(this.isUnitMode());
                    requestHeader.setBatch(msg instanceof MessageBatch);
                    //要求重新发送的消息,设置重试次数和延时时间
                    //仅Consumer用
                    if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                        String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);
                        if (reconsumeTimes != null) {
                            requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes));
                            MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME);
                        }
    
                        String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg);
                        if (maxReconsumeTimes != null) {
                            requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes));
                            MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_MAX_RECONSUME_TIMES);
                        }
                    }
                    //通过NettyClient发送消息到Broker
                    SendResult sendResult = null;
                    switch (communicationMode) {
                        case ASYNC:
                            Message tmpMessage = msg;
                            if (msgBodyCompressed) {
                                //If msg body was compressed, msgbody should be reset using prevBody.
                                //Clone new message using commpressed message body and recover origin massage.
                                //Fix bug:https://github.com/apache/rocketmq-externals/issues/66
                                tmpMessage = MessageAccessor.cloneMessage(msg);
                                msg.setBody(prevBody);
                            }
                            long costTimeAsync = System.currentTimeMillis() - beginStartTime;
                            if (timeout < costTimeAsync) {
                                throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
                            }
                            sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
                                brokerAddr,
                                mq.getBrokerName(),
                                tmpMessage,
                                requestHeader,
                                timeout - costTimeAsync,
                                communicationMode,
                                sendCallback,
                                topicPublishInfo,
                                this.mQClientFactory,
                                this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(),
                                context,
                                this);
                            break;
                        case ONEWAY:
                        case SYNC:
                            long costTimeSync = System.currentTimeMillis() - beginStartTime;
                            if (timeout < costTimeSync) {
                                throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
                            }
                            sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
                                brokerAddr,
                                mq.getBrokerName(),
                                msg,
                                requestHeader,
                                timeout - costTimeSync,
                                communicationMode,
                                context,
                                this);
                            break;
                        default:
                            assert false;
                            break;
                    }
                    //回调Send message Hook
                    if (this.hasSendMessageHook()) {
                        context.setSendResult(sendResult);
                        this.executeSendMessageHookAfter(context);
                    }
    
                    return sendResult;
                } catch (RemotingException e) {
                    ...
                } catch (MQBrokerException e) {
                    ...
                } catch (InterruptedException e) {
                   ...
                } finally {
                    msg.setBody(prevBody);
                }
            }
           //Broker地址获取失败,抛出异常中止发送
            throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
        }
    

    最后一步,通过前面选择的queue和broker获取broker 地址,封装消息包并发送到broker,客户端支持单条消息发送,也支持多条消息封装到一个包中发送。Client会和broker保持长连接,提高发送速度。

    相关文章

      网友评论

          本文标题:RocketMQ源码解析(三)-Producer

          本文链接:https://www.haomeiwen.com/subject/irnqlqtx.html