美文网首页
RocketMQ消息发送源码分析

RocketMQ消息发送源码分析

作者: hcq0514 | 来源:发表于2020-12-15 14:15 被阅读0次

RocketMQ源码里的发送示例

/**
 * Minimal synchronous-send example: starts a producer, sends 1000 messages to
 * "TopicTest" one by one and prints each send result.
 */
public class Producer {
    /**
     * Runs the example.
     *
     * @param args unused
     * @throws MQClientException if the producer cannot be started
     * @throws InterruptedException if the back-off sleep after a failed send is interrupted
     */
    public static void main(String[] args) throws MQClientException, InterruptedException {
        DefaultMQProducer producer = new DefaultMQProducer("test");
        producer.setNamesrvAddr("127.0.0.1:9876");
        producer.start();
        try {
            for (int i = 0; i < 1000; i++) {
                try {
                    Message msg = new Message("TopicTest" /* Topic */,
                        "TagA" /* Tag */,
                        ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
                    );
                    SendResult sendResult = producer.send(msg);
                    System.out.printf("%s%n", sendResult);
                } catch (Exception e) {
                    e.printStackTrace();
                    // Back off for a second before trying the next message.
                    Thread.sleep(1000);
                }
            }
        } finally {
            // Always release the producer, even when the sleep above is interrupted
            // and the InterruptedException propagates out of main.
            producer.shutdown();
        }
    }
}

实例化DefaultMQProducer()

  • 初始化流程图


  • DefaultMQProducer主要属性
// Producer group this producer belongs to.
    private String producerGroup;
// Key used when creating a topic.
    private String createTopicKey = TopicValidator.AUTO_CREATE_TOPIC_KEY_TOPIC;
// Number of queues created for each topic.
    private volatile int defaultTopicQueueNums = 4;
// Timeout applied when sending a message (presumably milliseconds — confirm).
    private int sendMsgTimeout = 3000;
    /**
     * Compress message body threshold, namely, message body larger than 4k will be compressed on default.
     */
    private int compressMsgBodyOverHowmuch = 1024 * 4;
    /**
     * Maximum number of retry to perform internally before claiming sending failure in synchronous mode.
     */
    private int retryTimesWhenSendFailed = 2;
    /**
     * Maximum number of retry to perform internally before claiming sending failure in asynchronous mode.
     */
    private int retryTimesWhenSendAsyncFailed = 2;
    /**
     * Indicate whether to retry another broker on sending failure internally. Defaults to false.
     * NOTE(review): the field name suggests this applies when the send result is not "store OK" — confirm.
     */
    private boolean retryAnotherBrokerWhenNotStoreOK = false;
    /**
     * Maximum allowed message size in bytes.
     */
    private int maxMessageSize = 1024 * 1024 * 4; // 4M
    /**
     * Interface of asynchronous transfer data
     */
    private TraceDispatcher traceDispatcher = null;
  • 依赖关系


  • MQAdmin接口主要方法
