- 1 常见MQ比较
- 2.RabbitMQ介绍
- 3 RabbitMQ高级特性
- 4.RabbitMQ高级整合
- 5.1 了解RabbitMQ集群架构模式
- 5.2 从零开始构建高可靠RabbitMQ集群
- 5.3 集群配置与集群运维故障\失败转移
- 5.4 高级插件的使用
- 6.大厂SET架构演化
- 7.大厂MQ实现思路和设计方案
(吐槽:简书markdown居然不支持TOC,建议ctrl+f手动搜索标题跳转)
1 常见MQ比较
ActiveMQ
老牌,大数据高并发性能较差
Kafka
大数据方向,性能最高,不支持事务,可靠性差
单机每秒100k
仅使用内存 pagecache
架构模式 zk协调,node复制
RocketMQ
被阿里应用于交易\流计算\日志处理\消息推送,Java实现
可靠\事务\高性能
商业版收费
RabbitMQ
基于AMQP协议,Erlang语言实现
可靠性好,性能优于ActiveMQ
HA-proxy KeepAlived做高可用负载均衡
2.RabbitMQ介绍
2.1 介绍
滴滴\美团\头条\去哪儿...等大厂在用
与Spring AMQP完美整合
集群模式丰富
数据不丢失前提下实现高可靠\高可用
高性能原因
Erlang最初应用于交换机领域,有和Socket一样的延迟
2.2 AMQP协议
高级消息队列协议
二进制协议,统一应用层标准
生产者 消费者 Server Virtual Host Exchange Message Queue
2.3 AMQP核心概念
Server
接受Client连接
Channel
网络信道,所有操作都在Channel进行,每个Channel代表一个会话任务
Message
传送的数据,由Properties和Body组成
Virtual Host
逻辑隔离,同VH中不能有相同名称的Exchange和Queue
Exchange
根据路由键转发消息到队列
Binding
Exchange和Queue的虚拟连接
Routing Key
路由规则,如何路由消息
Queue
消息队列,保存消息转发给消费者
2.4 RabbitMQ整体架构
生产者-Exchange->Queue->消费者
发送消息需要指定Exchange和Routing Key
2.5 RabbitMQ安装使用
Erlang与RabbitMQ版本要符合
这里基于CentOS 7
# 安装所需依赖
yum install build-essential openssl openssl-devel unixODBC unixODBC-devel make gcc gcc-c++ kernel-devel m4 ncurses-devel tk tc xz lsof vim -y
# 安装Erlang
rpm -ivh erlang-18.3-1.el7.centos.x86_64.rpm
# 安装Socat(密钥)
rpm -ivh socat-1.7.3.2-1.1.el7.x86_64.rpm --force --nodeps
# 安装RabbitMQ
rpm -ivh rabbitmq-server-3.6.5-1.noarch.rpm
# 配置文件修改
vim /usr/lib/rabbitmq/lib/rabbitmq_server-3.6.5/ebin/rabbit.app
{loopback_users, ["guest"]},
# 启动 &表示后台启动
rabbitmq-server start &
# 服务停止
# rabbitmqctl stop-app
# 查看插件
rabbitmq-plugins list
# 安装管控台插件
rabbitmq-plugins enable rabbitmq_management
# 浏览器访问 用户名密码都输入guest
http://node-01:15672/
2.6 命令行与管控台
基础操作
# 启动应用
rabbitmqctl start_app
# 关闭应用
rabbitmqctl stop_app
# 节点状态
rabbitmqctl status
# 添加用户
rabbitmqctl add_user username password
# 列出所有用户
rabbitmqctl list_users
# 删除用户
rabbitmqctl delete_user username
# 清除用户权限
rabbitmqctl clear_permissions -p vhostpath username
# 列出用户权限
rabbitmqctl list_user_permissions username
# 修改密码
rabbitmqctl change_password username newpassword
# 设置用户权限
rabbitmqctl set_permissions -p vhostpath username ".*" ".*" ".*"
# 创建虚拟主机
rabbitmqctl add_vhost vhostpath
# 列出所有虚拟主机
rabbitmqctl list_vhosts
# 列出虚拟主机上所有权限
rabbitmqctl list_permissions -p vhostpath
# 删除虚拟主机
rabbitmqctl delete_vhost vhostpath
# 查看所有队列
rabbitmqctl list_queues
# 清除队列中信息
rabbitmqctl -p vhostpath purge_queue blue
高级操作
# 移除所有数据,要在stop后使用
rabbitmqctl reset
# 组成集群 --ram是内存级别存储
rabbitmqctl join_cluster <clusternode> [--ram]
# 查看集群状态
rabbitmqctl cluster_status
# 修改集群节点存储形式
rabbitmqctl change_cluster_node_type disc | ram
# 忘记节点(比如启动不起来的节点,失败转移)
rabbitmqctl forget_cluster_node [--offline]
实际操作
# 查看服务是否启动
lsof -i:5672
rabbitmqctl list_queues
# 默认会有一个 /
rabbitmqctl list_vhosts
# 状态
rabbitmqctl status
控制台访问
http://node-01:15672/
Exchages标签页
amq.direct 直连
amq.fanout 广播
amq.topic 主题订阅
Features D表示持久化
2.7 生产者消费者模型
2.7.1 quickstart
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>1.5.14.RELEASE</version>
</parent>
<groupId>cn.dfun</groupId>
<artifactId>rabbitmq-demo</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
生产者
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
// 1.创建ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.68.101");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
// 2.通过连接工厂创建连接
Connection connection = connectionFactory.newConnection();
// 3.通过Connection创建Channel
Channel channel = connection.createChannel();
// 4.通过Channel发送数据
String msg = "Hello RabbitMQ!";
for (int i = 0; i <5 ; i++) {
// 交换机/routing key,default exchange根据routing key匹配queue名称
channel.basicPublish("", "test001", null, msg.getBytes());
}
// 5.关闭连接
channel.close();
connection.close();
}
}
消费者
public class Consumer {
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
// 1.创建ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.68.101");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
// 2.通过连接工厂创建连接
Connection connection = connectionFactory.newConnection();
// 3.通过Connection创建Channel
Channel channel = connection.createChannel();
// 4.声明(创建一个队列) 持久化/非独占(独占可以保证顺序消费)/不自动删除/无扩展参数
String queueName = "test001";
channel.queueDeclare(queueName, true, false, false, null);
// 5.创建消费者
QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
// 6.设置Channel 不自动签收
channel.basicConsume(queueName, true, queueingConsumer);
// 7.获取消息 无参方法一直阻塞
while(true) {
QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
String msg = new String(delivery.getBody());
System.out.println("消费端: " + msg);
// envelope可设置手动签收\获取exchange等
// Envelope envelope = delivery.getEnvelope();
}
}
}
2.8 Exchange交换机
Type
类型 direct topic fanout headers
Durability
是否持久化
Auto Delete
最后一个绑定到Exchange的Queue删除后,自动删除该Exchange
2.8.1 Direct Exchange
routing key需完全匹配
生产者
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
// 1.创建ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.68.101");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
// 2.通过连接工厂创建连接
Connection connection = connectionFactory.newConnection();
// 3.通过Connection创建Channel
Channel channel = connection.createChannel();
// 4.声明
String exchangeName = "test direct exchange";
String routingKey = "test.direct";
// 5.发送
String msg = "Hello Direct Exchange...";
channel.basicPublish(exchangeName, routingKey, null, msg.getBytes());
}
}
消费者
public class Consumer {
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.68.101");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
// 自动重连
connectionFactory.setAutomaticRecoveryEnabled(true);
// 每3秒重连一次
connectionFactory.setNetworkRecoveryInterval(3000);
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
// 5.声明
String exchangeName = "test direct exchange";
String exchangeType = "direct";
String queueName = "test direct queue";
String routingKey = "test.direct";
// 声明交换机
channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
// 声明队列
channel.queueDeclare(queueName, false, false, false, null);
// 绑定
channel.queueBind(queueName, exchangeName, routingKey);
QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
channel.basicConsume(queueName, true, queueingConsumer);
while(true) {
QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
String msg = new String(delivery.getBody());
System.out.println("消费端: " + msg);
}
}
}
2.8.2 Topic Exchange
消息被转发到所有关心routing key中指定topic的queue上
routing key和topic模糊匹配
队列需要绑定一个topic
可以使用通配符
# 匹配一个或多个词
* 匹配一个词
生产者
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.68.101");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
// 声明
String exchangeName = "test_topic_exchange";
String routingKey1 = "user.save";
String routingKey2 = "user.update";
String routingKey3 = "user.delete.abc";
String msg = "Hello Topic Exchange...";
channel.basicPublish(exchangeName, routingKey1, null, msg.getBytes());
channel.basicPublish(exchangeName, routingKey2, null, msg.getBytes());
channel.basicPublish(exchangeName, routingKey3, null, msg.getBytes());
channel.close();
connection.close();
}
}
消费者
public class Consumer {
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.68.101");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
// 声明
String exchangeName = "test_topic_exchange";
String exchangeType = "topic";
String queueName = "test_topic_queue";
String routingKey = "user.*";
// String routingKey = "user.#";
// 声明交换机
channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
// 声明队列
channel.queueDeclare(queueName, false, false, false, null);
// 交换机和队列绑定
channel.queueBind(queueName, exchangeName, routingKey);
QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
channel.basicConsume(queueName, true, queueingConsumer);
while(true) {
QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
String msg = new String(delivery.getBody());
System.out.println("消费端: " + msg);
}
}
}
使用user.# 3条消息都接收
使用user.* 只接收2条消息
2.8.3 Fanout Exchange
不处理路由键,只需将队列绑定到交换机
消息被转发到与该交换机绑定的所有队列
Fanout转发消息是最快的
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.68.101");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
// 声明
String exchangeName = "test_fanout_exchange";
for (int i = 0; i < 10; i++) {
String msg = "Hello Fanout Exchange...";
channel.basicPublish(exchangeName, "", null, msg.getBytes());
}
channel.close();
connection.close();
}
}
public class Consumer {
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.68.101");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
// 声明
String exchangeName = "test_fanout_exchange";
String exchangeType = "fanout";
String queueName = "test_fanout_queue";
// 不设置路由键
String routingKey = "";
channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
channel.queueDeclare(queueName, false, false, false, null);
channel.queueBind(queueName, exchangeName, routingKey);
QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
channel.basicConsume(queueName, true, queueingConsumer);
while(true) {
QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
String msg = new String(delivery.getBody());
System.out.println("消费端: " + msg);
}
}
}
2.9 绑定\队列\消息\虚拟主机详解
Binding-绑定
Exchange和Exchange\Queue之间的连接关系
Binding包含routing key
Queue-队列
是否持久化 是否自动删除
Message-消息
Properties和Body组成
常用属性: 送达模式\headers(自定义属性)
其他属性: content_type\content_encoding\priorty
correlation_id(唯一id\幂等)\expiration(过期时间)...
指定message属性
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.68.101");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
// 自定义属性
Map<String, Object> headers = new HashMap<>();
headers.put("my1", "111");
headers.put("my2", "222");
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
// 未被消费server重启后消失
.deliveryMode(2)
.contentEncoding("UTF-8")
.expiration("15000")
.headers(headers)
.build();
String msg = "Hello RabbitMQ!";
for (int i = 0; i <5 ; i++) {
// 指定properties
channel.basicPublish("", "test001", properties, msg.getBytes());
}
// 5.关闭连接
channel.close();
connection.close();
}
}
public class Consumer {
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.68.101");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
String queueName = "test001";
channel.queueDeclare(queueName, true, false, false, null);
QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
channel.basicConsume(queueName, true, queueingConsumer);
while(true) {
QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
String msg = new String(delivery.getBody());
System.out.println("消费端: " + msg);
// 获取properties
Map<String, Object> headers = delivery.getProperties().getHeaders();
System.out.println("headers get my1:" + headers.get("my1"));
}
}
}
Virtual Host-虚拟主机
逻辑隔离
同一Virtual Host不能有相同的Exchange和Queue
3 RabbitMQ高级特性
3.1 保证100%投递成功
生产端可靠性投递
保障成功发出
保障MQ成功接收
发送端收到MQ确认应答
消息补偿机制
大厂主流解决方案
a.消息落库,对状态打标

