美文网首页
(六)生产者是如何发送消息的

(六)生产者是如何发送消息的

作者: guessguess | 来源:发表于2021-06-23 17:35 被阅读0次

前面已经讲到了生产者在发送消息时,如何去选择队列,以及重试。但是并没有仔细讲发送消息的内部实现细节。
所以还是来看看发送消息的细节吧

生产者发送消息

具体代码如下

public class DefaultMQProducerImpl implements MQProducerInner {
private SendResult sendKernelImpl(final Message msg,
        final MessageQueue mq,
        final CommunicationMode communicationMode,
        final SendCallback sendCallback,
        final TopicPublishInfo topicPublishInfo,
        final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        long beginStartTime = System.currentTimeMillis();
        首先还是从MQClient中获取broker的ip地址
        String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
        如果为空,则尝试通过namesrv拉取路由数据
        if (null == brokerAddr) {
            tryToFindTopicPublishInfo(mq.getTopic());
            在向namesrv拉取路由数据后,数据会被保存在成员变量中,因此直接再获取即可
            brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
        }

        SendMessageContext context = null;
        如果路由地址不为空
        if (brokerAddr != null) {
            如果走vip通道的话,有专门的broker提供服务,这里我没弄明白,反正就是如果是vip就会把地址给替换,不过不影响大致流程
            brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr);

            byte[] prevBody = msg.getBody();
            try {
                如果是批量消息,则给msg设置唯一id
                if (!(msg instanceof MessageBatch)) {
                    MessageClientIDSetter.setUniqID(msg);
                }
                
                给消息设置instanceid, 这样子知道从哪里发送来的。instanceid为host与pid结合的方式
                boolean topicWithNamespace = false;
                if (null != this.mQClientFactory.getClientConfig().getNamespace()) {
                    msg.setInstanceId(this.mQClientFactory.getClientConfig().getNamespace());
                    topicWithNamespace = true;
                }
                
                根据消息大小,决定是否进行压缩,消息内容超过4kb则会进行压缩
                同时变更系统标记
                int sysFlag = 0;
                boolean msgBodyCompressed = false;
                if (this.tryToCompressMessage(msg)) {
                    sysFlag |= MessageSysFlag.COMPRESSED_FLAG;
                    msgBodyCompressed = true;
                }
                没看懂~
                final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
                if (tranMsg != null && Boolean.parseBoolean(tranMsg)) {
                    sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE;
                }
                省略部分代码
                设置请求相关信息
                SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
                设置生产者的组
                requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
                设置topic
                requestHeader.setTopic(msg.getTopic());
                设置默认topic
                requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());
                设置默认的队列数
             requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());
                设置队列id
                requestHeader.setQueueId(mq.getQueueId());
                设置系统标记
                requestHeader.setSysFlag(sysFlag);
                设置发送时间
                requestHeader.setBornTimestamp(System.currentTimeMillis());
                设置系统标记
                requestHeader.setFlag(msg.getFlag());
                requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));
                设置重复消费次数
                requestHeader.setReconsumeTimes(0);
                requestHeader.setUnitMode(this.isUnitMode());
                设置是否批量消息的标记
                requestHeader.setBatch(msg instanceof MessageBatch);
                设置重复消费的次数
                if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                    String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);
                    if (reconsumeTimes != null) {
                        requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes));
                        MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME);
                    }

                    String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg);
                    if (maxReconsumeTimes != null) {
                        requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes));
                        MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_MAX_RECONSUME_TIMES);
                    }
                }

                SendResult sendResult = null;
                通信方式,同步/异步/单边
                switch (communicationMode) {
                    异步
                    case ASYNC:
                        Message tmpMessage = msg;
                        boolean messageCloned = false;
                        异步不支持压缩消息
                        if (msgBodyCompressed) {
                            tmpMessage = MessageAccessor.cloneMessage(msg);
                            messageCloned = true;
                            msg.setBody(prevBody);
                        }

                        if (topicWithNamespace) {
                            if (!messageCloned) {
                                tmpMessage = MessageAccessor.cloneMessage(msg);
                                messageCloned = true;
                            }
                            msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace()));
                        }
                        超时则抛异常
                        long costTimeAsync = System.currentTimeMillis() - beginStartTime;
                        if (timeout < costTimeAsync) {
                            throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
                        }
                        支持回调
                        sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
                            brokerAddr,
                            mq.getBrokerName(),
                            tmpMessage,
                            requestHeader,
                            timeout - costTimeAsync,
                            communicationMode,
                            sendCallback,
                            topicPublishInfo,
                            this.mQClientFactory,
                            this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(),
                            context,
                            this);
                        break;
                    单边发送,没有break,所以执行操作是与同步一致的
                    case ONEWAY:
                    同步发送
                    case SYNC:
                        long costTimeSync = System.currentTimeMillis() - beginStartTime;
                        超时,抛出异常
                        if (timeout < costTimeSync) {
                            throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
                        }
                        发送消息,并且获取响应
                        sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
                            brokerAddr,
                            mq.getBrokerName(),
                            msg,
                            requestHeader,
                            timeout - costTimeSync,
                            communicationMode,
                            context,
                            this);
                        break;
                    default:
                        assert false;
                        break;
                }
                省略代码,返回响应
                return sendResult;
            } 
            省略异常处理的代码
        }
        如果没有路由数据,会直接抛出异常
        throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
    }
}