// Administrative operations on topics, queues and messages (excerpt; the closing brace is not shown).
public interface MQAdmin {
    // Create a topic named newTopic with queueNum queues, using the given key.
    void createTopic(final String key, final String newTopic, final int queueNum)
        throws MQClientException;  
    // Look up the offset in a queue that corresponds to a timestamp.
    long searchOffset(final MessageQueue mq, final long timestamp) throws MQClientException;
    // Get the maximum offset of a queue.
    long maxOffset(final MessageQueue mq) throws MQClientException;
    // Get the minimum offset of a queue.
    long minOffset(final MessageQueue mq) throws MQClientException;
    // Look up a single message by its offset message id.
    MessageExt viewMessage(final String offsetMsgId) throws RemotingException, MQBrokerException,
        InterruptedException, MQClientException;
    // Query up to maxNum messages of a topic by key within the [begin, end] range
    // (presumably timestamps — confirm).
    QueryResult queryMessage(final String topic, final String key, final int maxNum, final long begin,
        final long end) throws MQClientException, InterruptedException;
    // Look up a single message by topic and message id.
    MessageExt viewMessage(String topic,
        String msgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException;
  • MQProducer接口主要方法
public interface MQProducer extends MQAdmin {
//启动 
    void start() throws MQClientException;
//关闭
    void shutdown();
//查找该主题下所有消息
    List<MessageQueue> fetchPublishMessageQueues(final String topic) throws MQClientException;
//同步发送消息
    SendResult send(final Message msg) throws MQClientException, RemotingException, MQBrokerException,
        InterruptedException;
//同步超时发送消息
    SendResult send(final Message msg, final long timeout) throws MQClientException,
        RemotingException, MQBrokerException, InterruptedException;
//异步发送消息
    void send(final Message msg, final SendCallback sendCallback) throws MQClientException,
        RemotingException, InterruptedException;
//异步超时发送消息
    void send(final Message msg, final SendCallback sendCallback, final long timeout)
        throws MQClientException, RemotingException, InterruptedException;
//发送单向消息
    void sendOneway(final Message msg) throws MQClientException, RemotingException,
        InterruptedException;
//选择指定队列同步发送消息
    SendResult send(final Message msg, final MessageQueue mq) throws MQClientException,
        RemotingException, MQBrokerException, InterruptedException
//发送事务消息
    TransactionSendResult sendMessageInTransaction(final Message msg,
        final LocalTransactionExecuter tranExecuter, final Object arg) throws MQClientException;

    TransactionSendResult sendMessageInTransaction(final Message msg,
        final Object arg) throws MQClientException;
//批量发送消息
    SendResult send(final Collection<Message> msgs) throws MQClientException, RemotingException, MQBrokerException,
        InterruptedException;
...
}
  • new DefaultMQProducer(String producerGroup)
初始化后要设置NameServer的地址
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();
然后来到org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#start(boolean)方法
主要代码为
    /**
     * Starts this producer implementation (excerpt; part of the body is elided).
     *
     * @param startFactory whether the shared client factory should be started as well
     * @throws MQClientException declared by the method; the throwing paths are partly elided here
     */
    public void start(final boolean startFactory) throws MQClientException {
        switch (this.serviceState) {
            case CREATE_JUST:
                // Mark as failed up front; the transition to a running state is in the elided part.
                this.serviceState = ServiceState.START_FAILED;
                // Validate the configuration (presumably mainly producerGroup; checkConfig is not shown).
                this.checkConfig();
                // Unless this is the client's internal producer group, switch the instance name
                // to the process ID (per changeInstanceNameToPID) — not to a thread name.
                if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
                    this.defaultMQProducer.changeInstanceNameToPID();
                }
                // (Detailed below.) MQClientManager.getInstance() returns the manager that hands out
                // MQClientInstance objects; it keeps them in a ConcurrentMap keyed by clientId
                // (factoryTable). Per the article, MQClientInstance wraps the network API used by
                // producers and consumers to talk to the NameServer and brokers.
                this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQProducer, rpcHook);
                // Register this producer with the client instance under its producer group
                // (per the article, stored in MQClientInstance.producerTable).
                boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
                // Seed topicPublishInfoTable with an empty entry for the create-topic key.
                this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());
                // (Detailed below.) Start the client factory only when requested.
                if (startFactory) {
                    mQClientFactory.start();
                }

       ...

        // Send a heartbeat to all brokers.
        this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();

        // Periodically scan for expired requests: first run after 3000, then every 1000
        // (milliseconds, assuming this.timer is a java.util.Timer — confirm).
        this.timer.scheduleAtFixedRate(new TimerTask() {
            @Override
            public void run() {
                try {
                    RequestFutureTable.scanExpiredRequest();
                } catch (Throwable e) {
                    log.error("scan RequestFutureTable exception", e);
                }
            }
        }, 1000 * 3, 1000);
    }