高并发场景很少用分布式事务,而使用消息补偿
业务数据(如订单)和消息落库
消息发出后使用Confirm Listener监听,一段时间后未接受应答则过期,分布式定时任务重发消息
设置重试次数,超出则设为失败,补偿系统解决
由于存在数据持久化操作,不适于高并发场景
b.延迟投递,二次确认,回调检查

生产者第一次发送前业务信息落库
紧接着发送延迟检查消息
消费端处理后发送确认消息
回调服务监听消费确认消息,做消息入库
回调服务发送检查消息
3.2 幂等性
回顾乐观锁设置版本号
并行执行多次相同操作结果相同
消费端实现幂等避免消息重复消费
业界主流幂等
a.唯一ID + 指纹码
利用数据库主键去重
优点: 实现简单
缺点: 高并发瓶颈
解决: 跟进ID进行分库分表进行算法路由
b.利用Redis原子性
问题:
如果落库,数据库和缓存如何做到原子性
如果不落库,都存储到缓存,如何设置定时同步策略,缓存可靠性问题
3.3 Confirm确认消息
Broker收到消息对生产者发送应答
生产端Confirm Listener监听应答
异步操作
步骤:
1.channel开启确认模式
2.channel添加监听
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.68.101");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
// 指定消息确认模式
channel.confirmSelect();
String exchangeName = "test_confirm_exchange";
String routingKey = "confirm.save";
String msg = "Hello Confirm...";
channel.basicPublish(exchangeName, routingKey, null, msg.getBytes());
channel.addConfirmListener(new ConfirmListener() {
@Override
public void handleAck(long l, boolean b) throws IOException {
System.out.println("-------ack-----------");
}
@Override
public void handleNack(long l, boolean b) throws IOException {
System.out.println("-------no ack-----------");
}
});
}
}
public class Consumer {
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.68.101");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
String exchangeName = "test_confirm_exchange";
String routingKey = "confirm.#";
String queueName = "test_confirm_queue";
channel.exchangeDeclare(exchangeName, "topic", true);
channel.queueDeclare(queueName, true, false, false, null);
channel.queueBind(queueName, exchangeName, routingKey);
QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
// 自动签收
channel.basicConsume(queueName, true, queueingConsumer);
while(true) {
QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
String msg = new String(delivery.getBody());
System.out.println("消费端:" + msg);
}
}
}
如果ack和no ack都没收到,则使用定时任务重新发送消息
3.4 Return消息机制
Return Listener处理一些不可路由的消息
配置项:
Mandatory 设置为true, 监听器会接收到不可路由的消息
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.68.101");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
String exchangeName = "test_return_exchange";
String routingKey = "return.save";
String routingKeyError = "abc.save";
String msg = "Hello Return...";
// Mandatory设为true路由不到不会删除
channel.basicPublish(exchangeName, routingKeyError, true, null, msg.getBytes());
channel.addReturnListener(new ReturnListener() {
@Override
public void handleReturn(int i, String s, String s1, String s2, AMQP.BasicProperties basicProperties, byte[] bytes) throws IOException {
System.out.println("-------handle return-----------");
System.out.println("replyCode: " + i);
System.out.println("replyText: " + s);
System.out.println("exchange: " + s1);
System.out.println("routingKey: " + s2);
System.out.println("properties: " + basicProperties);
System.out.println("body: " + new String(bytes));
}
});
}
}
public class Consumer {
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.68.101");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
String exchangeName = "test_return_exchange";
String routingKey = "return.#";
String queueName = "test_return_queue";
channel.exchangeDeclare(exchangeName, "topic", true, false, null);
channel.queueDeclare(queueName, true, false, false, null);
channel.queueBind(queueName, exchangeName, routingKey);
QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
channel.basicConsume(queueName, true, queueingConsumer);
while(true) {
QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
String msg = new String(delivery.getBody());
System.out.println("消费端: " + msg);
}
}
}
3.5 消费端自定义监听
自定义Consumer类继承DefaultConsumer重写handleDelivery方法
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.68.101");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
String exchangeName = "test_consumer_exchange";
String routingKey = "consumer.save";
String msg = "Hello Consumer Listener...";
for (int i = 0; i < 5; i++) {
channel.basicPublish(exchangeName, routingKey, false, null, msg.getBytes());
}
}
}
public class MyConsumer extends DefaultConsumer {
// 继承DefaultConsumer
public MyConsumer(Channel channel) {
super(channel);
}
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
// 重写该方法
String msg = new String(body);
System.out.println(msg);
}
}
public class Consumer {
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.68.101");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
String exchangeName = "test_consumer_exchange";
String routingKey = "consumer.#";
String queueName = "test_consumer_queue";
channel.exchangeDeclare(exchangeName, "topic", true, false, null);
channel.queueDeclare(queueName, true, false, false, null);
channel.queueBind(queueName, exchangeName, routingKey);
// 自定义监听
channel.basicConsume(queueName, true, new MyConsumer(channel));
}
}
3.6 消费端限流
假设Rabbitmq服务区有上万条未处理消息
巨量的消息瞬间推送,单个客户端无法同时处理
生产端一般无法做限流,对消费端限流
qos-服务质量保证功能
在非自动确认前提下,一定数目消息未被确认,不消费新的消息
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.68.101");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
String exchangeName = "test_qos_exchange";
String routingKey = "qos.save";
String msg = "Hello Consumer Listener...";
for (int i = 0; i < 5; i++) {
channel.basicPublish(exchangeName, routingKey, false, null, msg.getBytes());
}
}
}
public class Consumer {
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.68.101");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
String exchangeName = "test_qos_exchange";
String routingKey = "qos.#";
String queueName = "test_qos_queue";
channel.exchangeDeclare(exchangeName, "topic", true, false, null);
channel.queueDeclare(queueName, true, false, false, null);
channel.queueBind(queueName, exchangeName, routingKey);
// ack前消费条数,实际工作中设为1
channel.basicQos(0, 1, false);
// autoAck设为false手工签收
channel.basicConsume(queueName, false, new MyConsumer(channel));
}
}
public class MyConsumer extends DefaultConsumer {
private Channel channel;
public MyConsumer(Channel channel) {
super(channel);
this.channel = channel;
}
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
// 重写该方法
String msg = new String(body);
System.out.println(msg);
// 取到消息标签/不支持批量签收
System.out.println("----consumer ack---------");
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
3.7 消费端ACK与重回队列
消费端ACK
消费者进行消费时,如果由于业务异常可以记录日志,然后进行补偿
如果由于服务器宕机,则需要手工ACK保障消费端消费成功
重回队列
失败的消息重新投递给Broker
实际中一般都会关闭重回队列
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.68.101");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
String exchangeName = "test_ack_exchange";
String routingKey = "ack.save";
for (int i = 0; i < 5; i++) {
String msg = "Hello ACK Listener..." + i;
// 设置属性
Map<String, Object> headers = new HashMap<>();
headers.put("num", i);
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
// 持久化
.deliveryMode(2)
.contentEncoding("UTF-8")
.headers(headers)
.build();
channel.basicPublish(exchangeName, routingKey, true, properties, msg.getBytes());
}
}
}
public class Consumer {
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.68.101");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
String exchangeName = "test_ack_exchange";
String routingKey = "ack.#";
String queueName = "test_ack_queue";
channel.exchangeDeclare(exchangeName, "topic", true, false, null);
channel.queueDeclare(queueName, true, false, false, null);
channel.queueBind(queueName, exchangeName, routingKey);
// 自定义监听
channel.basicConsume(queueName, false, new MyConsumer(channel));
}
}
public class MyConsumer extends DefaultConsumer {
private Channel channel;
public MyConsumer(Channel channel) {
super(channel);
this.channel = channel;
}
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
// 重写该方法
String msg = new String(body);
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(msg);
if((Integer)properties.getHeaders().get("num") == 0) {
// 不批量签收/重回队列
channel.basicNack(envelope.getDeliveryTag(), false, true);
} else {
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
}
3.8 TTL-队列消息
Time To Live,即生存时间
RabbitMQ支持消息和队列的过期时间
通过管控台设置消息过期时间:

3.9 DLX-死信队列
任何MQ都有死信队列
Dead-Letter-Exchange
消息在一个队列中变成死信之后,被重新public到另一个Exchange,这个Exchange就是DLX
死信情况:
消息被拒绝或者nack,并且不重回队列
消息TTL过期
队列到达最大长度
DLX就是一个正常的Exchange,能在任何Exchange上被指定,可以进行监听
设置:
Exchange: dlx.exchange
Queue: dlx.queue
RoutingKey: #
队列加扩展参数: arguments.put("x-dead-letter-exchange", "dlx.exchange)
生产者
String exchangeName = "test_dlx_exchange";
String routingKey = "dlx.save";
String msg = "Hello DLX...";
for (int i = 0; i < 5; i++) {
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
.deliveryMode(2)
.contentEncoding("UTF-8")
// 10秒过期,转到死信队列
.expiration("10000")
.build();
channel.basicPublish(exchangeName, routingKey, false, properties, msg.getBytes());
}
消费者
// 这里是普通的交换机\队列\路由,而非DLX
String exchangeName = "test_dlx_exchange";
String routingKey = "dlx.#";
String queueName = "test_dlx_queue";
channel.exchangeDeclare(exchangeName, "topic", true, false, null);
// DLX设置
Map<String, Object> arguments = new HashMap<>();
arguments.put("x-dead-letter-exchange", "dlx.exchange");
channel.queueDeclare(queueName, true, false, false, arguments);
channel.queueBind(queueName, exchangeName, routingKey);
// 声明DLX
channel.exchangeDeclare("dlx.exchange", "topic", true, false, null);
channel.queueDeclare("dlx.queue", true, false, false, null);
channel.queueBind("dlx.queue", "dlx.exchange", "#");
启动消费端,然后停止,再启动生产端
可以看到10s后,消息由test_dlx_queue队列转到dlx.queue队列

4.RabbitMQ高级整合
4.1 RabbitMQ整合Spring AMQP
RabbitAdmin 管控
SpringAMQP 声明 通过注解注入
RabbitTemplate
SimpleMessageListenerContainer
MessageListenerAdapter
MessageConverter 转换器,序列/反序列化
4.1.1 RabbitAdmin
autoStartup设为true,否则Spring容器不会加载
底层是从Spring容器中获取Exchange\Binding\RoutingKey及Queue的Bean
使用RabbitTemplate的execute方法执行RabbitMQ的基础功能操作
代码
新建Spring Boot项目,版本1.5.14.RELEASE
依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>3.6.5</version>
</dependency>
配置
@Configuration
@ComponentScan({"cn.dfun.demo.rabbitmqspring.*"})
public class RabbitMQConfig {
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setAddresses("192.168.68.101:5672");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
connectionFactory.setVirtualHost("/");
return connectionFactory;
}
@Bean
public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
rabbitAdmin.setAutoStartup(true);
return rabbitAdmin;
}
}
测试
@RunWith(SpringRunner.class)
@SpringBootTest
public class RabbitMQTest {
@Resource
private RabbitAdmin rabbitAdmin;
@Test
public void testAdmin(){
// 声明交换机
rabbitAdmin.declareExchange(new DirectExchange("test.direct", false, false));
rabbitAdmin.declareExchange(new TopicExchange("test.topic", false, false));
rabbitAdmin.declareExchange(new FanoutExchange("test.fanout", false, false));
// 声明队列
rabbitAdmin.declareQueue(new Queue("test.direct.queue", false));
rabbitAdmin.declareQueue(new Queue("test.topic.queue", false));
rabbitAdmin.declareQueue(new Queue("test.fanout.queue", false));
// 绑定
rabbitAdmin.declareBinding(new Binding("test.direct.queue",
Binding.DestinationType.QUEUE,
"test.direct",
"direct", new HashMap<>()));
rabbitAdmin.declareBinding(BindingBuilder
// 绑定时创建队列
.bind(new Queue("test.topic.queue", false))
// 绑定时创建交换机
.to(new TopicExchange("test.topic", false, false))
// 指定路由键
.with("user.#")
);
rabbitAdmin.declareBinding(BindingBuilder
// 绑定时创建队列
.bind(new Queue("test.fanout.queue", false))
// 绑定时创建交换机,Fanout无需指定路由键
.to(new FanoutExchange("test.fanout", false, false))
);
// 清空队列
rabbitAdmin.purgeQueue("test.topic", false);
}
}
4.1.2 Spring AMQP声明
使用@Bean方式声明Exchange\Queue\Binding
@Configuration
@ComponentScan({"cn.dfun.demo.rabbitmqspring.*"})
public class RabbitMQConfig {
...
@Bean
public TopicExchange exchange001() {
return new TopicExchange("topic001", true, false);
}
@Bean
public Queue queue001() {
// 队列持久
return new Queue("queue001", true);
}
@Bean
public Binding binding001() {
return BindingBuilder.bind(queue001()).
to(exchange001()).with("spring.*");
}
}
4.1.3 消息模板-RabbitTemplate
发送消息的关键类
提供可靠性消息\回调监听\返回值确认等丰富的消息发送方法
需要注入到Spring容器中
配置
@Configuration
@ComponentScan({"cn.dfun.demo.rabbitmqspring.*"})
public class RabbitMQConfig {
...
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
// 可以添加监听等
return rabbitTemplate;
}
}
测试方法
@Test
public void testSendMessage() throws Exception {
// 创建消息
MessageProperties messageProperties = new MessageProperties();
messageProperties.getHeaders().put("desc", "信息描述");
messageProperties.getHeaders().put("type", "自定义消息类型");
Message message = new Message("Hello RabbitMQ".getBytes(), messageProperties);
// 发送
rabbitTemplate.convertAndSend("topic001", "spring.amqp", message, new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
System.out.println("---添加额外设置---");
message.getMessageProperties().getHeaders().put("desc", "额外修改信息描述");
message.getMessageProperties().getHeaders().put("attr", "额外新加属性");
return message;
}
});
}
@Test
public void testSendMessage2() throws Exception {
MessageProperties messageProperties = new MessageProperties();
messageProperties.setContentType("text/plain");
Message message = new Message("Hello 1234".getBytes(), messageProperties);
// convertAndSend消息体可以直接发送Object
rabbitTemplate.convertAndSend("topic001", "spring.amqp", "hello object message");
// send方法必须传Message类型
rabbitTemplate.send("topic001", "spring.amqp", message);
}
4.1.4 简单消息监听容器-SimpleMessageListenerContainer
监听队列(多个) 自动启动 自动声明功能
事务特性
设置消费者数量
设置签收模式 是否重回队列 异常捕获
设置监听器 消息转换器等
可以在应用运行中动态设置,很多RabbitMQ自定制管控台也是根据这一特性实现的
配置
@Configuration
@ComponentScan({"cn.dfun.demo.rabbitmqspring.*"})
public class RabbitMQConfig {
...
@Bean
public SimpleMessageListenerContainer simpleMessageListenerContainer(ConnectionFactory connectionFactory) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
// 可设置多个队列
container.setQueues(queue001());
// 消费者并发数
container.setConcurrentConsumers(1);
// 最大消费者并发数
container.setMaxConcurrentConsumers(5);
// 不重回队列
container.setDefaultRequeueRejected(false);
// 自动签收
container.setAcknowledgeMode(AcknowledgeMode.AUTO);
// 标签策略
container.setConsumerTagStrategy(new ConsumerTagStrategy() {
@Override
public String createConsumerTag(String queue) {
return queue + "_" + UUID.randomUUID().toString();
}
});
// 监听
container.setMessageListener(new ChannelAwareMessageListener() {
@Override
public void onMessage(Message message, Channel channel) throws Exception {
// 消息处理
String msg = new String(message.getBody());
System.out.println("---消费者---" + msg);
}
});
return container;
}
}
4.1.5 消息监听适配器-MessageListenerAdapter
a.使用适配器对消息体格式转换
配置
@Bean
public SimpleMessageListenerContainer simpleMessageListenerContainer(ConnectionFactory connectionFactory) {
...
// 自定义委派消息
MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
// 指定方法名,如不指定则调用handleMessage默认方法
adapter.setDefaultListenerMethod("consumeMessage");
// 自定义转换器
adapter.setMessageConverter(new TextMessageConverter());
container.setMessageListener(adapter);
return container;
}
委派消息类
public class MessageDelegate {
public void handleMessage(byte[] messageBody) {
System.out.println("默认方法, 消息内容: " + new String(messageBody));
}
public void consumeMessage(byte[] messageBody) {
System.out.println("字节数组方法, 消息内容: " + new String(messageBody));
}
public void consumeMessage(String messageBody) {
System.out.println("字符串方法, 消息内容: " + messageBody);
}
}
消息转换器
a.对包含text属性的消息体进行类型转换
这里转换为String类型
public class TextMessageConverter implements MessageConverter {
@Override
public Message toMessage(Object o, MessageProperties messageProperties) throws MessageConversionException {
return new Message(o.toString().getBytes(), messageProperties);
}
@Override
public Object fromMessage(Message message) throws MessageConversionException {
String contentType = message.getMessageProperties().getContentType();
if(null != contentType && contentType.contains("text")) {
return new String(message.getBody());
}
return message.getBody();
}
}
b.队列名称和方法名称也可以一一指定
@Bean
public SimpleMessageListenerContainer simpleMessageListenerContainer(ConnectionFactory connectionFactory) {
...
Map<String, String> queueOrTagToMethodName = new HashMap<>();
queueOrTagToMethodName.put("queue001", "method1");
queueOrTagToMethodName.put("queue002", "method2");
adapter.setQueueOrTagToMethodName(queueOrTagToMethodName);
container.setMessageListener(adapter);
return container;
}
4.1.6 消息转换器-MessageConverter
正常情况消息体为二进制形式,如希望内部转换或自定义转换器可使用MessageConverter
自定义转换器,实现MessageConverter接口,重写toMessage和fromMessage方法
Json转换器
java对象映射关系转换
自定义二进制转换器 如图片\PDF\流媒体
代码:
a.json转换器
实体类
public class Order {
private String id;
private String name;
private String content;
...
}
public class Packaged {
private String id;
private String name;
private String description;
...
}
配置类
Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
// 指定json转换器
adapter.setMessageConverter(jackson2JsonMessageConverter);
委派消息类
public void consumeMessage(Map messageBody) {
// 使用Map接收Json类型的消息
System.out.println("map方法, 消息内容: " + messageBody);
}
测试类
@Test
public void testSendJsonMessage() throws Exception {
Order order = new Order();
order.setId("001");
order.setName("消息订单");
order.setContent("描述信息");
ObjectMapper mapper = new ObjectMapper();
String json = mapper.writeValueAsString(order);
System.out.println("order json" + json);
MessageProperties messageProperties = new MessageProperties();
// 注意这里一定要设置
messageProperties.setContentType("application/json");
Message message = new Message(json.getBytes(), messageProperties);
rabbitTemplate.send("topic001", "spring.order", message);
}
b.发送Java对象消息
配置类
Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
DefaultJackson2JavaTypeMapper javaTypeMapper = new DefaultJackson2JavaTypeMapper();
jackson2JsonMessageConverter.setJavaTypeMapper(javaTypeMapper);
adapter.setMessageConverter(jackson2JsonMessageConverter);
委托消息类
public void consumeMessage(Order order) {
System.out.println("order对象, 消息内容: id=" + order.getId());
}
测试
@Test
public void testSendJavaMessage() throws Exception {
Order order = new Order();
order.setId("001");
order.setName("消息订单");
order.setContent("描述信息");
ObjectMapper mapper = new ObjectMapper();
String json = mapper.writeValueAsString(order);
System.out.println("order json" + json);
MessageProperties messageProperties = new MessageProperties();
// 注意这里一定要设置
messageProperties.setContentType("application/json");
messageProperties.getHeaders().put("__TypeId__", "cn.dfun.demo.rabbitmqspring.entity.Order");
Message message = new Message(json.getBytes(), messageProperties);
rabbitTemplate.send("topic001", "spring.order", message);
}
c.发送多个类型的java对象
配置类
Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
DefaultJackson2JavaTypeMapper javaTypeMapper = new DefaultJackson2JavaTypeMapper();
Map<String, Class<?>> idClassMapping = new HashMap<>();
idClassMapping.put("order", Order.class);
idClassMapping.put("packaged", Packaged.class);
javaTypeMapper.setIdClassMapping(idClassMapping);
jackson2JsonMessageConverter.setJavaTypeMapper(javaTypeMapper);
adapter.setMessageConverter(jackson2JsonMessageConverter);
委派消息类
public void consumeMessage(Order order) {
System.out.println("order对象, 消息内容: id=" + order.getId());
}
public void consumeMessage(Packaged packaged) {
System.out.println("packaged对象, 消息内容: id=" + packaged.getId());
}
测试
@Test
public void testSendMappingMessage() throws Exception {
Order order = new Order();
order.setId("001");
order.setName("订单消息");
order.setContent("描述信息");
ObjectMapper mapper = new ObjectMapper();
String json1 = mapper.writeValueAsString(order);
System.out.println("order json" + json1);
MessageProperties messageProperties1 = new MessageProperties();
// 注意这里一定要设置
messageProperties1.setContentType("application/json");
messageProperties1.getHeaders().put("__TypeId__", "order");
Message message1 = new Message(json1.getBytes(), messageProperties1);
rabbitTemplate.send("topic001", "spring.order", message1);
Packaged packaged = new Packaged();
packaged.setId("001");
packaged.setName("包裹消息");
packaged.setDescription("包裹描述信息");
String json2 = mapper.writeValueAsString(packaged);
System.out.println("packaged json" + json2);
MessageProperties messageProperties2 = new MessageProperties();
// 注意这里一定要设置
messageProperties2.setContentType("application/json");
messageProperties2.getHeaders().put("__TypeId__", "packaged");
Message message2 = new Message(json2.getBytes(), messageProperties2);
rabbitTemplate.send("topic001", "spring.packaged", message2);
}
d.多种格式转换
对文本\json\图片\pdf分别指定转换器
配置
ContentTypeDelegatingMessageConverter converter = new ContentTypeDelegatingMessageConverter();
// 文本转换器
TextMessageConverter textConverter = new TextMessageConverter();
converter.addDelegate("text", textConverter);
converter.addDelegate("html/text", textConverter);
converter.addDelegate("xml/text", textConverter);
converter.addDelegate("text/plain", textConverter);
// json转换器
Jackson2JsonMessageConverter jsonConverter = new Jackson2JsonMessageConverter();
converter.addDelegate("json", jsonConverter);
converter.addDelegate("application/json", jsonConverter);
// 图片转换器
ImageMessageConverter imageConverter = new ImageMessageConverter();
converter.addDelegate("image/png", imageConverter);
converter.addDelegate("image", imageConverter);
// pdf转换器
PDFMessageConverter pdfConverter = new PDFMessageConverter();
converter.addDelegate("application/pdf", pdfConverter);
adapter.setMessageConverter(converter);
配置图片和pdf对应的queue
@Bean
public Queue queue_image() {
return new Queue("image_queue", true); //队列持久
}
@Bean
public Queue queue_pdf() {
return new Queue("pdf_queue", true); //队列持久
}
配置上述queue
@Bean
public SimpleMessageListenerContainer simpleMessageListenerContainer(ConnectionFactory connectionFactory) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
// 可设置多个队列
container.setQueues(queue001(), queue_image(), queue_pdf());
...
图片和PDF转换器
public class ImageMessageConverter implements MessageConverter {
@Override
public Message toMessage(Object o, MessageProperties messageProperties) throws MessageConversionException {
throw new MessageConversionException(" convert error ! ");
}
@Override
public Object fromMessage(Message message) throws MessageConversionException {
System.out.println("-----------Image MessageConverter");
Object _extName = message.getMessageProperties().getHeaders().get("extName");
String extName = _extName == null ? "png" : _extName.toString();
byte[] body = message.getBody();
String fileName = UUID.randomUUID().toString();
String path = "d:/dat/rabbit/" + fileName + "." + extName;
File f = new File(path);
try {
// 消息体字节流写入文件
Files.copy(new ByteArrayInputStream(body), f.toPath());
} catch (IOException e) {
e.printStackTrace();
}
return f;
}
}
public class PDFMessageConverter implements MessageConverter {
@Override
public Message toMessage(Object o, MessageProperties messageProperties) throws MessageConversionException {
throw new MessageConversionException(" convert error ! ");
}
@Override
public Object fromMessage(Message message) throws MessageConversionException {
System.out.println("-----------Pdf MessageConverter");
byte[] body = message.getBody();
String fileName = UUID.randomUUID().toString();
String path = "d:/dat/rabbit/" + fileName + ".pdf";
File f = new File(path);
try {
Files.copy(new ByteArrayInputStream(body), f.toPath());
} catch (IOException e) {
e.printStackTrace();
}
return f;
}
}
消息委派类添加方法
public void consumeMessage(File file) {
System.out.println("文件对象, 消息内容: " + file.getName());
}
测试
@Test
public void testSendExtConverterMessage() throws Exception {
/* byte[] body = Files.readAllBytes(Paths.get("d:/img", "jiedao.png"));
MessageProperties messageProperties = new MessageProperties();
messageProperties.setContentType("image/png");
messageProperties.getHeaders().put("extName", "png");
Message message = new Message(body, messageProperties);
rabbitTemplate.send("", "image_queue", message);*/
byte[] body = Files.readAllBytes(Paths.get("d:/doc", "2017alitech_01.pdf"));
MessageProperties messageProperties = new MessageProperties();
messageProperties.setContentType("application/pdf");
Message message = new Message(body, messageProperties);
rabbitTemplate.send("", "pdf_queue", message);
}
4.2 RabbitMQ整合Spring Boot
4.2.1 生产端配置
publisher-confirms
实现一个监听器用于监听Broker端返回的确认请求
publisher-returns
保证消息对Broker端是可达的
注意设置mandatory=true保证监听有效
生产端还可以设置其他属性,如发送重试,超时时间\次数\间隔等
代码:
新建springboot项目rabbitmq-springboot-producer
新建Exchange:exchage-01,RoutingKey:springboot.#,新建Queue:queue-1,并将exchange-1与queue-1相绑定
配置
application.properties
spring.rabbitmq.addresses=192.168.68.101:5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/
spring.rabbitmq.connection-timeout=15000
spring.rabbitmq.publisher-confirms=true
spring.rabbitmq.publisher-returns=true
spring.rabbitmq.template.mandatory=true
消息发送服务类
cn.dfun.demo.rabbitmq.springboot.producer.service.RabbitSender
@Component
public class RabbitSender {
@Autowired
private RabbitTemplate rabbitTemplate;
final RabbitTemplate.ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
System.out.println("correlationData: " + correlationData);
System.out.println("ack: " + ack);
if(!ack) {
// 可靠性投递更新数据库
System.out.println("异常处理...");
}
}
};
final RabbitTemplate.ReturnCallback returnCallback = new RabbitTemplate.ReturnCallback() {
@Override
public void returnedMessage(org.springframework.amqp.core.Message message, int replyCode, String replyText, String exchange, String routingKey) {
System.out.println("return: exchange=" + exchange + " , routingKey=" + routingKey
+ ", replyCode" + replyCode + ", replyText" + replyText);
}
};
public void send(Object message, Map<String, Object> properties) throws Exception {
MessageHeaders messageHeader = new MessageHeaders(properties);
Message msg = MessageBuilder.createMessage(message, messageHeader);
// 确认模式
rabbitTemplate.setConfirmCallback(confirmCallback);
// 返回模式
rabbitTemplate.setReturnCallback(returnCallback);
// 全局唯一id +时间戳(实际消息id)
CorrelationData cd = new CorrelationData("123456789");
// 路由失败情况
rabbitTemplate.convertAndSend("exchange-1", "spring.hello", msg, cd);
// rabbitTemplate.convertAndSend("exchange-1", "springboot.hello", msg, cd);
}
}
测试
cn.dfun.demo.rabbitmq.springboot.producer.ApplicationTests
@RunWith(SpringRunner.class)
@SpringBootTest
public class ApplicationTests {
@Resource
private RabbitSender rabbitSender;
// SimpleDateFormat有线程安全问题
private static SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
@Test
public void testSender() throws Exception {
Map<String, Object> properties = new HashMap<>();
properties.put("num", "12345");
properties.put("send_time", simpleDateFormat.format(new Date()));
rabbitSender.send("Hello RabbitMQ For Spring Boot", properties);
}
}
4.2.2 消费端配置
手工签收 并发数 最大并发数
手工签收保证消息的可靠性,可以在消息失败时重回队列\根据业务记录日志等
消费端最重要的是@RabbitMQListener注解,是一个组合注解,@QueueBinding @Queue @Exchange,一次性搞定消费端交换机\队列\绑定\路由\配置监听等
建议使用配置文件+注解
代码:
新建springboot项目rabbitmq-springboot-conumer,引入相关依赖
配置 application.properties
spring.rabbitmq.addresses=192.168.68.101:5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/
spring.rabbitmq.connection-timeout=15000
spring.rabbitmq.listener.simple.acknowledge-mode=manual
spring.rabbitmq.listener.simple.concurrency=5
spring.rabbitmq.listener.simple.max-concurrency=10
接收消息类 cn.dfun.demo.rabbitmq.springboot.consumer.service.RabbitReceiver
@Component
public class RabbitReceiver {
@RabbitListener(bindings = @QueueBinding( // 注解形式会自动创建队列
value = @Queue(
value = "queue-1",
durable = "true"),
exchange = @Exchange(
value = "exchange-1",
durable = "true",
type = "topic",
ignoreDeclarationExceptions = "true"),
key = "springboot.*"
))
@RabbitHandler
public void onMessage(Message message, Channel channel) throws IOException {
System.out.println("---------------------");
// 根据实际规则转换
System.out.println("消费端: " + message.getPayload());
Long deleiveryTag = (Long)message.getHeaders().get(AmqpHeaders.DELIVERY_TAG);
// 手工ACK
channel.basicAck(deleiveryTag, false);
System.out.println("");
}
}
4.2.3 生产端发送Java对象及消费端注解参数写入配置文件优化
消费端配置
spring.rabbitmq.listener.order.queue.name=queue-2
spring.rabbitmq.listener.order.queue.durable=true
spring.rabbitmq.listener.order.exchange.name=exchange-2
spring.rabbitmq.listener.order.exchange.durable=true
spring.rabbitmq.listener.order.exchange.type=topic
spring.rabbitmq.listener.order.exchange.ignoreDeclarationExceptions=true
spring.rabbitmq.listener.order.key=springboot.*
消费端接收方法
@RabbitListener(bindings = @QueueBinding( // 注解形式会自动创建队列
value = @Queue(
value = "${spring.rabbitmq.listener.order.queue.name}",
durable = "${spring.rabbitmq.listener.order.queue.durable}"),
exchange = @Exchange(
value = "${spring.rabbitmq.listener.order.exchange.name}",
durable = "${spring.rabbitmq.listener.order.exchange.durable}",
type = "${spring.rabbitmq.listener.order.exchange.type}",
ignoreDeclarationExceptions = "${spring.rabbitmq.listener.order.exchange.ignoreDeclarationExceptions}"),
key = "${spring.rabbitmq.listener.order.key}"
))
@RabbitHandler
public void onOrderMessage(@Payload Order order, Channel channel,
@Headers Map<String, Object> headers) throws IOException {
System.out.println("---------------------");
// 根据实际规则转换
System.out.println("消费端: " + order.getId());
Long deleiveryTag = (Long)headers.get(AmqpHeaders.DELIVERY_TAG);
// 手工ACK
channel.basicAck(deleiveryTag, false);
}
生产端发送方法
public void sendOrder(Order order) throws Exception {
rabbitTemplate.setConfirmCallback(confirmCallback);
rabbitTemplate.setReturnCallback(returnCallback);
CorrelationData cd = new CorrelationData("987654321");
rabbitTemplate.convertAndSend("exchange-2", "springboot.hello", order, cd);
}
生产端测试方法
@Test
public void testOrderSender() throws Exception {
Order order = new Order("001", "第一个订单");
rabbitSender.sendOrder(order);
}
实体类,生产\消费端各copy一份
public class Order implements Serializable {
private String id;
private String name;
...
}
4.3 RabbitMQ整合Spring Cloud Stream
Spring Cloud Stream生产消费端可以使用不同MQ(RabbitMQ\Kafka),上层做了抽象
Barista接口
定义通道类型和通道名称,通道类型决定app使用这一通道发送还是接受消息
@Output注解
定义发送消息接口
@Input
定义消费者接口
@StreamListener
定义监听方法
使用Spring Cloud Stream只需用好这三个注解即可,适合高性能场景,但不能实现可靠性投递(为了兼容Kafka?)
代码:
a.建立spring boot父工程rabbitmq-springcloudstream
依赖
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>1.5.8.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
<version>1.3.4.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
b.建立两个子模块生产者rabbitmq-springcloudstream-producer和消费者rabbitmq-springcloudstream-consumer
c.生产者
配置 application.properties
server.port=8001
server.servlet.context-path=/producer
spring.application.name=producer
spring.cloud.stream.bindings.output_channel.destination=exchange-3
spring.cloud.stream.bindings.output_channel.group=queue-3
spring.cloud.stream.bindings.output_channel.binder=rabbit_cluster
spring.cloud.stream.binders.rabbit_cluster.type=rabbit
spring.cloud.stream.binders.rabbit_cluster.environment.spring.rabbitmq.addresses=192.168.68.101:5672
spring.cloud.stream.binders.rabbit_cluster.environment.spring.rabbitmq.username=guest
spring.cloud.stream.binders.rabbit_cluster.environment.spring.rabbitmq.password=guest
spring.cloud.stream.binders.rabbit_cluster.environment.spring.rabbitmq.virtual-host=/
定义接口
cn.dfun.demo.rabbitmq.springcloudstream.stream.Barista
public interface Barista {
String OUTPUT_CHNANEL = "output_channel";
@Output(Barista.OUTPUT_CHNANEL)
MessageChannel logoutput();
}
发送服务类
cn.dfun.demo.rabbitmq.springcloudstream.stream.RabbitmqSender
@EnableBinding(Barista.class)
@Service
public class RabbitmqSender {
@Resource
private Barista barista;
public String sendMessage(Object message, Map<String, Object> properties) {
try {
MessageHeaders messageHeaders = new MessageHeaders(properties);
Message msg = MessageBuilder.createMessage(message, messageHeaders);
boolean sendStatus = barista.logoutput().send(msg);
System.out.println("-------------------sending-----------------");
System.out.println("发送数据: " + message + ", sendStatus: " + sendStatus);
} catch (Exception e) {
System.out.println("-------------------error----------------");
e.printStackTrace();
throw new RuntimeException(e.getMessage());
}
return null;
}
}
测试类
@RunWith(SpringRunner.class)
@SpringBootTest
public class ApplicationTests {
@Resource
private RabbitmqSender rabbitmqSender;
@Test
public void testSender() throws Exception {
for (int i = 0; i < 1; i++) {
try {
Map<String, Object> properties = new HashMap<>();
properties.put("SEARIAL_NUMBER", "12345");
properties.put("BANK_NUMBER", "abc");
properties.put("PLAT_SEND_TIMES", DateUtils.formatDate(new Date(), "yyyy-MM-dd HH:mm:ss.SSS"));
rabbitmqSender.sendMessage("Hello.I am amp sender num :" + i, properties);
} catch (Exception e) {
System.out.println("----------------error--------------");
e.printStackTrace();
}
}
}
}
d.消费者
配置
server.port=8002
server.context-path=/consumer
spring.application.name=consumer
spring.cloud.stream.bindings.input_channel.destination=exchange-3
spring.cloud.stream.bindings.input_channel.group=queue-3
spring.cloud.stream.bindings.input_channel.binder=rabbit_cluster
spring.cloud.stream.bindings.input_channel.consumer.concurrency=1
spring.cloud.stream.rabbit.bindings.input_channel.consumer.requeue-rejected=false
spring.cloud.stream.rabbit.bindings.input_channel.consumer.acknowledge-mode=MANUAL
spring.cloud.stream.rabbit.bindings.input_channel.consumer.recovery-inteval=3000
spring.cloud.stream.rabbit.bindings.input_channel.consumer.durable-subscription=true
spring.cloud.stream.rabbit.bindings.input_channel.consumer.max-concurrency=5
spring.cloud.stream.binders.rabbit_cluster.type=rabbit
spring.cloud.stream.binders.rabbit_cluster.environment.spring.rabbitmq.addresses=192.168.68.101:5672
spring.cloud.stream.binders.rabbit_cluster.environment.spring.rabbitmq.username=guest
spring.cloud.stream.binders.rabbit_cluster.environment.spring.rabbitmq.password=guest
spring.cloud.stream.binders.rabbit_cluster.environment.spring.rabbitmq.virtual-host=/
定义接口
public interface Barista {
String INPUT_CHNANEL = "input_channel";
@Input(Barista.INPUT_CHNANEL)
SubscribableChannel loginput();
}
接受消息类
@EnableBinding(Barista.class)
@Service
public class RabbitmqReceiver {
@StreamListener(Barista.INPUT_CHNANEL)
public void receiver(Message message) throws Exception {
Channel channel = (Channel)message.getHeaders().get(AmqpHeaders.CHANNEL);
Long deliveryTag = (Long)message.getHeaders().get(AmqpHeaders.DELIVERY_TAG);
System.out.println("Input Stream接受数据: " + message);
System.out.println("消费完毕--------");
channel.basicAck(deliveryTag, false);
}
}
启动消费者,然后启动生产者测试类
异常:
org.springframework.amqp.AmqpConnectException: java.net.ConnectException: Connection refused: connect
...
Caused by: java.net.ConnectException: Connection refused: connect
?该异常不影响运行
5.RabbitMQ集群架构
5.1 了解RabbitMQ集群架构模式
主备模式
主节点正常,备节点不能读写
主节点宕机,备节点升级
可以一主多备
使用HaProxy做主备切换
远程模式
不同数据中心数据复制,跨集群互联
使用shovel插件后,近端同步确认,远端异步确认
队列压力过大时,通过备用队列发送给远端集群
# 启用插件
rabbitmq-plugins enable amqp_client
rabbitmq-plugins enable rabbitmq_shovel
# 创建配置文件(略)
touch /etc/rabbitmq/rabbitmq.config
# 源服务器和目的地服务器使用相同的配置文件
镜像模式(重要)
保证100%数据不丢失
实际中用的最多
主要实现数据同步
一般是2-3个节点,100%数据可靠性一般是3节点