从上面几种模式来看,发送消息的模式有3种。
同步:
支持消息压缩,等待消息发送的响应
单边发送:
支持消息压缩,不支持回调,调完就结束
异步:
同样支持消息压缩(压缩发生在switch之前;异步分支发送的是压缩之后克隆出来的消息,原消息的body会被还原),支持回调

顺便看看底层实际发送的细节,验证一下是否与上面的总结相符

public class MQClientAPIImpl {
    public SendResult sendMessage(
        final String addr,
        final String brokerName,
        final Message msg,
        final SendMessageRequestHeader requestHeader,
        final long timeoutMillis,
        final CommunicationMode communicationMode,
        final SendCallback sendCallback,
        final TopicPublishInfo topicPublishInfo,
        final MQClientInstance instance,
        final int retryTimesWhenSendFailed,
        final SendMessageContext context,
        final DefaultMQProducerImpl producer
    ) throws RemotingException, MQBrokerException, InterruptedException {
        long beginStartTime = System.currentTimeMillis();
        RemotingCommand request = null;
        String msgType = msg.getProperty(MessageConst.PROPERTY_MESSAGE_TYPE);
        boolean isReply = msgType != null && msgType.equals(MixAll.REPLY_MESSAGE_FLAG);
        if (isReply) {
            if (sendSmartMsg) {
                SendMessageRequestHeaderV2 requestHeaderV2 = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader);
                request = RemotingCommand.createRequestCommand(RequestCode.SEND_REPLY_MESSAGE_V2, requestHeaderV2);
            } else {
                request = RemotingCommand.createRequestCommand(RequestCode.SEND_REPLY_MESSAGE, requestHeader);
            }
        } else {
            if (sendSmartMsg || msg instanceof MessageBatch) {
                SendMessageRequestHeaderV2 requestHeaderV2 = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader);
                request = RemotingCommand.createRequestCommand(msg instanceof MessageBatch ? RequestCode.SEND_BATCH_MESSAGE : RequestCode.SEND_MESSAGE_V2, requestHeaderV2);
            } else {
                request = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, requestHeader);
            }
        }
        request.setBody(msg.getBody());

        switch (communicationMode) {
            case ONEWAY:
                直接返回null,单边发送
                this.remotingClient.invokeOneway(addr, request, timeoutMillis);
                return null;
            case ASYNC:
                final AtomicInteger times = new AtomicInteger();
                long costTimeAsync = System.currentTimeMillis() - beginStartTime;
                if (timeoutMillis < costTimeAsync) {
                    throw new RemotingTooMuchRequestException("sendMessage call timeout");
                }
                直接返回null,异步操作,但是添加了回调
                this.sendMessageAsync(addr, brokerName, msg, timeoutMillis - costTimeAsync, request, sendCallback, topicPublishInfo, instance,
                    retryTimesWhenSendFailed, times, context, producer);
                return null;
            case SYNC:
                同步操作等待响应
                long costTimeSync = System.currentTimeMillis() - beginStartTime;
                if (timeoutMillis < costTimeSync) {
                    throw new RemotingTooMuchRequestException("sendMessage call timeout");
                }
                return this.sendMessageSync(addr, brokerName, msg, timeoutMillis - costTimeSync, request);
            default:
                assert false;
                break;
        }

        return null;
    }

}

相关文章

网友评论

      本文标题:(六)生产者是如何发送消息的

      本文链接:https://www.haomeiwen.com/subject/lnmbyltx.html