getOrCreateMQClientInstance代码

    /**
     * Returns the client instance registered for the client id built from {@code clientConfig},
     * creating one when none is registered (excerpt; the handling of a concurrent insert is elided).
     *
     * @param clientConfig configuration the client id is built from and that is cloned into a new instance
     * @param rpcHook hook passed to a newly created instance
     * @return the instance looked up or created for the client id
     */
    public MQClientInstance getOrCreateMQClientInstance(final ClientConfig clientConfig, RPCHook rpcHook) {
        String clientId = clientConfig.buildMQClientId();
        // Look the instance up in factoryTable, which is keyed by client id.
        MQClientInstance instance = this.factoryTable.get(clientId);
        if (null == instance) {
            // No instance for this client id yet: create one from a clone of the config.
            instance =
                new MQClientInstance(clientConfig.cloneClientConfig(),
                    this.factoryIndexGenerator.getAndIncrement(), clientId, rpcHook);
            // putIfAbsent so that a concurrently registered instance is not overwritten.
            MQClientInstance prev = this.factoryTable.putIfAbsent(clientId, instance);
...
        }
        return instance;
    }

mQClientFactory.start()代码

    /**
     * Starts the client instance: name server lookup, remoting channel, scheduled tasks,
     * pull and rebalance services, and the internal producer. Only acts in the CREATE_JUST state.
     *
     * @throws MQClientException if a previous start attempt left the state at START_FAILED
     */
    public void start() throws MQClientException {
        synchronized (this) {
            switch (this.serviceState) {
                case CREATE_JUST:
                    // Mark as failed first; flipped to RUNNING only after every step below completes.
                    this.serviceState = ServiceState.START_FAILED;
                    // If no name server address is configured, fetch it.
                    if (null == this.clientConfig.getNamesrvAddr()) {
                        this.mQClientAPIImpl.fetchNameServerAddr();
                    }
                    // Start request-response channel
                    this.mQClientAPIImpl.start();
                    // Start various schedule tasks
                    this.startScheduledTask();
                    // Start pull service
                    this.pullMessageService.start();
                    // Start rebalance service
                    this.rebalanceService.start();
                    // Start the instance's own producer; false means it must not start the factory again.
                    this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
                    log.info("the client factory [{}] start OK", this.clientId);
                    this.serviceState = ServiceState.RUNNING;
                    break;
                case START_FAILED:
                    throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
                default:
                    // Any other state: nothing to do.
                    break;
            }
        }
    }

消息开始发送DefaultMQProducer#send(Message)

消息发送主要流程有


image.png

第一步是消息校验,
第二步是查找路由
第三步是选择队列
第四步是调用netty发送消息

  • 消息校验
    /**
     * Validates a message before it is sent: the message itself, its topic and its body.
     *
     * @param msg message to validate
     * @param defaultMQProducer producer whose maximum message size is the upper bound for the body
     * @throws MQClientException with code MESSAGE_ILLEGAL when the message is null, or its body is
     *         null, empty or larger than the producer's maximum; the topic validators may throw as well
     */
    public static void checkMessage(Message msg, DefaultMQProducer defaultMQProducer)
        throws MQClientException {
        if (msg == null) {
            throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message is null");
        }

        // Topic checks run before any body check.
        final String topic = msg.getTopic();
        Validators.checkTopic(topic);
        Validators.isNotAllowedSendTopic(topic);

        // Body must be present and non-empty.
        final byte[] payload = msg.getBody();
        if (payload == null) {
            throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body is null");
        }
        if (payload.length == 0) {
            throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body length is zero");
        }

        // Body must not exceed the producer's configured maximum.
        final int limit = defaultMQProducer.getMaxMessageSize();
        if (payload.length > limit) {
            throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL,
                "the message body size over max value, MAX: " + limit);
        }
    }
  • 查找路由
    /**
     * Resolves the publish info for a topic, refreshing it from the name server when the
     * locally cached entry is missing or not usable.
     *
     * @param topic topic to resolve
     * @return the entry found in topicPublishInfoTable after the refresh attempts
     */
    private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
        TopicPublishInfo cached = this.topicPublishInfoTable.get(topic);
        if (null == cached || !cached.ok()) {
            // Nothing usable locally: register a placeholder and ask the name server for the route.
            this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
            this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
            cached = this.topicPublishInfoTable.get(topic);
        }

        if (!cached.isHaveTopicRouterInfo() && !cached.ok()) {
            // Still no route for this topic: retry with the default-topic variant of the lookup.
            this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
            return this.topicPublishInfoTable.get(topic);
        }
        return cached;
    }

