一、消息中间件对比
kafka | RocketMQ | RabbitMQ | ||
---|---|---|---|---|
定位 | 设计定位 | 系统间的数据流管道,实时数据处理。例如常规的消息系统、监控数据、日志收集 | 可靠的消息传输,例如消息推送 | 可靠的消息传输,与RocketMQ类似。 |
开发语言 | Scala | Java | Erlang | |
客户端语言 | Java,Python,C | Java | Java,Python,C | |
注册中心 | Zookeeper | namespace | 无 | |
选举方式 | 自动选举 | 不支持自动选举 | 无 | |
数据可靠性 | 很好。支持同步刷盘,同步复制,但性能差。 | 很好,支持同、异步刷盘,同步双写,异步复制 | 好 | |
消息写入性能 | 非常好,每条10个字符测试:百万条/s,Topic数量60个左右后性能会下降 | 很好,每条10个字符测试:单机单broker 7w/s,单机3broker 12w/s,Topic数量支持5W条左右 | 好,2W/s左右 | |
性能稳定性 | 队列、分区多的时候性能不稳定,明显下降,消息堆积时性能稳定 | 队列多的时候,消息堆积时性能稳定 | 消息堆积时性能不稳定 | |
消息堆积能力 | 非常好 | 非常好 | 一般 | |
消息获取 | pull | pull,push | pull,push | |
顺序消费 | 支持 | 支持,局部有序 | 支持 | |
定时消息 | 支持不好 | 支持,开源只支持指定级别的延迟 | 支持不好 | |
事务消息 | 不支持 | 支持 | 不支持 | |
消息查询 | 不支持 | 支持 | 不支持 |
二、RocketMQ架构分布图
image.pngApache RocketMQ是一个分布式消息传递和流媒体平台,具有低延迟,高性能和可靠性, 万亿级容量和灵活的可伸缩性。 它由四个部分组成:nameserver,broker,生产者和使用者。 它们中的每一个都可以水平扩展,而没有单个故障点。
- nameserver:提供轻量级的服务发现和路由。 每个名称服务器记录完整的路由信息,提供 相应的读写服务,并支持快速的存储扩展。 注意:nameserver集群中每个nameserver都是相互独立的,与zookeeper不同,nameserver节点间没有通讯,也没有主从、选举概念。
- Broker:通过提供轻量级的TOPIC和QUEUE机制来存储消息,把自身信息注册到每个nameserver中。
- 生产者:本地随机从nameserver中维护broker的信息,并与每个master broker有心跳通讯。
- 消费者:本地随机从nameserver中维护broker的信息,并与每个master broker有心跳通讯。
三、RocketMQ环境
环境变量
#java环境
export JAVA_HOME=/usr/local/jdk
export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
export PATH=$JAVA_HOME/bin:$PATH
#rocketmq环境
export ROCKETMQ_HOME=/usr/local/rocketmq/rocketmq‐all‐4.1.0‐incubating
export PATH=$ROCKETMQ_HOME/bin:$PATH
broker配置
#rocketmq‐name服务地址,多个地址用;分开,不配置默认为localhost:9876
namesrvAddr = 192.168.241.198:9876
brokerClusterName = DefaultCluster
brokerName = broker‐a
brokerId = 0
deleteWhen = 04
fileReservedTime = 48
#主从角色SYNC_MASTER,ASYNC_MASTER,SLAVE
brokerRole = SYNC_MASTER
flushDiskType = ASYNC_FLUSH
#允许自动创建主题topic
autoCreateTopicEnable=true
#broker监听端口
listenPort=10911
#数据存储位置
storePathRootDir=/root/rocketmq/store
- brokerClusterName:所属集群名称,如果节点较多可以配置多个。
- brokerName:brokerName为子集群的名称,子集群中有一个Master与多个Slave,子集群下所有节点的brokerName必须一样,brokerId不一样,默认brokerId = 0的为Master节点,大于0的为Salve节点。
- namesrvAddr:注册中心连接开放端口,可以配置多个,用分号分隔。
- deleteWhen:删除数据的时间,04代表凌晨4点,fileReservedTime为数据保存在磁盘的时长,单位小时。
- brokerRole:Master节点与Slave节点间的同步方式,有三个值:SYNC_MASTER,ASYNC_MASTER,SLAVE;同步和异步表示Master和Slave之间同步数据的机制,其中Slave一致性使用SLAVE;
- flushDiskType:刷盘策略,取值为:ASYNC_FLUSH,SYNC_FLUSH表示同步刷盘和异步刷盘;SYNC_FLUSH消 息写入磁盘后才返回成功状态,ASYNC_FLUSH不需要;
- autoCreateTopicEnable:自动新建topic,默认为false。
- listenPort:启动监听的端口号。
- storePathRootDir:磁盘存储消息的根目录。
内存的设置
rocketmq集群内存的设置是针对注册中心namesrv与broker内存的设置,分别设置rocketmq bin目录下的runserver.sh与runbroker.sh(或者runserver.cmd与runbroker.cmd)。
- runserver配置
JAVA_OPT="${JAVA_OPT} ‐server ‐Xms256m ‐Xmx256m ‐Xmn128m ‐XX:MetaspaceSi ze=64m ‐XX:MaxMetaspaceSize=128m"
- runbroker配置
JAVA_OPT="${JAVA_OPT} ‐server ‐Xms256m ‐Xmx256m ‐Xmn128m ‐XX:MetaspaceSi ze=64m ‐XX:MaxMetaspaceSize=128m"
单机运行
#启动注册中心
nohup sh bin/mqnamesrv ‐n 192.168.241.198:9876
#启动broker
nohup sh bin/mqbroker ‐n 192.168.241.198:9876 ‐c conf/broker.conf &
注意:启动注册中心或者broker的时候最好指定一下IP,防止在多网卡或者dockers的环境下,IP使用错误。
多机集群部署
在主目录下的conf文件夹下提供了多种broker配置模式,分别有:2m-2s-async,2m-2s- sync,2m-noslave。若目前2台机器,分别部署1个 NameServer,同时分别部署一个Master和一个Slave,互为主备。
-
注册中心配置
namesrvAddr配置与在单机的环境下无异。 -
broker配置
broker节点在集群中有主从之分,与单机环境下的配置差异性主要体现如下:
- master
#broker节点注册到多个注册中心
namesrvAddr = 192.168.241.198:9876;192.168.241.199:9876
#主节点
brokerId = 0
#SYNC_MASTER或者ASYNC_MASTER
brokerRole = SYNC_MASTER
- slave
#broker节点注册到多个注册中心
namesrvAddr = 192.168.241.198:9876;192.168.241.199:9876
#非0表示从节点唯一标志
brokerId = 1
#表明从节点
brokerRole = SLAVE
环境验证
- 查看集群监控状态
sh mqadmin clusterlist ‐n 192.168.241.198:9876;192.168.241.199:9876
- 测试
export NAMESRV_ADDR=192.168.241.198:9876;192.168.241.199:9876
测试发送端
> sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
测试消费端
> sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
rocketmq console
进入rocketmq-externals项目的GitHub地址,如下图,可看到RocketMQ项目的诸多扩展项目,其中就包含我们需要下载的rocketmq-console。
rocketmq-console是一个springboot项目,跑之前修改下配置。
四、基本概念
消息模型
RocketMQ主要由 Producer、Broker、Consumer 三部分组成,其中Producer 负责 生产消息,Consumer 负责消费消息,Broker 负责存储消息。
消息对象
-
生产者(producer)
负责生产消息,一般由业务系统负责生产消息。一个消息生产者会把业务应用系统里产生的消息发送到broker服务器。RocketMQ提供多种发送方式,同步发送、异步发送、顺序发送、单向发送。同步和异步方式均需要Broker返回确认信息,单向发送不需要。 -
消费者(Consumer)
负责消费消息,一般是后台系统负责异步消费。一个消息消费者会从Broker服务器拉取消息、并将其提供给应用程序。从用户应用的角度而言提供了两种消费形式:拉取式消费 (pull consumer)、推动式消费(push consumer)。
主题(Topic)
表示一类消息的集合,每个主题包含若干条消息,每条消息只能属于一个主题,是RocketMQ进行消息订阅的基本单位。
代理服务器(Broker Server)
消息中转角色,负责存储消息、转发消息。代理服务器在RocketMQ系统中负责接收 从生产者发送来的消息并存储、同时为消费者的拉取请求作准备。代理服务器也存储消息相 关的元数据,包括消费者组、消费进度偏移和主题和队列消息等。
注册中心服务(Name Server)
注册中心服务充当路由消息的提供者。生产者或消费者能够通过名字服务查找各主题相应的 Broker IP列表。多个Namesrv实例组成集群,但相互独立,没有信息交换。
组
-
生产者组(Producer Group)
同一类Producer的集合,这类Producer发送同一类消息且发送逻辑一致。如果发送的 是事物消息且原始生产者在发送之后崩溃,则Broker服务器会联系同一生产者组的其他生产者实例以提交或回溯消费。 -
消费者组(Consumer Group)
同一类Consumer的集合,这类Consumer通常消费同一类消息且消费逻辑一致。消费者组使得在消息消费方面,实现负载均衡和容错的目标变得非常容易。要注意的是,消费者组的消费者实例必须订阅完全相同的Topic。RocketMQ 支持两种消息模式:集群消费 (Clustering)和广播消费(Broadcasting)。
客户端消费
-
拉取式消费(Pull Consumer)
Consumer消费的一种类型,应用通常主动调用Consumer的拉消息方法从Broker服务 器拉消息、主动权由应用控制。一旦获取了批量消息,应用就会启动消费过程。 -
推动式消费(Push Consumer)
Consumer消费的一种类型,该模式下Broker收到数据后会主动推送给消费端,该消费模式一般实时性较高。 -
集群消费(Clustering)
集群消费模式下,相同Consumer Group的每个Consumer实例平均分摊消息。 -
广播消费(Broadcasting)
广播消费模式下,相同Consumer Group的每个Consumer实例都接收全量的消息。
消息(Message)
消息系统所传输信息的物理载体,生产和消费数据的最小单位,每条消息必须属于一个主题。RocketMQ中每个消息拥有唯一的Message ID,且可以携带具有业务标识的Key。 系统提供了通过Message ID和Key查询消息的功能。
顺序消息
-
普通顺序消息(Normal Ordered Message)
普通顺序消费模式下,消费者通过同一个消费队列收到的消息是有顺序的,不同消息队列收到的消息则可能是无顺序的。 -
严格顺序消息(Strictly Ordered Message)
严格顺序消息模式下,消费者收到的所有消息均是有顺序的。
标签(Tag)
为消息设置的标志,用于同一主题下区分不同类型的消息。来自同一业务单元的消息, 可以根据不同业务目的在同一主题下设置不同标签。标签能够有效地保持代码的清晰度和连 贯性,并优化RocketMQ提供的查询系统。消费者可以根据Tag实现对不同子主题的不同消 费逻辑,实现更好的扩展性。
image.png注意1:每个broker中都会有一个commitlog,由于记录生产者发送的消息。
注意2:每个broker中有多个Topic,每个Topic中默认有4个queue队列,每个queue对应一个持久化文件。
注意3:每个broker中会对应一个consumerOffset.json文件,用于记录队列消费的节点到哪了。
注意4:consumer、producer与broker间的通信基于Netty来实现的,默认为Netty中的epoll模式,若系统不支持epoll模式,才使用nio模式。
注意5:producer在发送消息的时候,会以轮循的方式放置于队列中(比如图上broker-master-1与broker-master-2共8个),若有顺序消息的话,会保证所有顺序消息放在同一个队列中。
没开始使用的broker内部的文件。
image.png
已经使用的broker内部的文件。
image.png
config内部结构
image.png
store:存储commitlog文件,每个broker对应一个commitlog,commitlog中存储的是topic真正的内容数据。
index:索引。
consumequeue:存储每个主题下的队列,默认每个主题4个队列,这边存储的主要是消息的tag、消息对应在commitlog的地址、空间大小等,。
topic.json: 存储所有topic的信息,主要为topic的属性信息。
consumerOffset.json:消费者偏移量信息,对应了每个主题@每个消费群组{队列1:偏移量,队列2:偏移量,队列3:偏移量,队列4:偏移量}
名称 | 作用 |
---|---|
broker | broker模块:c和p端消息存储逻辑 |
client | 客户端api:produce、consumer端 接受与发送api |
common | 公共组件:常量、基类、数据结构 |
tools | 运维tools:命令行工具模块 |
store | 存储模块:消息、索引、commitlog存储 |
namesrv | 服务管理模块:服务注册topic等信息存储 |
remoting | 远程通讯模块:netty+fastjson |
logappender | 日志适配模块 |
example | Demo列子 |
filtersrv | 消息过滤器模块 |
srvutil | 辅助模块 |
filter | 过滤模块:消息过滤模块 |
distribution | 部署、运维相关zip包中的代码 |
openmessaging | 兼容openmessaging分布式消息模块 |
五、使用
1. 同步、异步、一次性
生产者
同步
public static void main(String[] args) throws MQClientException, UnsupportedEncodingException, RemotingException, InterruptedException, MQBrokerException {
//Instantiate with a producer group name.
DefaultMQProducer producer = new DefaultMQProducer("tl_msg_student_group");
producer.setNamesrvAddr("192.168.241.198:9876");
//producer.setSendMsgTimeout(10000);
producer.start();
Message msg = new Message("TopicStudent" ,
"TagStudent" ,
"tag" ,
("Hello tuling RocketMQ ").getBytes(RemotingHelper.DEFAULT_CHARSET)
);
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
producer.shutdown();
}
异步
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("tl_message_group");
producer.setNamesrvAddr("192.168.241.198:9876;192.168.241.199:9876");
producer.start();
//设置发送失败重试机制
producer.setRetryTimesWhenSendAsyncFailed(5);
int messageCount = 1;
final CountDownLatch countDownLatch = new CountDownLatch(messageCount);
for (int i = 0; i < messageCount; i++) {
final int index = i;
Message msg = new Message("TopicTest",
"TagSendOne",
"OrderID188",
"I m sending msg content is yangguo".getBytes(RemotingHelper.DEFAULT_CHARSET));
//消息发送成功后,执行回调函数
producer.send(msg, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
countDownLatch.countDown();
System.out.printf("%-10d OK %s %n", index,
sendResult.getMsgId());
}
@Override
public void onException(Throwable e) {
countDownLatch.countDown();
System.out.printf("%-10d Exception %s %n", index, e);
e.printStackTrace();
}
});
}
//防止回调未回,producer就已经删除
countDownLatch.await(5, TimeUnit.SECONDS);
producer.shutdown();
}
一次性
DefaultMQProducer producer = new DefaultMQProducer("tl_message_group");
// Specify name server addresses.
producer.setNamesrvAddr("192.168.241.198:9876;192.168.241.199:9876");
producer.setSendMsgTimeout(10000);
producer.start();
for (int i = 0; i < 1; i++) {
Message msg = new Message("TopicTest" /* Topic */,
"TagSendOne" /* Tag */,
"OrderID198",
("Hello RocketMQ test i " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
);
producer.sendOneway(msg);
}
//Shut down once the producer instance is not longer in use.
producer.shutdown();
消费者
// tl_msg_student_group
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("tl_student_group");
// ;192.168.241.199:9876
consumer.setNamesrvAddr("192.168.241.198:9876;192.168.241.199:9876");
// Subscribe one more more topics to consume.
consumer.subscribe("TopicStudent", "*");
// Register callback to execute on arrival of messages fetched from brokers.
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs){
System.out.println(new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
2. 广播消息
生产者
DefaultMQProducer producer = new DefaultMQProducer("consumer_model_group");
producer.setNamesrvAddr("192.168.241.198:9876;192.168.241.199:9876");
producer.start();
for (int i = 0; i < 4; i++){
Message msg = new Message("TopicTest",
"TagA",
"OrderID188",
("Hello world"+i).getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
}
producer.shutdown();
消费者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_model_group");
consumer.setNamesrvAddr("192.168.241.198:9876");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
//广播,全量消费
consumer.setMessageModel(MessageModel.CLUSTERING);
consumer.subscribe("TopicTest", "TagA || TagC || TagD");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
for (MessageExt ext : msgs){
System.out.printf(Thread.currentThread().getName() + " Receive New Message: " + new String(ext.getBody()) + "%n");
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.printf("Broadcast Consumer Started.%n");
3. 批量消息
批量发送消息能显著提高传递小消息的性能。限制是这些批量消息应该有相同的 topic,相同的waitStoreMsgOK,而且不能是延时消息。此外,这一批消息的总大小不应 超过4MB。rocketmq建议每次批量消息大小大概在1MB。 当消息大小超过4MB时,需要将消息进行分割。
生产者
public class ListSplitter implements Iterator<List<Message>> {
private final int SIZE_LIMIT = 1000 * 1000 * 1;//1MB
private final List<Message> messages;
private int currIndex;
public ListSplitter(List<Message> messages) {
this.messages = messages;
}
@Override
public boolean hasNext() {
return currIndex < messages.size();
}
@Override
public List<Message> next() {
int nextIndex = currIndex;
int totalSize = 0;
//遍历消息准备拆分
for (; nextIndex < messages.size(); nextIndex++) {
Message message = messages.get(nextIndex);
int tmpSize = message.getTopic().length() + message.getBody().length;
Map<String, String> properties = message.getProperties();
for (Map.Entry<String, String> entry : properties.entrySet()) {
tmpSize += entry.getKey().length() + entry.getValue().length();
}
tmpSize = tmpSize + 20; //for log overhead
if (tmpSize > SIZE_LIMIT) {
if (nextIndex - currIndex == 0) {
nextIndex++;
}
break;
}
if (tmpSize + totalSize > SIZE_LIMIT) {
break;
} else {
totalSize += tmpSize;
}
}
List<Message> subList = messages.subList(currIndex, nextIndex);
currIndex = nextIndex;
return subList;
}
}
/**
* rocketMq 支持消息批量发送
* 同一批次的消息应具有:相同的主题,相同的waitStoreMsgOK,并且不支持定时任务。
* <strong> 同一批次消息建议大小不超过~1M </strong>,消息最大不能超过4M,需要
* 对msg进行拆分
*/
DefaultMQProducer producer = new DefaultMQProducer("batch_group");
producer.setNamesrvAddr("192.168.241.198:9876");
producer.start();
String topic = "BatchTest";
List<Message> messages = new ArrayList<>();
messages.add(new Message(topic, "TagA", "OrderID001", "Hello world 0".getBytes()));
messages.add(new Message(topic, "TagA", "OrderID002", "Hello world 1".getBytes()));
messages.add(new Message(topic, "TagA", "OrderID003", "Hello world 2".getBytes()));
ListSplitter splitter = new ListSplitter(messages);
/**
* 对批量消息进行拆分
*/
while (splitter.hasNext()) {
try {
List<Message> listItem = splitter.next();
producer.send(listItem);
} catch (Exception e) {
e.printStackTrace();
}
}
producer.shutdown();
消费者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("batch_group");
// ;192.168.241.199:9876
consumer.setNamesrvAddr("192.168.241.198:9876");
consumer.subscribe("BatchTest", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs){
System.out.println("queueId=" + msg.getQueueId() + "," + new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.printf("Consumer Started.%n");
4. 过滤消息
RocketMq在消息过滤这块做得很强大,它可以通过Tag过滤消息,可以通过SQL表达式筛选消息,它也可以支持java脚本过滤。
其中通过SQL表达式筛选 和 java脚本过滤 需要在broker的配置文件中把对应的配置打开。
enablePropertyFilter=true
Topic 与 Tag 都是业务上用来归类的标识,区分在于 Topic 是一级分类,而 Tag 可以说是二级分类,关系如图所示。
image.png
生产者
/***
* TAG-FILTER-1000 ---> 布隆过滤器
* 过滤掉的那些消息。直接就跳过了么。下次就不会继续过滤这些了。是么。
* @param args
* @throws Exception
*/
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("filter_sample_group");
producer.setNamesrvAddr("192.168.241.198:9876");
producer.start();
for (int i = 0; i < 3; i++) {
Message msg = new Message("TopicFilter",
"TAG-FILTER",
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
);
msg.putUserProperty("a",String.valueOf(i));
if(i % 2 == 0){
msg.putUserProperty("b","yangguo");
}else{
msg.putUserProperty("b","xiaolong girl");
}
producer.send(msg);
}
producer.shutdown();
}
消费者
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("filter_sample_group");
/**
* 注册中心
*/
consumer.setNamesrvAddr("192.168.241.198:9876;192.168.241.199:9876");
/**
* 订阅主题
* 一种资源去换取另外一种资源
*/
consumer.subscribe("TopicFilter", MessageSelector.bySql("a between 0 and 3 and b = 'yangguo'"));
/**
* 注册监听器,监听主题消息
*/
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs){
try {
System.out.println("consumeThread=" + Thread.currentThread().getName()
+ ", queueId=" + msg.getQueueId() + ", content:"
+ new String(msg.getBody(), RemotingHelper.DEFAULT_CHARSET));
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.printf("Filter Consumer Started.%n");
}
5. 延迟消息
定时消息是指消息发到 Broker 后,不能立刻被 Consumer 消费,要到特定的时间点 或者等待特定的时间后才能被消费。
使用场景:如电商里,提交了一个订单就可以发送一个延时消息,1h后去检查这个订单的 状态,如果还是未付款就取消订单释放库存。
当前支持的延迟时间
1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
分别对应级别
1 2 3....................
生产者
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("ExampleConsumer");
//;192.168.241.199:9876
producer.setNamesrvAddr("192.168.241.198:9876;192.168.241.199:9876");
producer.start();
int totalMessagesToSend = 3;
for (int i = 0; i < totalMessagesToSend; i++) {
Message message = new Message("TestTopic", ("Hello scheduled message " + i).getBytes());
//延时消费
message.setDelayTimeLevel(6);
// Send the message
producer.send(message);
}
System.out.printf("message send is completed .%n");
producer.shutdown();
}
消费者
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ExampleConsumer");
//;192.168.241.199:9876
consumer.setNamesrvAddr("192.168.241.198:9876");
consumer.subscribe("TestTopic", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages, ConsumeConcurrentlyContext context) {
for (MessageExt message : messages) {
// Print approximate delay time period
System.out.println("Receive message[msgId=" + message.getMsgId() + "] "
+ "message content is :" + new String(message.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
//System.out.printf("Consumer Started.%n");
}
注意1:延迟消息发送到broker的时候,broker会专门新建一个中转主题SCHEDULED_TOPIC_XXXX来存放消息,目前开原版只支持18个级别,相当于中转主题下存在18个队列文件分别存储这18个级别。同时broker后台开启个线程,只要延迟消息的时间到了,才会把延迟消息放置于真正的topic下。
注意2:开源版下的延迟消息并不适合高并发的延迟消息,若业务存在高并发的延迟消息,需要考虑使用商业版的RocketMQ。
注意3:客户端集群消息的消费来源于pullRequestQueue,pullRequestQueue中的消息来源在于客户端中存在一个线程从broker中主动pull。
注意4:客户端从namesrv同步信息周期30s,客户端与broker心跳周期30s,客户端心跳消费偏移量同步周期5s。
注意5:客户端执行失败的消息,客户端会发回到broker中,broker端会新建一个RETRY_TOPIC_XXXX来存储,大概10S后会再次发给客户端消费,默认16次。
6. 顺序消息
生产者
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("ordered_group_name");
producer.setNamesrvAddr("192.168.241.198:9876");
producer.start();
String[] tags = new String[]{"TagA", "TagC", "TagD"};
// 订单列表
List<OrderStep> orderList = buildOrders();
Date date = new Date();
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
String dateStr = sdf.format(date);
for (int i = 0; i < 10; i++) {
// 加个时间前缀
String body = dateStr + " Hello RocketMQ "+ i + " " + orderList.get(i);
Message msg = new Message("TopicTest", tags[i % tags.length], "KEY" + i,
body.getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
Long id = (Long) arg; //根据订单id选择发送queue
long index = id % mqs.size();
return mqs.get((int) index);
}
}, orderList.get(i).getOrderId());//订单id
System.out.println(String.format("SendResult status:%s, queueId:%d, body:%s",
sendResult.getSendStatus(),
sendResult.getMessageQueue().getQueueId(),
body));
}
producer.shutdown();
}
消费者
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ordered_group_name");
consumer.setNamesrvAddr("192.168.241.198:9876");
/**
* 设置消费位置
*/
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.subscribe("TopicTest", "*");
consumer.registerMessageListener(new MessageListenerOrderly() {
AtomicLong consumeTimes = new AtomicLong(0);
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs,ConsumeOrderlyContext context) {
context.setAutoCommit(true);
for (MessageExt msg : msgs) {
// 可以看到每个queue有唯一的consume来消费, 订单对每个queue(分区)有序
try {
System.out.println("consumeThread=" + Thread.currentThread().getName() + ", queueId=" + msg.getQueueId() + ", content:" + new String(msg.getBody(), RemotingHelper.DEFAULT_CHARSET));
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
return ConsumeOrderlyStatus.SUCCESS;
}
});
consumer.start();
System.out.printf("Consumer Started.%n");
}
7. 事务消息
半事务消息:暂不能投递的消息,发送方已经成功地将消息发送到了消息队列 MQ 服务端,但是服务端未收到生产者对该消息的二次确认,此时该消息被标记。半事务消息会单独存储在HALF_TOPIC中。
消息回查:由于网络闪断、生产者应用重启等原因,导致某条事务消息的二次确 认丢失,消息队列 MQ 服务端通过扫描发现某条消息长期处于“半事务消息”时,需要 主动向消息生产者询问该消息的最终状态(Commit 或是 Rollback),该询问过程即 消息回查。
注意:事务消息中的实现在于product端与broker端是双向通信的,互为客户端和服务端
image.png生产者
private void testTransaction() throws MessagingException {
String[] tags = new String[]{"TagA", "TagB", "TagC", "TagD", "TagE"};
for (int i = 0; i < 10; i++) {
try {
Message msg = MessageBuilder.withPayload("Hello RocketMQ " + i).
setHeader(RocketMQHeaders.KEYS, "KEY_" + i).build();
/**
* TX_PGROUP_NAME 必须同 {@link TransactionListenerImpl} 类的注解 txProducerGroup
* @RocketMQTransactionListener(txProducerGroup = "myTxProducerGroup")
*/
SendResult sendResult = rocketMQTemplate.sendMessageInTransaction(TX_PGROUP_NAME,
springTransTopic + ":" + tags[i % tags.length], msg, null);
System.out.printf("------ send Transactional msg body = %s , sendResult=%s %n",
msg.getPayload(), sendResult.getSendStatus());
Thread.sleep(10);
} catch (Exception e) {
e.printStackTrace();
}
}
}
监听
@RocketMQTransactionListener(txProducerGroup = "myTxProducerGroup")
public class TransactionListenerImpl implements RocketMQLocalTransactionListener {
private AtomicInteger transactionIndex = new AtomicInteger(0);
private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<String, Integer>();
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
String transId = (String)msg.getHeaders().get(RocketMQHeaders.PREFIX + RocketMQHeaders.TRANSACTION_ID);
System.out.printf("#### executeLocalTransaction is executed, msgTransactionId=%s %n",
transId);
int value = transactionIndex.getAndIncrement();
int status = value % 3;
localTrans.put(transId, status);
if (status == 0) {
// 事务提交
System.out.printf(" # COMMIT # Simulating msg %s related local transaction exec succeeded! ### %n", msg.getPayload());
return RocketMQLocalTransactionState.COMMIT;
}
if (status == 1) {
// 本地事务回滚
System.out.printf(" # ROLLBACK # Simulating %s related local transaction exec failed! %n", msg.getPayload());
return RocketMQLocalTransactionState.ROLLBACK;
}
// 事务状态不确定,待Broker发起 ASK 回查本地事务状态
System.out.printf(" # UNKNOW # Simulating %s related local transaction exec UNKNOWN! \n");
return RocketMQLocalTransactionState.UNKNOWN;
}
/**
* 在{@link TransactionListenerImpl#executeLocalTransaction(org.springframework.messaging.Message, java.lang.Object)}
* 中执行本地事务时可能失败,或者异步提交,导致事务状态暂时不能确定,broker在一定时间后
* 将会发起重试,broker会向producer-group发起ask回查,
* 这里producer->相当于server端,broker相当于client端,所以由此可以看出broker&producer-group是
* 双向通信的。
* @param msg
* @return
*/
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
String transId = (String)msg.getHeaders().get(RocketMQHeaders.PREFIX + RocketMQHeaders.TRANSACTION_ID);
RocketMQLocalTransactionState retState = RocketMQLocalTransactionState.COMMIT;
Integer status = localTrans.get(transId);
if (null != status) {
switch (status) {
case 0:
retState = RocketMQLocalTransactionState.UNKNOWN;
break;
case 1:
retState = RocketMQLocalTransactionState.COMMIT;
break;
case 2:
retState = RocketMQLocalTransactionState.ROLLBACK;
break;
}
}
System.out.printf("------ !!! checkLocalTransaction is executed once," +
" msgTransactionId=%s, TransactionState=%s status=%s %n",
transId, retState, status);
return retState;
}
}
七、消息存储整体架构
image.png消息存储架构图中主要有下面三个跟消息存储相关的文件构成。
- CommitLog
消息主体以及元数据的存储主体,存储Producer端写入的消息主体内容, 消息内容不是定长的。单个文件大小默认1G ,文件名长度为20位,左边补零,剩余为起始偏移量,比如00000000000000000000代表了第一个文件,起始偏移量为0,文件大小为 1G=1073741824;当第一个文件写满了,第二个文件为00000000001073741824,起始 偏移量为1073741824,以此类推。消息主要是顺序写入日志文件,当文件满了,写入下一 个文件;
- ConsumeQueue
消息消费队列,引入的目的主要是提高消息消费的性能,由于 RocketMQ是基于主题topic的订阅模式,消息消费是针对主题进行的,如果要遍历 commitlog文件中根据topic检索消息是非常低效的。Consumer即可根据 ConsumeQueue来查找待消费的消息。其中,ConsumeQueue(逻辑消费队列)作为消费消息的索引,保存了指定Topic下的队列消息在CommitLog中的起始物理偏移量offset, 消息大小size和消息Tag的HashCode值。consumequeue文件可以看成是基于topic的 commitlog索引文件,故consumequeue文件夹的组织方式如下:topic/queue/file三层 组织结构,具体存储路径为: $HOME/store/consumequeue/{topic}/{queueId}/{fileName}。同样consumequeue文 件采取定长设计,每一个条目共20个字节,分别为8字节的commitlog物理偏移量、4字节 的消息长度、8字节tag hashcode,单个文件由30W个条目组成,可以像数组一样随机访 问每一个条目,每个ConsumeQueue文件大小约5.72M;
- IndexFile
IndexFile(索引文件)提供了一种可以通过key或时间区间来查询消息的方 法。Index文件的存储位置是:{fileName},文件名fileName是以 创建时的时间戳命名的,固定的单个IndexFile文件大小:40+500Wx4+2000Wx20= 420000040个字节大小,约为400M,一个IndexFile可以保存 2000W个索引,IndexFile 的底层存储设计为在文件系统中实现HashMap结构,故rocketmq的索引文件其底层实现 为hash索引。
零拷贝刷盘
以文件下载为例,服务端的主要任务是:将服务端主机磁盘中的文件不做修改地从已连接的socket发出去。操作系统底层I/O过程如下图所示:
image.png
过程共产生了四次数据拷贝,在此过程中,我们没有对文件内容做任何修改,那么在内核空 间和用户空间来回拷贝数据无疑就是一种浪费,而零拷贝主要就是为了解决这种低效性。
什么是零拷贝技术?
零拷贝主要的任务就是避免CPU将数据从一块存储拷贝到另外一块存储,主要就是利用各种零拷贝技术,避免让CPU做大量的数据拷贝任务,减少不必要的拷贝,或者让别的组件 来做这一类简单的数据传输任务,让CPU解脱出来专注于别的任务。这样就可以让系统资源的利用更加有效。
原理是磁盘上的数据会通过DMA被拷贝的内核缓冲区,接着操作系统会把这段内核缓冲 区与应用程序共享,这样就不需要把内核缓冲区的内容往用户空间拷贝。应用程序再调用 write(),操作系统直接将内核缓冲区的内容拷贝到socket缓冲区中,这一切都发生在内核 态,最后,socket缓冲区再把数据发到网卡去。
image.png注意:连续的磁盘空间才不用经过用户空间的整合,而直接实现页缓存与socket缓冲区的共享,从而减少了内核空间到用户空间状态的转换,并且减少了2次内核空间与用户空间复制操作,进而提高了整个系统的性能。这就是rocketmq开辟磁盘空间的时候为什么选择直接开启足够大的磁盘空间文件进行存储消息的原因(CommitLog IndexFile)。
网友评论