多活模式
因为Shovel比较复杂,所以这种模式是异地数据复制的主流模式
需要依赖federation插件,实现持续可靠的AMQP数据通信
采用双中心或多中心各部署集群,中心之间可能会队列共享
federation不需要构建cluster,在broker之间传输,erlang和rabbitmq版本可以不同
下游从上游拉去消息,有实际Queue接收
5.2 从零开始构建高可靠RabbitMQ集群
5.2.1 RabbitMQ集群搭建
# 准备3个节点node-01\node-02\node-03分别安装rabbitmq
# 异常
Error: {could_not_read_pid,{error,enoent}}
# 强杀
ps -ef|grep rabbitmq
kill -9 1574
# 文件同步,node-01为master,cookie copy到其他节点
cd /var/lib/rabbitmq
# 查看隐藏文件
ls -ia
# 文件同步
scp .erlang.cookie root@node-02:/var/lib/rabbitmq
scp .erlang.cookie root@node-03:/var/lib/rabbitmq
# 3个节点切换目录
cd /usr/local
# 3个节点分别启动
rabbitmq-server -detached
# 查看启动情况
lsof -i:5672
# lsof安装
yum install lsof -y
# 抛异常提示rabbitmq已经在运行可以kill -9杀进程
# 02\03加入集群,node-02\node-03分别执行
rabbitmqctl stop_app
rabbitmqctl join_cluster --ram rabbit@node-01
rabbitmqctl start_app
# 移除节点命令
# rabbitmqctl forget_cluster_node rabbit@node-**
# 修改集群名称,任意节点执行
rabbitmqctl set_cluster_name rabbitmq_cluster1
# 查看集群状态
rabbitmqctl cluster_status
# 分别访问node-01:15672,node-02:15672,node-03:15672,可以看到三个节点的状态
# 若提示:Node statistics not available说明该节点未安装管控台插件,执行:
rabbitmq-plugins enable rabbitmq_management
# 设置镜像队列策略(数据同步),任意节点执行
rabbitmqctl set_policy ha-all "^" '{"ha-mode":"all"}'
5.2.2 Haproxy安装
Haproxy
非常高性能的负载均衡组件
# 准备两个节点node-04\node-05,分别执行
# 安装依赖
yum install gcc vim -y
# 编译安装
tar -zxvf haproxy-1.6.5.tar.gz -C /usr/local
cd /usr/local/haproxy-1.6.5/
make TARGET=linux31 PREFIX=/usr/local/haproxy
make install PREFIX=/usr/local/haproxy
# 创建配置目录
mkdir /etc/haproxy
# 赋权
groupadd -r -g 149 haproxy
useradd -g haproxy -r -s /sbin/nologin -u 149 haproxy
# 创建haproxy配置文件
touch /etc/haproxy/haproxy.cfg
配置文件
修改3个节点的主机名\ip以及ha管控台的ip
#logging options
global
log 127.0.0.1 local0 info
maxconn 5120
chroot /usr/local/haproxy
uid 99
gid 99
daemon
quiet
nbproc 20
pidfile /var/run/haproxy.pid
defaults
log global
#使用4层代理模式,”mode http”为7层代理模式
mode tcp
#if you set mode to tcp,then you nust change tcplog into httplog
option tcplog
option dontlognull
retries 3
option redispatch
maxconn 2000
contimeout 5s
##客户端空闲超时时间为 60秒 则HA 发起重连机制
clitimeout 60s
##服务器端链接超时时间为 15秒 则HA 发起重连机制
srvtimeout 15s
#front-end IP for consumers and producters
listen rabbitmq_cluster
bind 0.0.0.0:5672
#配置TCP模式
mode tcp
#balance url_param userid
#balance url_param session_id check_post 64
#balance hdr(User-Agent)
#balance hdr(host)
#balance hdr(Host) use_domain_only
#balance rdp-cookie
#balance leastconn
#balance source //ip
#简单的轮询
balance roundrobin
#rabbitmq集群节点配置 #inter 每隔五秒对mq集群做健康检查, 2次正确证明服务器可用,2次失败证明服务器不可用,并且配置主备机制
server node-01 192.168.68.101:5672 check inter 5000 rise 2 fall 2
server node-02 192.168.68.102:5672 check inter 5000 rise 2 fall 2
server node-03 192.168.68.103:5672 check inter 5000 rise 2 fall 2
#配置haproxy web监控,查看统计信息
listen stats
bind 192.168.68.104:8100
mode http
option httplog
stats enable
#设置haproxy监控地址为http://localhost:8100/rabbitmq-stats
stats uri /rabbitmq-stats
stats refresh 5s
# 启动haproxy
/usr/local/haproxy/sbin/haproxy -f /etc/haproxy/haproxy.cfg
# 查看haproxy进程状态
ps -ef | grep haproxy
# 浏览器访问控制台
http://192.168.68.104:8100/rabbitmq-stats
http://192.168.68.105:8100/rabbitmq-stats
5.2.3 KeepAlived组件
通过VRRP协议实现高可用
VRRP 虚拟路由器冗余协议,解决单点故障问题
Master发送心跳,心跳停止则切换Slave
# 解压
tar -zxvf keepalived-1.2.18.tar.gz -C /usr/local/
# 编译安装
cd ../keepalived-1.2.18/ && ./configure --prefix=/usr/local/keepalived
make && make install
# 复制配置文件
mkdir /etc/keepalived
cp /usr/local/keepalived/etc/keepalived/keepalived.conf /etc/keepalived
# 复制脚本
cp /usr/local/keepalived/etc/rc.d/init.d/keepalived /etc/init.d/
cp /usr/local/keepalived/etc/sysconfig/keepalived /etc/sysconfig/
ln -s /usr/local/sbin/keepalived /usr/sbin/
ln -s /usr/local/keepalived/sbin/keepalived /sbin/
# 如软链接已存在需要删除
rm -f /sbin/keepalived
# 开机启动
chkconfig keepalived on
配置文件
主节点
修改主机名\state\interface(网卡名)\虚拟ip
! Configuration File for keepalived
global_defs {
router_id node-04 ##标识节点的字符串,通常为hostname
}
vrrp_script chk_haproxy {
script "/etc/keepalived/haproxy_check.sh" ##执行脚本位置
interval 2 ##检测时间间隔
weight -20 ##如果条件成立则权重减20
}
vrrp_instance VI_1 {
state MASTER ## 主节点为MASTER,备份节点为BACKUP
interface ens33 ## 绑定虚拟IP的网络接口(网卡),与本机IP地址所在的网络接口相同(我这里是eth0)
virtual_router_id 79 ## 虚拟路由ID号(主备节点一定要相同)
mcast_src_ip 192.168.68.104 ## 本机ip地址
priority 100 ##优先级配置(0-254的值)
nopreempt
advert_int 1 ## 组播信息发送间隔,俩个节点必须配置一致,默认1s
authentication { ## 认证匹配
auth_type PASS
auth_pass bhz
}
track_script {
chk_haproxy
}
virtual_ipaddress {
192.168.68.70 ## 虚拟ip,可以指定多个
}
}
从节点配置
state为BACKUP
优先级priority小于主节点
主机名\网卡名称
虚拟ip与主节点一致
! Configuration File for keepalived
global_defs {
router_id node-05 ##标识节点的字符串,通常为hostname
}
vrrp_script chk_haproxy {
script "/etc/keepalived/haproxy_check.sh" ##执行脚本位置
interval 2 ##检测时间间隔
weight -20 ##如果条件成立则权重减20
}
vrrp_instance VI_1 {
state BACKUP ## 主节点为MASTER,备份节点为BACKUP
interface ens33 ## 绑定虚拟IP的网络接口(网卡),与本机IP地址所在的网络接口相同(我这里是eno16777736)
virtual_router_id 79 ## 虚拟路由ID号(主备节点一定要相同)
mcast_src_ip 192.168.68.105 ## 本机ip地址
priority 90 ##优先级配置(0-254的值)
nopreempt
advert_int 1 ## 组播信息发送间隔,俩个节点必须配置一致,默认1s
authentication { ## 认证匹配
auth_type PASS
auth_pass bhz
}
track_script {
chk_haproxy
}
virtual_ipaddress {
192.168.68.70 ## 虚拟ip,可以指定多个
}
}
# 两个节点创建脚本
vim /etc/keepalived/haproxy_check.sh
# 脚本赋权
chmod +x /etc/keepalived/haproxy_check.sh
脚本内容
#!/bin/bash
COUNT=`ps -C haproxy --no-header |wc -l`
if [ $COUNT -eq 0 ];then
/usr/local/haproxy/sbin/haproxy -f /etc/haproxy/haproxy.cfg
sleep 2
if [ `ps -C haproxy --no-header |wc -l` -eq 0 ];then
killall keepalived
fi
fi
# 启动
service keepalived start
# 停止
# service keepalived stop
# 状态
service keepalived status
# 重启
service restart
高可用测试
查看主节点ip可以看到配置的虚拟ip:
192.168.68.70,停止主节点keepalived服务,查看从节点服务可以看到虚拟ip从主节点漂移到从节点,主节点服务重启后虚拟ip漂移回主节点
架构图