// Main fields of TopicPublishInfo.
public class TopicPublishInfo {
    // Whether the topic is an ordered-message topic.
    private boolean orderTopic = false;
    // Whether route info has been attached to this entry.
    private boolean haveTopicRouterInfo = false;
    // Message queues of the topic.
    private List<MessageQueue> messageQueueList = new ArrayList<MessageQueue>();
    // Counter incremented on each queue selection; used to spread sends across queues.
    private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex();
    // Route metadata for the topic (per the article, as obtained from the NameServer).
    private TopicRouteData topicRouteData;
}

updateTopicRouteInfoFromNameServer主要是调用这个方法向NameServer获取路由信息,并解码成一个TopicRouteData对象

    /**
     * Requests the route data of a topic and decodes the response body.
     *
     * @param topic topic whose route is requested
     * @param timeoutMillis timeout handed to the synchronous remoting call
     * @param allowTopicNotExist when true, a TOPIC_NOT_EXIST response is logged as a warning
     *        (the call still ends with an exception)
     * @return the decoded route data when the response is SUCCESS and carries a body
     * @throws MQClientException carrying the response code and remark for every other outcome,
     *         including a SUCCESS response without a body
     */
    public TopicRouteData getTopicRouteInfoFromNameServer(final String topic, final long timeoutMillis,
        boolean allowTopicNotExist) throws MQClientException, InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
        final GetRouteInfoRequestHeader header = new GetRouteInfoRequestHeader();
        header.setTopic(topic);
        final RemotingCommand request =
            RemotingCommand.createRequestCommand(RequestCode.GET_ROUTEINFO_BY_TOPIC, header);

        // The address argument is null — presumably this routes the call to the name server; confirm.
        final RemotingCommand response = this.remotingClient.invokeSync(null, request, timeoutMillis);
        assert response != null;

        final int code = response.getCode();
        if (code == ResponseCode.TOPIC_NOT_EXIST) {
            if (allowTopicNotExist) {
                log.warn("get Topic [{}] RouteInfoFromNameServer is not exist value", topic);
            }
        } else if (code == ResponseCode.SUCCESS) {
            final byte[] payload = response.getBody();
            if (payload != null) {
                return TopicRouteData.decode(payload, TopicRouteData.class);
            }
            // SUCCESS without a body is treated like any other failure below.
        }

        throw new MQClientException(response.getCode(), response.getRemark());
    }

