先来看看DefaultMQProducer的结构。
DefaultMQProducer的结构
先分别来说说这些类的基本功能
MQAdmin
来看看这个接口的定义
/**
 * Administrative operations available to an MQ client: creating topics,
 * looking up queue offsets, and querying stored messages.
 */
public interface MQAdmin {
// Topic creation. Both overloads take a key, the new topic name and a queue count;
// the second one additionally accepts a topicSysFlag.
// NOTE(review): the exact meaning of `key` is not visible in this excerpt — confirm against the implementation.
void createTopic(final String key, final String newTopic, final int queueNum)
throws MQClientException;
void createTopic(String key, String newTopic, int queueNum, int topicSysFlag)
throws MQClientException;
// Offset lookups, all scoped to a single MessageQueue.
// searchOffset resolves an offset from a timestamp; maxOffset / minOffset return the queue's offset bounds.
long searchOffset(final MessageQueue mq, final long timestamp) throws MQClientException;
long maxOffset(final MessageQueue mq) throws MQClientException;
long minOffset(final MessageQueue mq) throws MQClientException;
// Returns a store time for the queue as a long; presumably a millisecond timestamp of the earliest message — TODO confirm.
long earliestMsgStoreTime(final MessageQueue mq) throws MQClientException;
// Looks up a single message by its offset message id.
MessageExt viewMessage(final String offsetMsgId) throws RemotingException, MQBrokerException,
InterruptedException, MQClientException;
// Queries messages of a topic by key. maxNum presumably caps the result size and
// begin/end presumably bound the time range searched — verify against the implementation.
QueryResult queryMessage(final String topic, final String key, final int maxNum, final long begin,
final long end) throws MQClientException, InterruptedException;
// Looks up a single message by topic and message id.
MessageExt viewMessage(String topic,
String msgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException;
}
上面这个接口,定义了许多方法。
创建topic
查询队列偏移量
通过消息id查询消息信息
根据关键字(key)分页查询topic中消息
MQProducer
// ---- Lifecycle: start and stop the producer. ----
void start() throws MQClientException;
void shutdown();
// Returns the message queues that can be published to for the given topic.
List<MessageQueue> fetchPublishMessageQueues(final String topic) throws MQClientException;
// ---- Single message, queue chosen by the producer. ----
// Overloads returning SendResult hand the result back to the caller directly;
// overloads taking a SendCallback return void and report through the callback;
// sendOneway returns void and takes no callback, so no result is reported.
SendResult send(final Message msg) throws MQClientException, RemotingException, MQBrokerException,
InterruptedException;
SendResult send(final Message msg, final long timeout) throws MQClientException,
RemotingException, MQBrokerException, InterruptedException;
void send(final Message msg, final SendCallback sendCallback) throws MQClientException,
RemotingException, InterruptedException;
void send(final Message msg, final SendCallback sendCallback, final long timeout)
throws MQClientException, RemotingException, InterruptedException;
void sendOneway(final Message msg) throws MQClientException, RemotingException,
InterruptedException;
// ---- Single message, target MessageQueue supplied by the caller. ----
SendResult send(final Message msg, final MessageQueue mq) throws MQClientException,
RemotingException, MQBrokerException, InterruptedException;
SendResult send(final Message msg, final MessageQueue mq, final long timeout)
throws MQClientException, RemotingException, MQBrokerException, InterruptedException;
void send(final Message msg, final MessageQueue mq, final SendCallback sendCallback)
throws MQClientException, RemotingException, InterruptedException;
void send(final Message msg, final MessageQueue mq, final SendCallback sendCallback, long timeout)
throws MQClientException, RemotingException, InterruptedException;
void sendOneway(final Message msg, final MessageQueue mq) throws MQClientException,
RemotingException, InterruptedException;
// ---- Single message, queue picked by a caller-supplied MessageQueueSelector. ----
// NOTE(review): `arg` is presumably passed through to the selector — confirm against the implementation.
SendResult send(final Message msg, final MessageQueueSelector selector, final Object arg)
throws MQClientException, RemotingException, MQBrokerException, InterruptedException;
SendResult send(final Message msg, final MessageQueueSelector selector, final Object arg,
final long timeout) throws MQClientException, RemotingException, MQBrokerException,
InterruptedException;
void send(final Message msg, final MessageQueueSelector selector, final Object arg,
final SendCallback sendCallback) throws MQClientException, RemotingException,
InterruptedException;
void send(final Message msg, final MessageQueueSelector selector, final Object arg,
final SendCallback sendCallback, final long timeout) throws MQClientException, RemotingException,
InterruptedException;
void sendOneway(final Message msg, final MessageQueueSelector selector, final Object arg)
throws MQClientException, RemotingException, InterruptedException;
// ---- Transactional send. The first overload takes an explicit LocalTransactionExecuter; the second does not. ----
TransactionSendResult sendMessageInTransaction(final Message msg,
final LocalTransactionExecuter tranExecuter, final Object arg) throws MQClientException;
TransactionSendResult sendMessageInTransaction(final Message msg,
final Object arg) throws MQClientException;
// ---- Batch send: same overload families as above, but taking a Collection<Message>. ----
SendResult send(final Collection<Message> msgs) throws MQClientException, RemotingException, MQBrokerException,
InterruptedException;
SendResult send(final Collection<Message> msgs, final long timeout) throws MQClientException,
RemotingException, MQBrokerException, InterruptedException;
SendResult send(final Collection<Message> msgs, final MessageQueue mq) throws MQClientException,
RemotingException, MQBrokerException, InterruptedException;
SendResult send(final Collection<Message> msgs, final MessageQueue mq, final long timeout)
throws MQClientException, RemotingException, MQBrokerException, InterruptedException;
void send(final Collection<Message> msgs, final SendCallback sendCallback) throws MQClientException, RemotingException, MQBrokerException,
InterruptedException;
void send(final Collection<Message> msgs, final SendCallback sendCallback, final long timeout) throws MQClientException, RemotingException,
MQBrokerException, InterruptedException;
void send(final Collection<Message> msgs, final MessageQueue mq, final SendCallback sendCallback) throws MQClientException, RemotingException,
MQBrokerException, InterruptedException;
void send(final Collection<Message> msgs, final MessageQueue mq, final SendCallback sendCallback, final long timeout) throws MQClientException,
RemotingException, MQBrokerException, InterruptedException;
// ---- Request: send a message and obtain a Message in return. ----
// Overloads returning Message deliver it directly and may throw RequestTimeoutException;
// overloads taking a RequestCallback return void and report through the callback.
Message request(final Message msg, final long timeout) throws RequestTimeoutException, MQClientException,
RemotingException, MQBrokerException, InterruptedException;
void request(final Message msg, final RequestCallback requestCallback, final long timeout)
throws MQClientException, RemotingException, InterruptedException, MQBrokerException;
Message request(final Message msg, final MessageQueueSelector selector, final Object arg,
final long timeout) throws RequestTimeoutException, MQClientException, RemotingException, MQBrokerException,
InterruptedException;
void request(final Message msg, final MessageQueueSelector selector, final Object arg,
final RequestCallback requestCallback,
final long timeout) throws MQClientException, RemotingException,
InterruptedException, MQBrokerException;
Message request(final Message msg, final MessageQueue mq, final long timeout)
throws RequestTimeoutException, MQClientException, RemotingException, MQBrokerException, InterruptedException;
void request(final Message msg, final MessageQueue mq, final RequestCallback requestCallback, long timeout)
throws MQClientException, RemotingException, MQBrokerException, InterruptedException;
从上面看,方法还是很多的,但是主要功能还是发送消息,以及生产者的启动与停止
ClientConfig
更多是一个配置类,代码如下
public class ClientConfig {
注册中心的地址
private String namesrvAddr = NameServerAddressUtils.getNameServerAddresses();
客户端本地的ip地址
private String clientIP = RemotingUtil.getLocalAddress();
客户端的实例名称
private String instanceName = System.getProperty("rocketmq.client.name", "DEFAULT");
/**
* Pulling topic information interval from the named server
*/
private int pollNameServerInterval = 1000 * 30;
/**
* Heartbeat interval in milliseconds with message broker
*/
private int heartbeatBrokerInterval = 1000 * 30;
}
这个类主要是起一个配置的作用。
封装了客户端的基本信息(注册中心地址、本机ip、实例名称),以及从注册中心拉取topic路由信息的间隔、向Broker发送心跳的间隔。
DefaultMQProducer
DefaultMQProducer实现了MQProducer接口,而MQProducer又继承自MQAdmin,所以它同时拥有这两个接口的方法。
说明具备了管理topic的功能(创建topic,从topic中查询信息),以及发送消息的功能。
DefaultMQProducer继承了ClientConfig
当然需要管理topic,发送消息,这些必然需要知道一些基本的信息,比如注册中心地址,broker地址。
首先还是直接通过源码入手。
直接看DefaultMQProducer的send方法。代码如下
/**
 * Producer entry point (excerpt). Inherits the client settings from ClientConfig
 * and forwards sending to a DefaultMQProducerImpl.
 */
public class DefaultMQProducer extends ClientConfig implements MQProducer {
// Implementation object that send() delegates to; its initialization is not shown in this excerpt.
protected final transient DefaultMQProducerImpl defaultMQProducerImpl;
/**
 * Sends a message and returns the result.
 *
 * @param msg the message to send
 * @return the SendResult produced by the underlying implementation
 */
@Override
public SendResult send(
Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
// Validate the message against this producer's settings before doing anything else.
Validators.checkMessage(msg, this);
// Rewrite the topic through withNamespace(...) before handing the message over.
msg.setTopic(withNamespace(msg.getTopic()));
return this.defaultMQProducerImpl.send(msg);
}
}
最终定位到实现消息发送的代码如下
public class DefaultMQProducerImpl implements MQProducerInner {
private SendResult sendDefaultImpl(
Message msg,
final CommunicationMode communicationMode,
final SendCallback sendCallback,
final long timeout
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
this.makeSureStateOK();
Validators.checkMessage(msg, this.defaultMQProducer);
final long invokeID = random.nextLong();
long beginTimestampFirst = System.currentTimeMillis();
long beginTimestampPrev = beginTimestampFirst;
long endTimestamp = beginTimestampFirst;
// 根据消息的主题,获取主题的路由信息
TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
if (topicPublishInfo != null && topicPublishInfo.ok()) {
boolean callTimeout = false;
MessageQueue mq = null;
Exception exception = null;
SendResult sendResult = null;
int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
int times = 0;
String[] brokersSent = new String[timesTotal];
发送失败可以进行重发,所以是循环机制
for (; times < timesTotal; times++) {
String lastBrokerName = null == mq ? null : mq.getBrokerName();
从路由信息中获取合适的队列
MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
if (mqSelected != null) {
mq = mqSelected;
brokersSent[times] = mq.getBrokerName();
try {
beginTimestampPrev = System.currentTimeMillis();
if (times > 0) {
//Reset topic with namespace during resend.
msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic()));
}
long costTime = beginTimestampPrev - beginTimestampFirst;
if (timeout < costTime) {
callTimeout = true;
break;
}
进行发送
sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
endTimestamp = System.currentTimeMillis();
若延时故障开启,则会决定是否将broker加入到延时队列,以进行规避
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
switch (communicationMode) {
case ASYNC:
return null;
case ONEWAY:
return null;
case SYNC:
if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
continue;
}
}
return sendResult;
default:
break;
}
} catch (RemotingException e) {
处理远程通信的异常,同时将异常的broker加入到延时故障列表中,继续遍历
endTimestamp = System.currentTimeMillis();
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
continue;
} catch (MQClientException e) {
// 处理客户端异常(MQClientException):同样更新该broker的故障信息并记录异常,然后继续重试
endTimestamp = System.currentTimeMillis();
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
exception = e;
continue;
} catch (MQBrokerException e) {
// 处理broker返回的异常(MQBrokerException):更新该broker的故障信息并记录异常;下面列出的响应码会继续重试,其余情况则返回已有的sendResult或直接抛出异常
endTimestamp = System.currentTimeMillis();
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
exception = e;
switch (e.getResponseCode()) {
case ResponseCode.TOPIC_NOT_EXIST:
case ResponseCode.SERVICE_NOT_AVAILABLE:
case ResponseCode.SYSTEM_ERROR:
case ResponseCode.NO_PERMISSION:
case ResponseCode.NO_BUYER_ID:
case ResponseCode.NOT_IN_CURRENT_UNIT:
continue;
default:
if (sendResult != null) {
return sendResult;
}
throw e;
}
} catch (InterruptedException e) {
// 线程被中断(InterruptedException):更新故障信息(第三个参数为false,与发送成功时相同),然后直接抛出异常,不再重试
endTimestamp = System.currentTimeMillis();
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
throw e;
}
} else {
break;
}
}
if (sendResult != null) {
return sendResult;
}
...省略无关代码
throw mqClientException;
}
找不到topic对应的路由信息就会抛出异常
throw new MQClientException("No route info of this topic: " + msg.getTopic() + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO),
null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION);
}
}
上面的代码实现很长。主要的代码分为几步。
1.通过topic获取对应的路由信息
2.从路由信息中选出合适的队列
3.往队列对应的broker发送消息
4.如果发送失败,可以在有限次数内进行重试发送(仅同步模式会在此循环中重试,总次数为1+retryTimesWhenSendFailed;其他模式只尝试一次)
如何根据topic获取对应的路由数据
在说如何获取路由之前,要讲一下生产者保存路由的数据结构TopicPublishInfo。
TopicPublishInfo的结构如下
生产者获取topic对应的TopicPublishInfo的方法实现
方法如下
public class DefaultMQProducerImpl implements MQProducerInner {
缓存,用于存放topic以及路由信息
private final ConcurrentMap<String/* topic */, TopicPublishInfo> topicPublishInfoTable =
new ConcurrentHashMap<String, TopicPublishInfo>();
private SendResult sendDefaultImpl(
Message msg,
final CommunicationMode communicationMode,
final SendCallback sendCallback,
final long timeout
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
...省略若干代码
TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
...省略若干代码
}
private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
优先从缓存中获取
TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
if (null == topicPublishInfo || !topicPublishInfo.ok()) {
this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
若无则通过请求去获取topic的路由信息
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
topicPublishInfo = this.topicPublishInfoTable.get(topic);
}
// 如果获取之后,topicPublishInfo是合法的,则直接返回
if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
return topicPublishInfo;
} else {
如果通过原先的topic获取不到,则直接通过默认的topic去获取路由数据
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
topicPublishInfo = this.topicPublishInfoTable.get(topic);
return topicPublishInfo;
}
}
}
从上面代码看,最重要的点无非就是如何获取TopicPublishInfo
如何获取TopicPublishInfo
从上面的代码看,总共有2个方法去获取topic的路由信息。
但最终都是由同一个方法实现的。
该方法的逻辑也比较简单:一种是按默认的topic去获取路由数据,另一种则是按自定义(即传入)的topic去获取路由数据。
代码如下
public class MQClientInstance {
记录topic对应的路由数据
private final ConcurrentMap<String/* Topic */, TopicRouteData> topicRouteTable = new ConcurrentHashMap<String, TopicRouteData>();
public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault,
DefaultMQProducer defaultMQProducer) {
try {
if (this.lockNamesrv.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
try {
TopicRouteData topicRouteData;
if (isDefault && defaultMQProducer != null) {
获取默认的topic的路由数据
topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(),
1000 * 3);
if (topicRouteData != null) {
for (QueueData data : topicRouteData.getQueueDatas()) {
int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums());
data.setReadQueueNums(queueNums);
data.setWriteQueueNums(queueNums);
}
}
} else {
获取自定义的topic的路由数据
topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3);
}
if (topicRouteData != null) {
TopicRouteData old = this.topicRouteTable.get(topic);
判断路由数据是否发生变化
boolean changed = topicRouteDataIsChange(old, topicRouteData);
if (!changed) {
如果没变化,看看是否需要去更新
changed = this.isNeedUpdateTopicRouteInfo(topic);
} else {
log.info("the topic[{}] route info changed, old[{}] ,new[{}]", topic, old, topicRouteData);
}
如果发生变化
if (changed) {
TopicRouteData cloneTopicRouteData = topicRouteData.cloneTopicRouteData();
将brokerName对应的集群内容保存起来(brokerId->ip地址)
for (BrokerData bd : topicRouteData.getBrokerDatas()) {
this.brokerAddrTable.put(bd.getBrokerName(), bd.getBrokerAddrs());
}
将路由数据,转化成TopicPublishInfo
{
TopicPublishInfo publishInfo = topicRouteData2TopicPublishInfo(topic, topicRouteData);
publishInfo.setHaveTopicRouterInfo(true);
Iterator<Entry<String, MQProducerInner>> it = this.producerTable.entrySet().iterator();
while (it.hasNext()) {
Entry<String, MQProducerInner> entry = it.next();
MQProducerInner impl = entry.getValue();
if (impl != null) {
// 设置到每个生产者的topicPublishInfoTable中。这样每个生产者就记录了topic->TopicPublishInfo的对应关系
impl.updateTopicPublishInfo(topic, publishInfo);
}
}
}
省略代码...
this.topicRouteTable.put(topic, cloneTopicRouteData);
return true;
}
}
} catch (MQClientException e) {
省略代码
} catch (RemotingException e) {
省略代码
} finally {
this.lockNamesrv.unlock();
}
} else {
省略代码
}
} catch (InterruptedException e) {
省略代码
}
return false;
}
判断路由数据是否发生变化
/**
 * Reports whether two route snapshots differ.
 *
 * @param olddata the previously stored route data, may be null
 * @param nowdata the newly fetched route data, may be null
 * @return true when either argument is null or the two snapshots are not equal
 *         after their queue and broker lists have been sorted
 */