5.2.4 集群配置文件
关键配置参数
tcp_listeners rabbitmq
监听端口,默认5672
disk_free_limit
磁盘低水位线,低于指定值停止接收数据
默认值为与内存关联1:1,也可定制为多少byte
vm_memory_high_watermark
内存低水位线,低于该水位线开启流控机制
默认值为0.4
hipe_compile
实验性参数提升性能,若出现erlang vm segfaults应关闭
force_fine_statistics
进行精细化统计,影响性能
集群节点模式
Disk磁盘存储\Ram内存存储
详细配置参考:
http://www.rabbitmq.com/configure.html#configuration-file
5.3 集群配置与集群运维故障\失败转移
前提:节点A和B组成镜像队列
场景1:A先停,B后停
该场景B是Master,只需先启动B再启动A,或者先启动A,30秒之内启动B
场景2:A\B同时停机
30秒之内连续启动A和B
场景3:A先停,B后停,且A无法恢复
B是Master,B启动后,调用:
rabbitmqctl forget_cluster_node A
解除与A的cluster关系,在将新的Slave节点加入B
场景4:A先停,B后停,且B无法恢复
由于B是主节点,所以不能直接启动A
A节点执行:
rabbitmqctl forget_cluster_node --offline B
然后A可以正常启动,再将新的Slave加入A
场景5:A先停,B后停,且A\B均无法恢复,但能得到A或B的磁盘文件
数据库文件默认在$RABBIT_HOME/var/lib目录,将其拷贝到新的节点,然后修改新节点的hostname为A或B的hostname,如果是A节点的磁盘文件,按照场景4处理,如果是B节点的磁盘文件,按照场景3处理
场景6:A先停,B后停,且A\B均无法恢复,且得不到磁盘文件
无解
5.4 高级插件的使用
延迟插件
消息延迟推送,定时任务执行,削峰限流降级的异步延迟消息机制
插件地址:
https://github.com/rabbitmq/rabbitmq-delayed-message-exchange
http://www.rabbitmq.com/community-plugins.html
# 3个节点分别执行
cd /usr/lib/rabbitmq/lib/rabbitmq_server-3.6.5/plugins
# 上传插件包rabbitmq_delayed_message_exchange-0.0.1.ez到该目录
# 查看插件清单
rabbitmq-plugins list
# 启用插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
异常:
# node-02\node-03
Error: {{case_clause,{timeout,['rabbit_delayed_messagerabbit@node-02',
...
测试:
a.使用管控台创建delay.exchange
指定type为x-delayed-message
设置参数 x-delayed-type: topic
b.创建队列delay.queue
c.exchange绑定队列,routingkey设为rabbit.#
d.exchange发送消息
设置headers: x-delay=15000, 15秒后发送
queue将在15秒后收到消息
6.大厂SET架构演化
6.1 SET化架构目标
SET 单元化架构
业务多元化发展,通过集群加机器具备了一定的可扩展性,但单个业务瓶颈可能影响全网业务
分布式集群问题
1.容灾
如数据库主库机房挂掉
2.资源扩展
如跨机房延迟
3.大集群拆分
同城"双活"
业务分流
存储层面跨机房写
一个数据中心故障手动切换流量
两地三中心
同城双活基础上,异地部署灾备中心
!(https://img.haomeiwen.com/i5880229/a3391999f7851105.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
问题:
冷备成本,同步问题
冷备关键时刻不敢切
数据单点写,数据库瓶颈无法解决
SET化方案目标
业务:解决容灾和扩展性问题,支撑业务高速发展
通用性:架构侧形成通用解决方案,方便各业务线接入
6.2 SET化架构策略

一个核心业务线部署到一个机房中
不阻断核心链路的业务放到中心机房
流量路由
可根据userid,判断某次请求到中心集群还是单元化集群
中心集群
未进行单元化改造,跟当前架构一致
单元化集群
只负责本单元的流量处理,实现流量拆分和故障隔离
前期只存储本单元数据,后续做数据同步,实现容灾切换
中间件
RPC KV MQ等
RPC 对于SET服务调用封闭到SET内
KV 支持分SET的数据生产和消费
MQ 支持分SET的消息生产和消费
数据同步
全局数据(如商家菜品)部署到中心集群,其他单元集群同步全局数据
异地多活后,各单元集群做异地同步容灾
SET部署到不同地区实现容灾
高效本地化服务
根据前端位置采集和域名解析策略,将流量路由到最近的SET
SET封装更灵活,SET一键创建/下线,SET一键发布等
SET内部服务异常时也可进行跨SET的服务调用
6.3 SET化架构原则
SET化架构实现对业务代码透明
切分规则
理论上由业务层面定制
实现上选最大业务维度划分
部署规范
一个SET并不一定限制在同机房
单个SET内不宜超过1000台物理机
6.4 SET化架构实现

Federation插件不需要cluser,在broker间传输,双方可以使用不同的user和virtual host,rabbitmq和erlang版本可以不一致
a.安装插件
# 准备两个节点分别安装rabbitmq
# 启用插件
rabbitmq-plugins enable rabbitmq_federation
rabbitmq-plugins enable rabbitmq_federation_management
控制台点击Admin选项卡可见右侧多出两个选项即为安装成功

选择node-02作为downstream,node-01作为upstream
b.node-02创建exchange和queue
exchange
name:test.exchange
node-02创建queue
name:test.queue
绑定test.exchange和test.queue,routingkey为test
c.node-02创建upstream
点击:Admin->Federation Upstreams
name: upstream001
uri: amqp://192.168.68.101
d.node-02设置policies
点击:Admin->Policies
Name:policies001
Pattern:^test
Definition:federation-upstream-set=all
点击Add Policy后,点击右侧Federation Status可见多了两个连接
e.node-01发送消息
node-01控制台可以看到多了Connection和Channel,exchange和queue也被同步过来,通过test.exchange发送消息,node-02即可接收到
如果创建一个queue并将该queue与test.exchange绑定,则消息可以同时被node-01和node-02消费
SET化配置规则
1.Downstream从Upstream上主动拉消息,并非拉所有,而是明确定义exchange和绑定queue,使用amqp协议通信
2.Upstream可以把消息直接通过Federation Exchange路由给Downstream节点进行消费,并可以在Upstream
添加具体队列去消费消息,则一条消息分别发到两个集群以实现消息同步
3.可以根据业务规则去规划不同的集群监听不同的消息队列,从而达到SET化,保障性能\可靠性\数据一致性
7.大厂MQ实现思路和设计方案
7.1 架构方案

基本封装
序列化转换,异步发送
生产实例和消费实例的连接池
可靠性投递
消费端幂等,避免重复消费
扩展封装
迅速发送消息,日志\统计场景保证高吞吐
延迟消息,用于延迟检查\限流等场景
事务消息,保证100%可靠性,金融单笔大额场景
顺序消息,如下单复合性操作
消息补偿\重试
集群负载均衡
消息路由策略,指定某些消息路由到指定的SET集群
迅速发送
生产端不落库
确认模式
1.业务消息落库
2.消息落库
3.发送给broker
4.broker发送确认消息
5.生产端更改消息库状态
6.broker无确认,分布式定时任务对消息库进行补偿重试
批量消息
把多条消息放入集合统一提交
期望消费者进行批量化消费
不保障可靠性,需要补偿
延迟消息
使用延迟插件,指定delayTime属性
场景:
订单签收后不点击确认支付,系统在一定时间后自动支付
顺序消息
消息投递到同一队列,消费者独占(只能有一个)
需要统一提交,所有消息会和ID一致
添加顺序序号和本次顺序消息的size,消费者每次拿到消息落库
发送给自身的延迟消息(包含会话id\size),进行后续消费
收到延迟消息后,根据会话id和size抽取数据库数据处理
异常情况定时轮询补偿,比如生产端没有完全投递成功或者消费者落库异常等
事务消息
单笔大额,消息优先级提到最高,要求可靠性100%
兼顾银行系统,需要补偿,主动发送银行查询指令等
保障性能同时支持事务,我们没有选择传统的RabbitMQ和Spring集成的机制
解决方案:
可靠性投递,补偿
业务操作数据库和消息记录数据库使用同一数据源
重写Spring DataSourceTransactionManager,本地事务提交时发送消息,有可能事务提交成功但消息发送失败,需要进行补偿
幂等性
可能非幂等的原因
1.可靠性消息投递
2.broker和消费端传输网络抖动
3.消费端故障和异常
幂等性设计
统一id生成服务,外部统一ID发送服务不可用时使用本地ID生成服务策略
下游消费者路由增加ID规则路由组件,根据ID hash落库,也可以通过redis缓存
网友评论