判断本地路由表跟最新取到的路由表是否一致,不一致的话更改本地的

                    // Fragment: compares the freshly fetched route data with the cached one and,
                    // when an update is needed, propagates it to brokers, producers and consumers.
                    if (topicRouteData != null) {
                        // Route data currently cached for the topic.
                        TopicRouteData old = this.topicRouteTable.get(topic);
                        // Compare the cached route data with the new one.
                        boolean changed = topicRouteDataIsChange(old, topicRouteData);
                        // Unchanged data may still need an update, per isNeedUpdateTopicRouteInfo.
                        if (!changed) {
                            changed = this.isNeedUpdateTopicRouteInfo(topic);
                        } else {
                            log.info("the topic[{}] route info changed, old[{}] ,new[{}]", topic, old, topicRouteData);
                        }
                        if (changed) {
                            // Clone the route data; the clone is what gets stored in topicRouteTable below.
                            TopicRouteData cloneTopicRouteData = topicRouteData.cloneTopicRouteData();
                            // Refresh brokerAddrTable with the addresses of every broker in the new route.
                            for (BrokerData bd : topicRouteData.getBrokerDatas()) {
                                this.brokerAddrTable.put(bd.getBrokerName(), bd.getBrokerAddrs());
                            }
                            // Update Pub info
                            {
                                // Convert the route data into a TopicPublishInfo for this topic.
                                TopicPublishInfo publishInfo = topicRouteData2TopicPublishInfo(topic, topicRouteData);
                                publishInfo.setHaveTopicRouterInfo(true);
                                // Push the new publish info to every registered producer.
                                Iterator<Entry<String, MQProducerInner>> it = this.producerTable.entrySet().iterator();
                                while (it.hasNext()) {
                                    Entry<String, MQProducerInner> entry = it.next();
                                    MQProducerInner impl = entry.getValue();
                                    if (impl != null) {
                                        impl.updateTopicPublishInfo(topic, publishInfo);
                                    }
                                }
                            }
                            // Push the new subscribe info (set of message queues) to every registered consumer.
                            {
                                Set<MessageQueue> subscribeInfo = topicRouteData2TopicSubscribeInfo(topic, topicRouteData);
                                Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();
                                while (it.hasNext()) {
                                    Entry<String, MQConsumerInner> entry = it.next();
                                    MQConsumerInner impl = entry.getValue();
                                    if (impl != null) {
                                        impl.updateTopicSubscribeInfo(topic, subscribeInfo);
                                    }
                                }
                            }
                            log.info("topicRouteTable.put. Topic = {}, TopicRouteData[{}]", topic, cloneTopicRouteData);
                            this.topicRouteTable.put(topic, cloneTopicRouteData);
                            return true;
                        }
                    } 
  • 3、选择队列发送(this.selectOneMessageQueue(topicPublishInfo, lastBrokerName))
    /**
     * Picks the queue to send to. With latency fault tolerance enabled, brokers reported as
     * available are preferred; otherwise the choice is delegated to
     * {@code tpInfo.selectOneMessageQueue(lastBrokerName)}.
     *
     * @param tpInfo publish info holding the candidate queues and the rotating index
     * @param lastBrokerName broker name passed in by the caller; may be null
     * @return the selected queue
     */
    public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
        // Broker latency fault tolerance (the original note says it is off by default — confirm).
        if (this.sendLatencyFaultEnable) {
            try {
                // Walk the queue list once, starting from the next rotating index.
                int index = tpInfo.getSendWhichQueue().getAndIncrement();
                for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
                    int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
                    // Math.abs(Integer.MIN_VALUE) is still negative, hence the guard.
                    if (pos < 0)
                        pos = 0;
                    MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
                    // Accept a queue whose broker is available and, when lastBrokerName is given,
                    // whose broker name equals it.
                    if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {
                        if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))
                            return mq;
                    }
                }

                // No queue accepted above: fall back to the broker returned by pickOneAtLeast.
                final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
                int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
                if (writeQueueNums > 0) {
                    // Take any queue and retarget it at the fallback broker and one of its queue ids.
                    final MessageQueue mq = tpInfo.selectOneMessageQueue();
                    if (notBestBroker != null) {
                        mq.setBrokerName(notBestBroker);
                        mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);
                    }
                    return mq;
                } else {
                    // The fallback broker has no queue for this topic: drop it from the tolerance table.
                    latencyFaultTolerance.remove(notBestBroker);
                }
            } catch (Exception e) {
                log.error("Error occurred when selecting message queue", e);
            }

            // Last resort: plain selection without a broker constraint.
            return tpInfo.selectOneMessageQueue();
        }
        // Fault tolerance disabled: go straight to the selection keyed on lastBrokerName.
        return tpInfo.selectOneMessageQueue(lastBrokerName);
    }