private boolean topicRouteDataIsChange(TopicRouteData olddata, TopicRouteData nowdata) {
// A missing snapshot on either side is treated as a change.
if (olddata == null || nowdata == null)
return true;
// Compare clones rather than the arguments themselves.
// NOTE(review): presumably so that sorting below does not reorder the callers' lists — confirm that cloneTopicRouteData copies the lists.
TopicRouteData old = olddata.cloneTopicRouteData();
TopicRouteData now = nowdata.cloneTopicRouteData();
// Sort both sides so that element order alone does not make the snapshots unequal.
Collections.sort(old.getQueueDatas());
Collections.sort(old.getBrokerDatas());
Collections.sort(now.getQueueDatas());
Collections.sort(now.getBrokerDatas());
return !old.equals(now);
}
private boolean isNeedUpdateTopicRouteInfo(final String topic) {
boolean result = false;
{
Iterator<Entry<String, MQProducerInner>> it = this.producerTable.entrySet().iterator();
while (it.hasNext() && !result) {
Entry<String, MQProducerInner> entry = it.next();
MQProducerInner impl = entry.getValue();
if (impl != null) {
//这个方法其实是从DefaultMQProducerImpl的变量topicPublishInfoTable中判断是否topic对应的数据是否已经存在,或者是否有效来决定是不是要更新。
//TopicPublishInfo prev = this.topicPublishInfoTable.get(topic);
//return null == prev || !prev.ok();
如果不存在,或者失效,则进行更新
result = impl.isPublishTopicNeedUpdate(topic);
}
}
}
省略部分代码
return result;
}
将TopicRouteData 转化成TopicPublishInfo
public static TopicPublishInfo topicRouteData2TopicPublishInfo(final String topic, final TopicRouteData route) {
TopicPublishInfo info = new TopicPublishInfo();
info.setTopicRouteData(route);
if (route.getOrderTopicConf() != null && route.getOrderTopicConf().length() > 0) {
省略
} else {
List<QueueData> qds = route.getQueueDatas();
QueueData实现了排序接口,按brokerName排序
Collections.sort(qds);
for (QueueData qd : qds) {
将可写的队列过滤出来
if (PermName.isWriteable(qd.getPerm())) {
BrokerData brokerData = null;
for (BrokerData bd : route.getBrokerDatas()) {
从路由数据的brokerData中找到对应的broker
if (bd.getBrokerName().equals(qd.getBrokerName())) {
brokerData = bd;
break;
}
}
如果没找到则跳过
if (null == brokerData) {
continue;
}
如果没有master的相关信息,则跳过
if (!brokerData.getBrokerAddrs().containsKey(MixAll.MASTER_ID)) {
continue;
}
// 按写队列的数目,为该broker创建对应数量的MessageQueue,队列id从0开始递增
for (int i = 0; i < qd.getWriteQueueNums(); i++) {
MessageQueue mq = new MessageQueue(topic, qd.getBrokerName(), i);
info.getMessageQueueList().add(mq);
}
}
}
info.setOrderTopic(false);
}
return info;
}
}
至此,生产者获取topic路由信息的过程就介绍完了。
获取topic对应路由数据的流程图
网友评论