RocketMQ 发送普通消息有三种实现方式:可靠同步发送、可靠异步发送、单向
(Oneway)发送。
同步:发送者向MQ发送消息后需要同步等待消息消费器返回发送结果。
异步:发送者向MQ发送消息后会立即返回,可以指定发送成功的回调函数,消息发送者线程不会阻塞,直到运行结束,消息发送成功或失败的回调会在一个新的线程执行。
单向:消息发送者向MQ执行发送消息时,会直接返回,不等待消息服务器结果,也不用注册回调函数,适用于不在乎消息是否发送成功的情况。
在研究RocketMQ的消息发送如何实现我们需要清楚发送的消息包含那些内容,可以去查看RocketMQ的消息包装类。org.apache.rocketmq . common.message.Message :
Message 的基础属性主要包括消息所属主题topic , 消息Flag(RocketMQ 不做处理)、
RocketMQ 定义的MessageFlag如下所示:
public final static int COMPRESSED_FLAG = Ox1 // 压缩
-public final static int MULTl_TAGS_FLAG = Ox1 < 1
” public final static int TRANSACTION_PREPARED_TYPE= Ox1 < 2 // 事务未提交
’ public final static int TRANSACTION_COMMIT_ TYPE= Ox2 < 2 // 事务已提交
-public final static int TRANSACTION_ROLLBACK_ TYPE = Ox3 < 2 // 事务回滚
-public final static int TRANSACTION_NOT_TYPE= 0 // 非事务
Message 扩展属性主要包含下面几个。
tag :消息TAG ,用于消息过滤。
keys: Message 索引键, 多个用空格隔开, RocketMQ 可以根据这些key 快速检索到消息。
waitStoreMsgOK :消息发送时是否等消息存储完成后再返回。
delayTimeLevel : 消息延迟级别,用于定时消息或消息重试。
这些扩展属性存储在Message 的properties 中。
清楚了消息包含的信息之后我们可以去看生产者是如何发送消息的。在源码包中RocketMQ-Client模块中。下面我将分别了解生产者实现的几个部分。
一、生产者启动流程
消息生产者的代码都在client 模块中,相对于RocketMQ 来说,它就是客户端,也是消息的提供者,我们在应用系统中初始化生产者的一个实例即可使用它来发消息。
DefaultMQProducer
DefaultMQProducer 是默认的消息生产者实现类,它实现MQAdmin 的接口。我们可以看到里面提供了很多我们使用MQ常用的方法。
1. v oid createTopic(String key, String newTopic, int queueNum, int topicSysFlag)
创建主题。
key :目前未实际作用,可以与newTopic 相同。
newTopic : 主题名称。
queueNum :队列数量。
topicSysFlag :主题系统标签,默认为0 。
2 ) long searchOffset(final MessageQueue mq, final long timestamp)
根据时间戳从队列中查找其偏移量。
3) MessageExt viewMessage(final String offsetMsgld)
根据消息偏移量查找消息。
4 ) QueryResult queryMessage(final String topic, final String key, final int maxNum, final long begin, final long end)
根据条件查询消息。
topic :消息主题。
key :消息索引字段。
maxNum :本次最多取出消息条数。
begin :开始时间。
end :结束时间。
5 ) MessageExt viewMessage(String topic,String msgld)
根据主题与消息ID 查找消息。
6 ) List <MessageQueue> fetchPublishMessageQueues(final String topic)
查找该主题下所有的消息队列。
7 ) SendResult send(final Message msg)同步发送消息,具体发送到主题中的哪个消息队列由负载算法决定。
8 ) SendResult send(final Message msg, final long timeout)同步发送消息,如果发送超过timeout 则抛出超时异常。
9 ) void send(final Message msg, final SendCallback sendCallback)
异步发送消息, send Callback 参数是消息发送成功后的回调方法。
10) void send(final Message msg, final SendCallback sendCallback, final long timeout)
异步发送消息,如果发送超过timeout 指定的值,则抛出超时异常。
11 ) void sendOneway(final Message msg)单向消息发送,就是不在乎发送结果,消息发送出去后该方法立即返回。
12) SendResult send(final Message msg, final MessageQueue mq)同步方式发送消息,发送到指定消息队列。
13 ) void send(final Message msg, final MessageQueue mq, final SendCallback
send Callback)异步方式发送消息,发送到指定消息队列。
14) void sendOneway(final Message msg, final MessageQueue mq)单向方式发送消息,发送到指定的消息队列。
15 ) SendResult send(final Message msg , final MessageQueueSelector selector, final
Object arg)消息发送,指定消息选择算法,覆盖消息生产者默认的消息队列负载。
16 ) SendResult send(final Collection <Message> msgs, final MessageQueu巳mq, final long timeout)同步批量消息发送。
然后以下是他的一些核心属性,我们要了解启动流程就必须清楚这些基本属性代表的含义。
// 生产者所属组,消息服务器在回查事务状态时会随机选择该组中任何一个生产者发起事务回查请求。
private String producerGroup;
private String createTopicKey = MixAll.DEFAULT_TOPIC; //默认topicKey 。
private volatile int defaultTopicQueueNums = 4; // 默认主题在每一个Broker 队列数量。
private int sendMsgTimeout = 3000; // 发送消息默认超时时间, 默认3s 。
private int compressMsgBodyOverHowmuch = 1024 * 4; // 消息体超过该值则启用压缩,默认4K。
private int retryTimesWhenSendFailed = 2; // 同步方式发送消息重试次数,默认为2 ,总共执行3 次。
private int retryTimesWhenSendAsyncFailed = 2 ; // 异步方式发送消息重试次数,默认为2 。
private boolean retryAnotherBrokerWhenNotStoreOK = false; // 消息重试时选择另外一个Broker 时是否不等待存储结果就返回, 默认为false 。
pr ivate int maxMessageSize = 1024 * 1024 * 4; // 4M 允许发送的最大消息长度,默认为4M ,该值最大值为2 ^ 32-1 。
启动的流程(DefaultMQProducerlmpl#starta可以看到):
1.检查productGroup 配置是否符合要求;并改变生产者的instanceName 为进程ID 。
2.构建一个MQClient实例,整个NM 实例中只存在一个MQClientManager实例,维护一个MQClientlnstance 缓存表Table ,同一个clientld 只会创建一个MQClientinstance。(这个ClientId为客户端IP+ instance+ (unitname 可选),这里要考虑如果是同一台物理机部署多个MQ应用程序,ClientId就会相同,会造成混乱的,所以,代码设置如果该实例默认值DEAFAULT,MQ会自动将实例设置为进程id,可以避免不同进程的影响。
- 像MQ去注册当前的生产者,以便后续调用网络请求,心跳检测等。
4.启动MQClient实例,如果已经启动,本次启动不会执行。
二、消息发送的流程实现
RocketMQ一般消息发送的话,主要经历几个步骤:验证消息、查找路由、消息发送。
可以在DefaultMQProducer #send找到以下示例代码:
public SendResult send(
Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
Validators.checkMessage(msg, this);
msg.setTopic(withNamespace(msg.getTopic()));
return this.defaultMQProducerImpl.send(msg);
}
defaultMQProducerImpl.send(msg)如下:
public SendResult send(
Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
return send(msg, this.defaultMQProducer.getSendMsgTimeout());
}
public SendResult send(Message msg,
long timeout(默认为3000)) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
return this.sendDefaultImpl(msg, CommunicationMode.SYNC, null, timeout);
}
看代码可以发现默认消息发送以同步方式发送,默认超时时间为3s.
主要流程:
1、发送之前,检查生产者状态验证消息规范
消息发送之前,首先会先去确保生产者的运行状态,接着会验证消息的信息是否符合规范,比如主题名称、消息体不能为空,消息长度不能超过多少等等4M。
2、获取路由信息
接着,要获取主题的路由地址,也就是发送的地址,来判断要发送的策略跟节点地址。查询路由之前会先看是否已经存在缓存中,如果没有会去向NameServer查询该topic的路由信息。找不到会抛出异常。
3、选择消息队列
如果找到路由信息会根据它选择消息队列,返回的消息队列会按照broker、序号排序,,如果topicA 在broker-a,broker”b 上分别创建了4 个队列, 那么返回的消息队列:[{ “ broker-Name ”:” broker-a ”,” queueld ”:0}, {“ brokerName ”:” broker- a ”,” queueId ” : 1 },{“ brokerName ”: ” broker-a ”, “queueld ”:2}, {“BrokerName ”:“ broker-a ”,” queueld ” :3,
{“ brokerName ” :” broker-b ”,” queueld ”:0}, {“ brokerName ”:” broker-b”,“ queueld ” :1 }, {“ brokerName ”: ” broker-b ”,” queueld ”:2}, {“ brokerName ” :" broker-b ” , ” queueld ” :3 }],那RocketMQ 如何选择消息队列呢?
首先消息发送端采用重试机制,由retryTimesWhenSendFailed 指定同步方式重试次数,异步重试机制在收到消息发送结构后执行回调之前进行重试。由retryTimesWhenSendAsyncFailed指定,接下来就是循环执行, 选择消息队列、发送消息,发送成功则返回,收到异常则重试。选择消息队列有两种方式。
1 ) sendLatencyFaultEnable=false ,默认不启用Broker 故障延迟机制。
2 ) sendLatencyFaultEnable=true ,启用Broker 故障延迟机制。
默认不启用故障延迟的方式:
首先在一次消息发送过程中,可能会多次执行选择消息队列这个方法, lastBrokerName就是上一次选择的执行发送消息失败的Broker 。第一次执行消息队列选择时,lastBrokerName 为null ,此时直接用sendWhichQueue 自增再获取值, 与当前路由表中消息
队列个数取模, 返回该位置的MessageQueue(selectOneMessageQueue () 方法),如果消息发送再失败的话, 下次进行消息队列选择时规避上次MesageQueue 所在的Broker, 否则还是很有可能再次失败。
但是如果还broker宕机了,下次重试可能还会再访问该broker的下一个队列,这样会有性能损耗浪费,于是有第二种方式,broker故障延迟机制开启状态。这种状态会轮询获取一个消息队列,验证是否可用,同时会设置broker故障时间,在一段时间内,不可用。如果可用会修改该broker的是否可用信息,表明该broker故障已经恢复。
4、发送消息
消息发送API 核心入口: DefaultMQProducerimpl#sendKernelImpl。
根据MessageQueue 获取Broker 的网络地址。如果MQClientlnstance 的
brokerAddrTable 未缓存该Broker 的信息,则从NameServer 主动更新一下topic 的路由信息。如果路由更新后还是找不到Broker 信息,则抛出MQC lientException,提示Broker 不存在。
接着会为消息分配全局唯一id,超过默认大小4k,会使用zip压缩,并且设置消息的系统标记为已压缩,如果是事务Prepared 消息,则设置消息的系统标记为MessageSysFlag.TRANSACTION_PREPARED TYPE 。
如果注册了消息发送钩子函数, 则执行消息发送之前的增强逻辑。通过DefaultMQProducerlmpl#registerSendMessageHook 注册钩子处理类,并且可以注册多个。
构建消息发送请求包。主要包含如下重要信息:生产者组、主题名称、默认创建主题Key 、该主题在单个Broker 默认队列数、队列ID (队列序号)、消息系统标记( MessageSysFlag ) 、消息发送时间、消息标记(RocketMQ 对消息中的flag 不做任何处理,供应用程序使用) 、消息扩展属性、消息重试次数、是否是批量消息等。
最后,根据消息发送方式,同步、异步、单向方式进行网络传输。并且如果注册了消息发送钩子函数,执行after逻辑,就算消息发送时异常了,那都会执行。。
(从源码中可以找到有MQClientlnstance#updateTopicRoutelnfoFromN ameServer 这个方法的功能是消息生产者更新和维护路由缓存。)
上面已经讲解了生产者发送消息的启动流程,在最后具体的三种发送方式的实现并没有深入去讲解,现在我们来看下源码解析下具体内部是怎么实现的。
四、同步发送实现
MQ 客户端发送消息的入口是MQClientAPIImpl#sendMessage 。请求命令是RequestCode.SEND_MESSAGE, 我们可以找到该命令的类:org.apache.rocketmq.client.impl.MQClientAPIImpl#sendMessage
处理类是org .apache.rocketmq. broker. processor.SendMessageProcessor
你们可以下载源码去看对于的全类名找到上面的代码去查看。
该方法会根据发送类型选择执行。
private CompletableFuture<RemotingCommand> asyncSendMessage(ChannelHandlerContext ctx, RemotingCommand request,
SendMessageContext mqtraceContext,
SendMessageRequestHeader requestHeader) {
// 1.发送之前,内部有一个super.msgCheck(ctx, requestHeader, response);的执行。
final RemotingCommand response = preSend(ctx, request, requestHeader);
final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader)response.readCustomHeader();
if (response.getCode() != -1) {
return CompletableFuture.completedFuture(response);
}
final byte[] body = request.getBody();
int queueIdInt = requestHeader.getQueueId();
TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
if (queueIdInt < 0) {
queueIdInt = randomQueueId(topicConfig.getWriteQueueNums());
}
MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
msgInner.setTopic(requestHeader.getTopic());
msgInner.setQueueId(queueIdInt);
if (!handleRetryAndDLQ(requestHeader, response, request, msgInner, topicConfig)) {
return CompletableFuture.completedFuture(response);
}
msgInner.setBody(body);
msgInner.setFlag(requestHeader.getFlag());
MessageAccessor.setProperties(msgInner, MessageDecoder.string2messageProperties(requestHeader.getProperties()));
msgInner.setPropertiesString(requestHeader.getProperties());
msgInner.setBornTimestamp(requestHeader.getBornTimestamp());
msgInner.setBornHost(ctx.channel().remoteAddress());
msgInner.setStoreHost(this.getStoreHost());
msgInner.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes());
String clusterName = this.brokerController.getBrokerConfig().getBrokerClusterName();
MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_CLUSTER, clusterName);
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
CompletableFuture<PutMessageResult> putMessageResult = null;
Map<String, String> origProps = MessageDecoder.string2messageProperties(requestHeader.getProperties());
String transFlag = origProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);
if (transFlag != null && Boolean.parseBoolean(transFlag)) {
if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {
response.setCode(ResponseCode.NO_PERMISSION);
response.setRemark(
"the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()
+ "] sending transaction message is forbidden");
return CompletableFuture.completedFuture(response);
}
putMessageResult = this.brokerController.getTransactionalMessageService().asyncPrepareMessage(msgInner);
} else {
putMessageResult = this.brokerController.getMessageStore().asyncPutMessage(msgInner);
}
return handlePutMessageResultFuture(putMessageResult, response, request, msgInner, responseHeader, mqtraceContext, ctx, queueIdInt);
}
同步发送的流程大概如下所示:
第一步. :检查消息发送是否合理,这里完成了以下几件事情:
1 )检查该Broker 是否有写权限。
2 )检查该Topic 是否可以进行消息发送。主要针对默认主题, 默认主题不能发送消息,仅供路由查找。
3 )在NameServer 端存储主题的配置信息,默认路径:${ROCKET_HOME}/store/
config/topiction 。下面是主题存储信息:{order : 是否是顺序消息; perm :权限码;readQueueNums :读队列数量; writeQueueNums : 写队列数量; topicName : 主题名称;topicSysFlag : topicFlag , 当前版本暂为保留; topicFilterType :主题过滤方式,当前版本仅支持SINGLE TAG 。
4 )检查队列,如果队列不合法,返回错误码。
第二步:如果消息重试次数超过允许的最大重试次数,消息将进入到DLD 延迟队列。延迟队列主题: %DLQ%+消费组名,延迟队列在消息消费时我再讲。
最后一步:调用DefaultMessageStore#putMessage 进行消息存储。
五、 异步发送
消息异步发送是指消息生产者调用发送的API 后,无须阻塞等待消息服务器返回本次
消息发送结果,只需要提供一个回调函数,供消息发送客户端在收到响应结果回调。异步方式相比同步方式,消息发送端的发送性能会显著提高,但为了保护消息服务器的负载压力,RocketMQ 对消息发送的异步消息进行了井发控制,通过参数clientAsyncSemaphoreValue来控制,默认为65535 。异步消息发送虽然也可以通过DefaultMQProducer#retryTimes WhenSendAsyncFailed属性来控制消息重试次数,但是重试的调用人口是在收到服务端响应包时进行的,如果出现网络异常、网络超时等将不会重试。
六、单向发送
单向发送是指消息生产者调用消息发送的API 后,无须等待消息服务器返回本次消息
发送结果,并且无须提供回调函数,表示消息发送压根就不关心本次消息发送是否成功,其实现原理与异步消息发送相同,只是消息发送客户端在收到响应结果后什么都不做而已,并且没有重试机制。
七、批量消息发送
最后我们要讲的一个是批量消息发送方式,顾名思义,其实就是将同一主题的多条消息一起打包发送到消息服务端,减少网络调用次数,提高网络传输效率。
并不是在同一批次中发送的消息数量越多性能就越好,其判断依据是单条消息的长度,如果单条消息内容比较长, 则打包多条消息发送会影响其他线程发送消息的响应时间,并且单批次消息发送总长度不能超过DefaultMQProducer#maxMessageSize 。
批量消息发送要解决的是如何将这些消息编码以便服务端能够正确解码出每条消息的消息内容。
我们来看下消息RemotingCommand 的属性。
1 ) code :请求命令编码,请求命令类型。
2 ) version : 版本号。
3 ) opaque : 客户端请求序号。
4 ) flag : 标记。倒数第一位表示请求类型, O : 请求; 1 :返回。倒数第二位, l : 表示oneway 。
5 ) remark :描述。
6 ) extFields : 扩展属性。
7 ) customeHeader : 每个请求对应的请求头信息。
8 ) byte[] body :消息体内容。
单条消息发送时,消息体的内容将保存在body 中。批量消息发送,需要将多条消息体
的内容存储在body 中 。
如何存储方便服务端正确解析出每条消息呢?
RocketMQ 采取的方式是,对单条消息内容使用固定格式进行存储
image.png
看下MQ如何实现的
首先在消息发送端,调用batch 方法,将一批消息封装成MessageBatch 对象。MessageBatch继承自Message 对象, MessageBatch 内部持有List<Message> messages 。这样的话,批量消息发送与单条消息发送的处理流程完全一样。MessageBatch 只需要将该集合中的每条消息的消息体body 聚合成一个byte 口数值,在消息服务端能够从该byte [] 数值中正确解析出消息即可。
MessageBatch#encode
public byte [ ] encode ( ) {
return MessageDecoder.encodeMessages(messages);
}
在创建RemotingCommand 对象时将调用messageBatch#encode()方法填充到RemotingCommand的body 域中。多条消息编码格式如图:
public static byte[] encodeMessages(List<Message> messages) {
//TO DO refactor, accumulate in one buffer, avoid copies
List<byte[]> encodedMessages = new ArrayList<byte[]>(messages.size());
int allSize = 0;
for (Message message : messages) {
byte[] tmp = encodeMessage(message);
encodedMessages.add(tmp);
allSize += tmp.length;
}
byte[] allBytes = new byte[allSize];
int pos = 0;
for (byte[] bytes : encodedMessages) {
System.arraycopy(bytes, 0, allBytes, pos, bytes.length);
pos += bytes.length;
}
return allBytes;
}
在消息发送端将会按照上述结构进行解码,然后整个发送流程与单个消息发送没什么差异。
总结:
生产者启动的流程:
启动的流程(DefaultMQProducerlmpl#starta可以看到):
1.检查productGroup 配置是否符合要求;并改变生产者的instanceName 为进程ID 。
2.构建一个MQClient实例,整个NM 实例中只存在一个MQClientManager实例,维护一个MQClientlnstance 缓存表Table ,同一个clientld 只会创建一个MQClientinstance。(这个ClientId为客户端IP+ instance+ (unitname 可选),这里要考虑如果是同一台物理机部署多个MQ应用程序,ClientId就会相同,会造成混乱的,所以,代码设置如果该实例默认值DEAFAULT,MQ会自动将实例设置为进程id,可以避免不同进程的影响。
- 像MQ去注册当前的生产者,以便后续调用网络请求,心跳检测等。
4.启动MQClient实例,如果已经启动,本次启动不会执行。
发送消息的流程:
1、发送之前,检查生产者状态验证消息规范。
2、获取路由信息
3、选择消息队列
4、发送消息
发送消息时根据发送方式选择同步、异步、单向方式进行发送。
各种发送差异,见上文。
阅读摘自《RocketMQ技术内幕实现》阿里巴巴推荐
网友评论