消息中间件三大特性:异步、削峰、解耦
一、 安装部署
1.安装
官网下载,解压后直接运行(单Master)
#启动namesrv
nohup sh bin/mqnamesrv &
#查看日志
tail -f ~/logs/rocketmqlogs/namesrv.log
#先启动broker
nohup sh bin/mqbroker -n localhost:9876 &
查看日志
tail -f ~/logs/rocketmqlogs/broker.log
注意Broker的IP设置,手动修改属性
echo "brokerIP1=主机外网IP" > broker.conf
指定配置文件启动:nohup sh bin/mqbroker -n ${namesrvIp}:9876 -c conf/broker.conf
参考:https://github.com/apache/rocketmq#readme
2.部署模型
![](https://img.haomeiwen.com/i7885406/361de78cd328547c.png)
NameServer
NameServer是一个简单的 Topic 路由注册中心,支持 Topic、Broker 的动态注册与发现, NameServer是无状态化,节点之间无任何信息同步。
主要包括两个功能:
- Broker管理,NameServer接受Broker集群的注册信息并且保存下来作为路由信息的基本数据。然后提供心跳检测机制,检查Broker是否还存活
- 路由信息管理,每个NameServer将保存关于 Broker 集群的整个路由信息和用于客户端查询的队列信息。Producer和Consumer通过NameServer就可以知道整个Broker集群的路由信息,从而进行消息的投递和消费。
Broker
Broker主要负责消息的存储、投递和查询以及服务高可用保证. Broker采用Master-Slave架构解决高可用问题,一个Master可以对应多个Slave,但是一个Slave只能对应一个Master,Master可以多个,提升消息写入能力
1.每个 Broker 与 NameServer 集群中的所有节点建立长连接,定时注册 Topic 信息到所有 NameServer
2.Producer 与 NameServer 集群中的其中一个节点建立长连接,定期从 NameServer 获取Topic路由信息,并向提供 Topic 服务的 Master 建立长连接,且定时向 Master 发送心跳。Producer 完全无状态
3.Consumer 与 NameServer 集群中的其中一个节点建立长连接,定期从 NameServer 获取 Topic 路由信息,并向提供 Topic 服务的Master、Slave 建立长连接,且定时向 Master、Slave发送心跳。Consumer 既可以从 Master 订阅消息,也可以从Slave订阅消息。
部署方式
- 单Master模式:只有一个节点,一般用于测试环境
- 多Master模式:多个Master,提升负载能力,但是不部署Slave,单台Master故障,该Master上未被消费的消息在机器恢复之前不可消费
- 多Master-Slave异步复制:解决单台Master故障问题,消费者可以从Slave消费,由于是异步,主备有短暂消息延迟(毫秒级)
- 多Master-Slave同步复制: 同上,只是采用主备采用同步机制,性能略低10%左右,发送单个消息RT会略高,但是消息无延迟
一般生产环境采用的是多Master-Slave异步复制模式
二、消息模型
![](https://img.haomeiwen.com/i7885406/fb0f474bededc50d.png)
消费者组
为了消息消费能力的水平扩展,引入消费者组。
消费模式
同一个消费者组的消费模式:
- 集群模式(默认)
同一个 ConsumerGroup 中的 Consumer 实例是负载均衡消费,即任意一条消息只需要被消费组内的任意一个消费者处理即可 - 广播模式
同一个 ConsumerGroup 中的每个 Consumer 实例都处理全部的队列
1.集群模式下扩缩消费者数量也无法提升或降低消费能力,但当Topic的总队列数小于消费者的数量时,消费者将分配不到队列,即使消费者再多也无法提升消费能力
3.广播模式下扩缩消费者数量也无法提升或降低消费能力
- 支持推Push和拉Pull两种模式
三、基本概念
队列
为了支持高并发和水平扩展,需要对 Topic 进行分区,称为队列。一个 Topic 可能有多个队列,并且可能分布在不同的 Broker 上
主题Topic&Tag
Topic 与 Tag 都是业务上用来归类的标识,区别在于 Topic 是一级分类,而 Tag 可以理解为是二级分类。可使用 Tag 可以实现对 Topic 中的消息进行过滤。通常情况下,不同的 Topic 之间的消息没有必然的联系,而 Tag 则用来区分同一个 Topic 下相互关联的消息
Keys
消息的唯一标识码,方便定位消息丢失问题。
消费位点
每个队列都会记录自己的最小位点、最大位点,针对消费者组,会有消费位点,在集群模式下,消费位点是由客户端提给交服务端保存的,在广播模式下,消费位点是由客户端自己保存的
四、代码示例
消息发送分为三种方式:同步、异步、单向传输,前两种是可靠的,无论是否成功都有相应,最后一种只管发送没有返回结果
public class SyncProducer {
public static void main(String[] args) throws Exception {
// 初始化一个producer并设置Producer group name
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name"); //(1)
// 设置NameServer地址
producer.setNamesrvAddr("localhost:9876"); //(2)
// 启动producer
producer.start();
for (int i = 0; i < 100; i++) {
// 创建一条消息,并指定topic、tag、body等信息,tag可以理解成标签,对消息进行再归类,RocketMQ可以在消费端对tag进行过滤
Message msg = new Message("TopicTest" /* Topic */,
"TagA" /* Tag */,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
); //(3)
// 利用producer进行发送,并同步等待发送结果
SendResult sendResult = producer.send(msg); //(4)
System.out.printf("%s%n", sendResult);
}
// 一旦producer不再使用,关闭producer
producer.shutdown();
}
}
Push消费
public class Consumer {
public static void main(String[] args) throws InterruptedException, MQClientException {
// 初始化consumer,并设置consumer group name
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");
// 设置NameServer地址
consumer.setNamesrvAddr("localhost:9876");
//设置消费模式,默认是集群模式
//consumer.setMessageModel(MessageModel.BROADCASTING);
//订阅一个或多个topic,并指定tag过滤条件,这里指定*表示接收所有tag的消息
consumer.subscribe("TopicTest", "*");
//注册回调接口来处理从Broker中收到的消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.println(new String(msgs.get(0).getBody(),StandardCharsets.UTF_8))
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
// 返回消息消费状态,ConsumeConcurrentlyStatus.CONSUME_SUCCESS为消费成功
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 启动Consumer
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
Pull 消费
public class LitePullConsumerSubscribe {
public static volatile boolean running = true;
public static void main(String[] args) throws Exception {
DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer("lite_pull_consumer_test");
litePullConsumer.subscribe("TopicTest", "*");
litePullConsumer.setPullBatchSize(20);
litePullConsumer.start();
try {
while (running) {
List<MessageExt> messageExts = litePullConsumer.poll();
System.out.printf("%s%n", messageExts);
}
} finally {
litePullConsumer.shutdown();
}
}
}
控制台
rocketmq-dashboard
官网:https://github.com/apache/rocketmq-dashboard
官网:https://rocketmq.apache.org/
Github: https://github.com/apache/rocketmq
网友评论