//负载均衡获取队列
    /**
     * Selects a queue in rotation, skipping queues that belong to {@code lastBrokerName}.
     *
     * @param lastBrokerName broker to avoid; when null the no-argument selection is used
     * @return the first queue in rotation on a different broker, or the result of the
     *         no-argument selection when every queue is on {@code lastBrokerName}
     */
    public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
        if (lastBrokerName == null) {
            return selectOneMessageQueue();
        }

        int cursor = this.sendWhichQueue.getAndIncrement();
        for (int attempt = 0; attempt < this.messageQueueList.size(); attempt++) {
            int slot = Math.abs(cursor++) % this.messageQueueList.size();
            // Math.abs(Integer.MIN_VALUE) is still negative, so clamp to the first slot.
            if (slot < 0) {
                slot = 0;
            }
            final MessageQueue candidate = this.messageQueueList.get(slot);
            if (candidate.getBrokerName().equals(lastBrokerName)) {
                continue;
            }
            return candidate;
        }

        // Every queue sits on lastBrokerName: fall back to the plain selection.
        return selectOneMessageQueue();
    }
  • 发送消息
   /**
    * Core send routine (truncated excerpt): resolves the broker address for the target queue,
    * prepares the message flags and builds the send request header. The actual send and the
    * rest of the method are not shown; sysFlag is declared in code elided from this excerpt.
    */
   private SendResult sendKernelImpl(final Message msg,// message to send
        final MessageQueue mq, // queue the message is sent to
        final CommunicationMode communicationMode,// communication mode used for the send
        final SendCallback sendCallback,// callback for asynchronous sends
        final TopicPublishInfo topicPublishInfo,// route/publish info of the topic
        final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        // Resolve the broker address from the queue's broker name.
        String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
        if (null == brokerAddr) {
            // Not found: refresh the topic's publish info (which may query the NameServer) and retry.
            tryToFindTopicPublishInfo(mq.getTopic());
            brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
        }

        // Proceed only when a broker address is known.
        if (brokerAddr != null) {
            // Let MixAll.brokerVIPChannel derive the address to use from the VIP-channel setting.
            brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr);

            // Keep a reference to the original body (its use is in the elided part — presumably
            // to restore the body after compression; confirm).
            byte[] prevBody = msg.getBody();
            try {
                // Assign a unique id to the message.
                //for MessageBatch,ID has been set in the generating process
                if (!(msg instanceof MessageBatch)) {
                    MessageClientIDSetter.setUniqID(msg);
                }
                // Compress the body when tryToCompressMessage decides so (presumably when it exceeds
                // compressMsgBodyOverHowmuch, i.e. 4K by default — not 4M; confirm).
                boolean msgBodyCompressed = false;
                if (this.tryToCompressMessage(msg)) {
                    sysFlag |= MessageSysFlag.COMPRESSED_FLAG;
                    msgBodyCompressed = true;
                }
                // For a transactional (prepared) message, set MessageSysFlag.TRANSACTION_PREPARED_TYPE.
                final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
                if (tranMsg != null && Boolean.parseBoolean(tranMsg)) {
                    sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE;
                }
...还有钩子函数等,省略 ,下面组装request请求
                SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
                // Producer group.
                requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
                // Topic, plus the create-topic key and default queue count from the producer config.
                requestHeader.setTopic(msg.getTopic());
                requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());
                requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());
                // Target queue id.
                requestHeader.setQueueId(mq.getQueueId());
                // System flags accumulated above (compression, transaction).
                requestHeader.setSysFlag(sysFlag);
                requestHeader.setBornTimestamp(System.currentTimeMillis());
                requestHeader.setFlag(msg.getFlag());
                requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));
                requestHeader.setReconsumeTimes(0);
                requestHeader.setUnitMode(this.isUnitMode());
                requestHeader.setBatch(msg instanceof MessageBatch);
                // For retry-group topics, move the reconsume counters from the message
                // properties into the header and clear them on the message.
                if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                    String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);
                    if (reconsumeTimes != null) {
                        requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes));
                        MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME);
                    }

                    String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg);
                    if (maxReconsumeTimes != null) {
                        requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes));
                        MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_MAX_RECONSUME_TIMES);
                    }
                }
}
//最终是调用该方法发送数据
    /**
     * Sends the request synchronously and converts the broker's reply into a send result.
     *
     * @param addr address the request is sent to
     * @param brokerName broker name handed to the response processing
     * @param msg message being sent
     * @param timeoutMillis timeout of the synchronous call
     * @param request fully built remoting request
     * @return the result produced by {@code processSendResponse}
     */
    private SendResult sendMessageSync(final String addr, final String brokerName, final Message msg,
        final long timeoutMillis, final RemotingCommand request)
        throws RemotingException, MQBrokerException, InterruptedException {
        final RemotingCommand reply = this.remotingClient.invokeSync(addr, request, timeoutMillis);
        assert reply != null;
        return this.processSendResponse(brokerName, msg, reply);
    }

// Fragment: when send-message hooks are registered, record the send result on the
// context and run the after-send hooks.
                if (this.hasSendMessageHook()) {
                    context.setSendResult(sendResult);
                    this.executeSendMessageHookAfter(context);
                }

相关文章

网友评论

      本文标题:RocketMQ消息发送源码分析

      本文链接:https://www.haomeiwen.com/subject/yjwigktx.html