一、 关键特性
1 消息发送和消费
1)消息发送者步骤分析:
- 创建消息生产者producer,并制定生产者组名
- 指定NameServer地址
- 启动producer
- 创建消息对象,指定主题Topic、Tag和消息体
- 发送消息
- 关闭生产者producer
2)消息消费者步骤分析:
- 创建消费者consumer,制定消费者组名
- 指定NameServer地址
- 订阅主题Topic和Tag
- 设置回调函数,处理消息
- 启动消费者consumer
2 消息类型
使用RocketMQ可以发送普通消息、顺序消息、事务消息,顺序消息能实现有序消费,事务消息可以解决分布式事务实现数据最终一致。
1)普通消息
消息队列 MQ 提供三种方式来发送普通消息:
- 可靠同步发送
同步发送是指消息发送方发出数据后,会在收到接收方发回响应之后才发下一个数据包的通讯方式。这种可靠的消息发送方式使用的比较广泛,比如:重要的消息通知,短信通知。
public class SyncProducer {
public static void main(String[] args) throws Exception {
//- 创建消息生产者producer,并制定生产者组名
DefaultMQProducer producer = new DefaultMQProducer("group1");
//- 指定NameServer地址
producer.setNamesrvAddr("192.168.217.130:9876");
//- 启动producer
producer.start();
//- 创建消息对象,指定主题Topic、Tag和消息体
Message message = new Message("base","Tag1","keys_1",("hello").getBytes());
//- 发送消息
SendResult result = producer.send(message);
//发送状态
SendStatus sendStatus = result.getSendStatus();
//消息id
String msgId = result.getMsgId();
//消息接受队列id
int queueId = result.getMessageQueue().getQueueId();
TimeUnit.SECONDS.sleep(3);
System.out.println("发送状态"+result+",消息id"+msgId+",队列"+queueId);
//- 关闭生产者producer
producer.shutdown();
}
}
- 可靠异步发送
异步发送是指发送方发出数据后,不等接收方发回响应,接着发送下个数据包的通讯方式,发送方通过回调接口接收服务器响应,并对响应结果进行处理。异步消息通常用在对响应时间敏感的业务场景,即发送端不能容忍长时间地等待Broker的响应。
public class AsyncProducer {
public static void main(String[] args) throws Exception {
//- 创建消息生产者producer,并制定生产者组名
DefaultMQProducer producer = new DefaultMQProducer("group1");
//- 指定NameServer地址
producer.setNamesrvAddr("192.168.217.130:9876");
//- 启动producer
producer.start();
//- 创建消息对象,指定主题Topic、Tag和消息体
for (int i = 0; i < 3; i++) {
Message message = new Message("base","Tag2",("hello"+i).getBytes());
//- 发送异步消息
producer.send(message, new SendCallback() {
public void onSuccess(SendResult sendResult) {
System.out.println("发送成功:"+sendResult);
}
public void onException(Throwable throwable) {
System.out.println("发送异常:"+throwable);
}
});
TimeUnit.SECONDS.sleep(3);
}
//- 关闭生产者producer
producer.shutdown();
}
}
- 单向发送消息
这种方式注意用在不特别关心发送结果的场景,例如日志发送。
public class OnewayProducer {
public static void main(String[] args) throws Exception {
//- 创建消息生产者producer,并制定生产者组名
DefaultMQProducer producer = new DefaultMQProducer("group1");
//- 指定NameServer地址
producer.setNamesrvAddr("192.168.217.130:9876");
//- 启动producer
producer.start();
//- 创建消息对象,指定主题Topic、Tag和消息体
for (int i = 0; i < 3; i++) {
Message message = new Message("base","Tag3",("hello"+i).getBytes());
//- 发送单向消息
producer.sendOneway(message);
TimeUnit.SECONDS.sleep(3);
}
//- 关闭生产者producer
producer.shutdown();
}
}
- 编写消息消费者消费消息( 启动时需要先启动消费者监听)
public class Consumer {
public static void main(String[] args) throws MQClientException {
//- 创建消费者consumer,制定消费者组名
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
//- 指定NameServer地址
consumer.setNamesrvAddr("192.168.217.130:9876");
//- 订阅主题Topic和Tag
consumer.subscribe("base","*");
//- 设置回调函数,处理消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
//接收消息内容
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
for (MessageExt msg : list) {
System.out.println(new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//- 启动消费者consumer
consumer.start();
}
}
RocketMQ 常见异常处理
2) 延时消息
消息在发送到消息队列 MQ 服务端后并不会立马投递,而是根据消息中的属性延迟固定时间后才投递给消费者。但是RocketMQ不支持任意时间精度,仅支持特定的 level,例如定时 5s, 10s, 1m 等。其中,level=0 级表示不延时,level=1 表示 1 级延时,level=2 表示 2 级延时,以此类推。
在服务器端(rocketmq-broker端)的属性配置文件中加入以下行:
messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
描述了各级别与延时时间的对应映射关系。
• 这个配置项配置了从1级开始,各级延时的时间,可以修改这个指定级别的延时时间;
• 时间单位支持:s、m、h、d,分别表示秒、分、时、天;
• 默认值就是上面声明的,可手工调整;
• 默认值已够用,不建议修改这个值。
public class DelayProducer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("group1");
producer.setNamesrvAddr("192.168.217.130:9876");
producer.start();
//延时10s
Message message = new Message("base","Tag1","keys_1",("hello").getBytes());
message.setDelayTimeLevel(3);
producer.send(message);
producer.shutdown();
}
}
如果你使用阿里云服务器,可以使用阿里封装的api,它支持定时消息和延时消息,可以适应更多场景。
详细介绍和代码示例
3) 顺序消息
消息有序指的是可以按照消息的发送顺序来消费(FIFO)。RocketMQ可以严格的保证消息有序,可以分为分区有序或者全局有序。
详细介绍
4) 事务消息
消息队列 MQ 提供类似 X/Open XA 的分布式事务功能,通过消息队列 MQ 事务消息能达到分布式事务的最终一致。上图说明了事务消息的大致流程:正常事务消息的发送和提交、事务消息的补偿流程。
-
事务消息发送及提交:
①发送消息(half消息);
②服务端响应消息写入结果;
③根据发送结果执行本地事务(如果写入失败,此时half消息对业务不可见,本地逻辑不执行);
④根据本地事务状态执行Commit或Rollback(Commit操作生成消息索引,消息对消费者可见)。 -
事务消息的补偿流程:
①对没有Commit/Rollback的事务消息(pending状态的消息),从服务端发起一次“回查”;
②Producer收到回查消息,检查回查消息对应的本地事务的状态。
③根据本地事务状态,重新Commit或RollBack
其中,补偿阶段用于解决消息Commit或Rollback发生超时或者失败的情况。
- 事务消息状态:
事务消息共有三种状态:提交状态、回滚状态、中间状态:
①TransactionStatus.CommitTransaction:提交事务,它允许消费者消费此消息。
②TransactionStatus.RollbackTransaction:回滚事务,它代表该消息将被删除,不允许被消费。
③TransactionStatus.Unkonwn:中间状态,它代表需要检查消息队列来确定消息状态。
详细介绍和代码示例
消息类型对比:
Topic的消息类型 | 是否支持事务消息 | 是否支持定时/延时消息 | 性能 |
---|---|---|---|
无序消息(普通、事务、定时/延时消息) | 是 | 是 | 最高 |
分区顺序消息 | 否 | 否 | 高 |
全局顺序消息 | 否 | 否 | 一般 |
发送方式对比:
消息类型 | 是否支持同步发送 | 是否支持异步发送 | 是否支持单向发送 |
---|---|---|---|
无序消息(普通、事务、定时/延时消息) | 是 | 是 | 最高 |
分区顺序消息 | 是 | 否 | 否 |
全局顺序消息 | 是 | 否 | 否 |
3 批量消息
批量发送消息能显著提高传递消息的性能,限制是这些消息应该具有相同的topic,相同的waitStoreMsgOK,而且不能是延时消息。此外,这一批量消息的总大小不应超过1MB。如果超过,需要把消息分割。
不超过1M,直接producer.send(msg)就可以了。
超过IM,消息分割代码示例
4 消息消费方式
(1)负载均衡模式
消费者默认采用负载均衡方式,多个消费者共同消费队列消息,每个消费者处理的消息不同。
(2)广播模式
消费者采用广播的方式消费消息,每个消费者消费的消息都是相同的。
Producer负载均衡
Producer端,每个实例在发消息的时候,默认会轮询所有的message queue发送,以达 到让消息平均落在不同的queue上。而由于queue可以散落在不同的broker,所以消息 就发送到不同的broker下,如下图:
图中箭头线条上的标号代表顺序,发布方会把第一条消息发送至 Queue 0,然后第二条 消息发送至 Queue 1,以此类推。
Consumer负载均衡
1)集群模式
在集群消费模式下,每条消息只需要投递到订阅这个topic的Consumer Group下的一个 实例即可。RocketMQ采用主动拉取的方式拉取并消费消息,在拉取的时候需要明确指定 拉取哪一条message queue。 而每当实例的数量有变更,都会触发一次所有实例的负载均衡,这时候会按照queue的 数量和实例的数量平均分配queue给每个实例。 默认的分配算法是AllocateMessageQueueAveragely,如下图:
还有另外一种平均的算法是AllocateMessageQueueAveragelyByCircle,也是平均分摊 每一条queue,只是以环状轮流分queue的形式,如下图:
需要注意的是,集群模式下,queue都是只允许分配只一个实例,这是由于如果多个实 例同时消费一个queue的消息,由于拉取哪些消息是consumer主动控制的,那样会导致 同一个消息在不同的实例下被消费多次,所以算法上都是一个queue只分给一个 consumer实例,一个consumer实例可以允许同时分到不同的queue。 通过增加consumer实例去分摊queue的消费,可以起到水平扩展的消费能力的作用。而 有实例下线的时候,会重新触发负载均衡,这时候原来分配到的queue将分配到其他实 例上继续消费。 但是如consumer实例的数量比message queue的总数量还多的话,多出来的 consumer实例将无法分到queue,也就无法消费到消息,也就无法起到分摊负载的作用了。所以需要控制让queue的总数量大于等于consumer的数量。
2)广播模式
由于广播模式下要求一条消息需要投递到一个消费组下面所有的消费者实例,所以也就 没有消息被分摊消费的说法。 在实现上,就是在consumer分配queue的时候,所有consumer都分到所 有的queue。
《深入理解RocketMQ》- MQ消息的投递机制
5 简单消息过滤
1) Tag过滤
RocketMQ 的消息过滤方式有别于其他消息中间件,是在订阅时,再做过滤,先来看下 Consume Queue 的存储结构。
(1)在 Broker 端进行 Message Tag 比对,先遍历 Consume Queue,如果存储的 Message Tag 与订阅的 Message Tag 不符合,则跳过,继续比对下一个,符合则传输给 Consumer。注意:Message Tag 是字符串形式,Consume Queue 中存储的是其对应的 hashcode,比对时也是比对 hashcode。
(2)Consumer 收到过滤后的消息后,同样也要执行在 Broker 端的操作,但是比对的是真实的 Message Tag 字 符串,而不是 Hashcode。
为什么过滤要这样做?
(1)Message Tag 存储 Hashcode,是为了在 Consume Queue 定长方式存储,节约空间。
(2)过滤过程中不会访问 Commit Log 数据,可以保证堆积情况下也能高效过滤。
(3) 即使存在 Hash 冲突,也可以在 Consumer 端进行修正,保证万无一失。
简单消息过滤通过指定多个 Tag 来过滤消息,过滤动作在服务器进行。如:
consumer.subscribe("TopicTest1", "TagA || TagC || TagD");
以上方式对于复杂的场景可能不起作用,因为一个消息只能有一个tag。这种情况下,可以使用SQL表达式筛选消息。
2) SQL语法过滤
consumer.subscribe("TopicTest",MessageSelector
.bySql("(TAGS is not null and TAGS in ('TagA', 'TagB'))" + "and (a is not null and a between 0 3)"));
注意:只有使用push模式的消费者此案使用SQL92标准的sql语句。
6 消息重试
1)顺序消息的重试
对于顺序消息,当消费者消费消息失败后,消息队列 RocketMQ 会自动不断进行消息重 试(每次间隔时间为 1 秒),这时,应用会出现消息消费被阻塞的情况。因此,在使用顺序消息时,务必保证应用能够及时监控并处理消费失败的情况,避免阻塞现象的发生。
2) 无序消息的重试
对于无序消息(普通、定时、延时、事务消息),当消费者消费消息失败时,您可以通过设置返回状态达到消息重试的结果。
无序消息的重试只针对集群消费方式生效;广播方式不提供失败重试特性,即消费失败后,失败消息不再重试,继续消费新的消息。
3)死信队列
当一条消息初次消费失败,消息队列 RocketMQ 会自动进行消息重试;达到最大重试次 数后,若消费依然失败,则表明消费者在正常情况下无法正确地消费该消息,此时,消息队列 RocketMQ 不会立刻将消息丢弃,而是将其发送到该消费者对应的特殊队列中。 在消息队列 RocketMQ 中,这种正常情况下无法被消费的消息称为死信消息(Dead-Letter Message),存储死信消息的特殊队列称为死信队列(Dead-Letter Queue)。
7 消费幂等
二、消息存储
分布式队列因为有高可靠性的要求,所以数据要进行持久化存储。
流程:
(1) 消息生成者发送消息;
(2) MQ收到消息,将消息进行持久化,在存储中新增一条记录 ;
(3) 返回ACK给生产者;
(4) MQ push 消息给对应的消费者,然后等待消费者返回ACK;
(5) 如果消息消费者在指定时间内成功返回ack,那么MQ认为消息消费成功,在存储中 删除消息,即执行第6步;如果MQ在指定时间内没有收到ACK,则认为消息消费失 败,会尝试重新push消息,重复执行4、5、6步骤
(6) MQ删除消息。
1 存储介质
(1) 关系型数据库DB
Apache下开源的另外一款MQ—ActiveMQ(默认采用的KahaDB做消息存储)可选用 JDBC的方式来做消息持久化,通过简单的xml配置信息即可实现JDBC消息存储。由于, 普通关系型数据库(如Mysql)在单表数据量达到千万级别的情况下,其IO读写性能往往 会出现瓶颈。在可靠性方面,该种方案非常依赖DB,如果一旦DB出现故障,则MQ的消 息就无法落盘存储会导致线上故障。
(2) 文件系统
目前业界较为常用的几款产品(RocketMQ/Kafka/RabbitMQ)均采用的是消息刷盘 至所部署虚拟机/物理机的文件系统来做持久化(刷盘一般可以分为异步刷盘和同步刷盘两种模式)。消息刷盘为消息存储提供了一种高效率、高可靠性和高性能的数据 持久化方式。除非部署MQ机器本身或是本地磁盘挂了,否则一般是不会出现无法持 久化的故障问题。
2 性能对比
文件系统>关系型数据库DB
3 消息的存储和发送
1)消息存储
磁盘如果使用得当,磁盘的速度完全可以匹配上网络 的数据传输速度。目前的高性能磁 盘,顺序写速度可以达到600MB/s, 超过了一般网卡的传输速度。但是磁盘随机写的速 度只有大概100KB/s,和顺序写的性能相差6000倍!因为有如此巨大的速度差别,好的 消息队列系统会比普通的消息队列系统速度快多个数量级。RocketMQ的消息用顺序写, 保证了消息存储的速度。
2)消息发送
Linux操作系统分为【用户态】和【内核态】,文件操作、网络操作需要涉及这两种形态 的切换,免不了进行数据复制。 一台服务器 把本机磁盘文件的内容发送到客户端,一般分为两个步骤:
1)read;读取本地文件内容;
2)write;将读取的内容通过网络发送出去。
这两个看似简单的操作,实际进行了4 次数据复制,分别是:
1. 从磁盘复制数据到内核态内存;
2. 从内核态内存复 制到用户态内存;
3. 然后从用户态 内存复制到网络驱动的内核态内存;
4. 最后是从网络驱动的内核态内存复 制到网卡中进行传输。
Consumer 消费消息过程,使用了零拷贝,零拷贝包含以下两种方式
- 使用 mmap + write 方式
优点:即使频繁调用,使用小块文件传输,效率也很高
缺点:不能很好的利用 DMA 方式,会比 sendfile 多消耗 CPU,内存安全性控制复杂,需要避免 JVM Crash问题。 - 使用 sendfile 方式
优点:可以利用 DMA 方式,消耗 CPU 较少,大块文件传输效率高,无内存安全新问题。
缺点:小块文件效率低于 mmap 方式,只能是 BIO 方式传输,不能使用 NIO。
RocketMQ 选择了第一种方式,mmap+write 方式,因为有小块数据传输的需求,效果会比 sendfile 更好。
关于 Zero Copy 的更详细介绍,请参考以下文章
http://www.linuxjournal.com/article/6345
通过使用mmap的方式,可以省去向用户态的内存复制,提高速度。这种机制在Java中是 通过MappedByteBuffer实现的 RocketMQ充分利用了上述特性,提高消息存盘和网络发送 的速度。
这里需要注意的是,采用MappedByteBuffer这种内存映射的方式有几个限制,其 中之一是一次只能映射1.5~2G 的文件至用户态的虚拟内存,这也是为何RocketMQ 默认设置单个CommitLog日志数据文件为1G的原因了。
MQ消息最终一致性解决方案
4 消息存储结构
RocketMQ消息的存储是由ConsumeQueue和CommitLog配合完成 的,消息真正的物 理存储文件是CommitLog,ConsumeQueue是消息的逻辑队列,类似数据库的索引文 件,存储的是指向物理存储的地址。每 个Topic下的每个Message Queue都有一个对应 的ConsumeQueue文件。
-
CommitLog:存储消息的元数据
-
ConsumerQueue:存储消息在CommitLog的索引
-
IndexFile:为了消息查询提供了一种通过key或时间区间来查询消息的方法,这种通过IndexFile来查找消息的方法不影响发送与消费消息的主流程
5 刷盘机制
RocketMQ的消息是存储到磁盘上的,这样既能保证断电后恢复, 又可以让存储的消息 量超出内存的限制。RocketMQ为了提高性能,会尽可能地保证磁盘的顺序写。消息在通 过Producer写入RocketMQ的时 候,有两种写磁盘方式,分布式同步刷盘和异步刷盘。
1)同步刷盘
在返回写成功状态时,消息已经被写入磁盘。具体流程是,消息写入内存的PAGECACHE 后,立刻通知刷盘线程刷盘, 然后等待刷盘完成,刷盘线程执行完成后唤醒等待的线 程,返回消息写 成功的状态。
2)异步刷盘
在返回写成功状态时,消息可能只是被写入了内存的PAGECACHE,写操作的返回快,吞 吐量大;当内存里的消息量积累到一定程度时,统一触发写磁盘动作,快速写入。
3)配置
同步刷盘还是异步刷盘,都是通过Broker配置文件里的flushDiskType 参数设置的, 这个参数被配置成SYNC_FLUSH、ASYNC_FLUSH中的 一个。
三、高可用性机制
RocketMQ分布式集群是通过Master和Slave的配合达到高可用性的。
Master和Slave的区别:在Broker的配置文件中,参数 brokerId的值为0表明这个Broker 是Master,大于0表明这个Broker是 Slave,同时brokerRole参数也会说明这个Broker 是Master还是Slave。 Master角色的Broker支持读和写,Slave角色的Broker仅支持读,也就是 Producer只能 和Master角色的Broker连接写入消息;Consumer可以连接 Master角色的Broker,也可 以连接Slave角色的Broker来读取消息。
1 消息消费高可用
在Consumer的配置文件中,并不需要设置是从Master读还是从Slave 读,当Master不 可用或者繁忙的时候,Consumer会被自动切换到从Slave 读。有了自动切换Consumer 这种机制,当一个Master角色的机器出现故障后,Consumer仍然可以从Slave读取消 息,不影响Consumer程序。这就达到了消费端的高可用性。
2 消息发送高可用
在创建Topic的时候,把Topic的多个Message Queue创建在多个Broker组上(相同 Broker名称,不同 brokerId的机器组成一个Broker组),这样当一个Broker组的 Master不可 用后,其他组的Master仍然可用,Producer仍然可以发送消息。 RocketMQ目前还不支持把Slave自动转成Master,如果机器资源不足, 需要把Slave转 成Master,则要手动停止Slave角色Broker,更改配置文 件,用新的配置文件启动 Broker。
3 消息主从复制
如果一个Broker组有Master和Slave,消息需要从Master复制到Slave 上,有同步和异步两种复制方式。
1)同步复制
同步复制方式是等Master和Slave均写 成功后才反馈给客户端写成功状态;
在同步复制方式下,如果Master出故障, Slave上有全部的备份数据,容易恢复,但是同 步复制会增大数据写入 延迟,降低系统吞吐量。
2)异步复制
异步复制方式是只要Master写成功 即可反馈给客户端写成功状态。在异步复制方式下,系统拥有较低的延迟和较高的吞吐量,但是如果Master出了故障, 有些数据因为没有被写 入Slave,有可能会丢失;
3)配置
同步复制和异步复制是通过Broker配置文件里的brokerRole参数进行设置的,这个参数 可以被设置成ASYNC_MASTER、 SYNC_MASTER、SLAVE三个值中的一个。
4) 总结
实际应用中要结合业务场景,合理设置刷盘方式和主从复制方式, 尤其是SYNC_FLUSH 方式,由于频繁地触发磁盘写动作,会明显降低 性能。通常情况下,应该把Master和 Save配置成ASYNC_FLUSH的刷盘 方式,主从之间配置成SYNC_MASTER的复制方式,这 样即使有一台 机器出故障,仍然能保证数据不丢,是个不错的选择。
网友评论