1.RabbitMQ的概述与重要概念
RabbitMQ是流行的开源消息队列系统,用erlang语言开发。RabbitMQ是AMQP(高级消息队列协议)的标准实现。
MQ的主要用途包含如下:
- 异步处理,比如注册发送邮件和短信
- 应用解耦,比如电商系统中,订单服务和库存服务解耦
- 流量削峰,比如秒杀,抢红包活动
重要的概念可以通过如下图进行了解:
![](https://img.haomeiwen.com/i10030809/7fb0607bae524cc0.png)
- Message 由消息头和消息体组成,消息头由一组可选属性组成,消息体是不透明的
- Publisher 消息的生产者,表示一个向Exchange发布消息的客户端应用程序
- Exchange 交换器,接收Publisher发送的消息并路由给Queue
- Binding 绑定,关联Exchange和Queue
- Queue 消息队列,用于保存消息直到发送给Consumer,Message会一直在队列里直至被消费者取走
- Connection TCP连接
- Channel 信道,是建立在真实的TCP连接里的虚拟连接,对于操作系统来说建立和销毁一次TCP是非常昂贵的开销,因为引入信道来复用一条TCP连接
- Consumer 消费者,表示从一个Queue中取得消息的一个客户端应用程序
- Broker RabbitMQ服务器实体
- Virtual Host 虚拟主机,表示一个mini版的RabbitMQ服务器,拥有自己的Queue、Exchange、Binding和权限机制,默认的vhost是/,必须在连接时指定
MQ对比
![](https://img.haomeiwen.com/i10030809/e9b5fa3089d56a36.png)
2.RabbitMQ Window安装介绍
RabbitMQ安装依赖Erlang,因此安装之前需要先安装Erlang环境,如下:
otp_win64_22.3.exe Erlang的Window安装包,安装包中没有Erlang关键字
rabbitmq-server-3.8.3.exe
Erlang和RabbitMQ有对应的版本关系,请点击查看官网信息
![](https://img.haomeiwen.com/i10030809/ba16ceb32b40aeba.png)
3.插件安装
软件安装完成之后,需要安装管理界面插件。打开RabbitMQ Command Prompt命令行界面,输入如下命令:
rabbitmq-plugins enable rabbitmq_management
安装完成之后,打开网址:http://localhost:15672/
默认账号为guest,密码为guest,进入系统之后创建admin账号,并修改guest密码
4.RabbitMQ 运维篇
4.1 单机模式(开发测试环境推荐)
单机模式参考Window安装即可(暂不提供Linux版本)。
4.2 普通集群模式
组成集群需要两步操作,该操作同样适用于镜像模式
- 该模式下需要保证不同机器之间的erlang cooike一致,可将其中一台机器的erlang cookie拷贝到其他机器上。
- 将节点加入集群,假如有三个节点,可在节点2,3两台机器上加入到节点1,如下:
rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl join_cluster --ram rabbit@node1
rabbitmqctl start_app
查看集群状态 rabbitmqctl cluster_status
普通集群模式中每个节点都有相同的元数据,即相同的队列结构。但是消息(实际数据)只存在其中一个节点上,因此若消费者连接到非数据节点的时候,消息会先传递给消费者连接的节点,再提供给消费者。因此该模式下由两个重要的特点,1. 若存储消息节点宕机了,整个集群不可用,因此此模式并非高可用;2. 节点之间可能存在大量的数据传递,占用带宽高。即使如此,若使用此种模式,客户端应尽快均匀散布到各个节点上。
原理图如下:
![](https://img.haomeiwen.com/i10030809/0802eee6b132111c.png)
4.3 镜像集群模式(生产环境必须)
在创建普通集群的基础上,设置策略(policy),该操作可通过web ui设置,如下:
![](https://img.haomeiwen.com/i10030809/aec9ed9f47449173.png)
也可以通过命令设置
// 为每个以“rock.wechat”开头的队列设置所有节点的镜像,并且设置为自动同步模式
rabbitmqctl set_policy -p rock ha-all "^rock.wechat" '{"ha-mode":"all","ha-sync-mode":"automatic"}'
该模式是一个HA方案(高可用方案),RabbitMQ是没有中心的,不会因为一个节点挂了导致整个集群不可用,解决了普通模式中的问题,与普通模式不同的是,消息会主动在镜像节点之间同步,而不是在客户端获取数据时再拉取数据。
因数据在不同节点之间主动同步,因此带宽要求更高,降低了系统的性能。这种模式适合对消息可靠性要求较高的场合中使用。
原理图如下:
![](https://img.haomeiwen.com/i10030809/2b87e3b51b217194.png)
5.RabbitMQ 实战篇
5.1 管理规范
5.1.1 命名规范
exchange:以ex开头,规则为ex.业务域.应用名称.消息类型
ex.businame.appname.msgtype
queue:以q开头,规则为q.业务域.应用名称.消息类型
q.businame.appname.msgtype
5.1.2 用户管理规范
- 提供给应用使用的用户类型为none
- 只授权用户特定的exchange(写)和queue(读)访问权限,这样代码就无法创建交换器和队列
5.1.3 其他规范
- 队列和交换器由MQ管理员与研发人员沟通规则后,统一由MQ管理员进行创建。
- 代码中禁止进行创建交换器和队列的操作(若用户管理规范,此操作无法执行)。
5.2 环境准备
- maven依赖配置
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.0.0</version>
</dependency>
- 新建一个标准的maven结构工程,编写MQ工具类
public class ConnectionUtil {
public static Connection get(String username, String pwd) {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername(username);
factory.setPassword(pwd);
try {
return factory.newConnection();
} catch (IOException e) {
throw new RuntimeException(e);
} catch (TimeoutException e) {
throw new RuntimeException(e);
}
}
}
5.3 应用实例
abbitMQ常用的Exchange Type有三种
- direct 当消息的Routing key与Binding key完全匹配时,将消息路由到Queue中
- fanout 将消息广播到与Exchange绑定的所有Queue,效率最高
- topic Binding key使用模式,“#”匹配一个或多个词,“*”只匹配一个词,当消息的Routing key模糊匹配该模式才进行路由
5.3.1 Direct模式
public class DirectSend {
public static final String EXCHANGE_NAME = "exchange-test-direct";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtil.get("producer-a", "producer-a");
Channel channel = connection.createChannel();
// channel.exchangeDeclare(EXCHANGE_NAME, "direct");
String msg = "哈哈123";
channel.basicPublish(EXCHANGE_NAME, "delete", null, msg.getBytes("utf-8"));
System.out.println("[X] send: " + msg);
channel.close();
connection.close();
}
}
public class DirectRec2 {
// public static final String EXCHANGE_NAME = "exchange-test-direct";
public static final String QUEUE_NAME = "queue-test-direct-2";
public static void main(String[] args) throws IOException {
Connection connection = ConnectionUtil.get("comsumer-b", "comsumer-b");
final Channel channel = connection.createChannel();
// channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "update");
// channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "delete");
// channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "insert");
channel.basicQos(1); // 同一时刻服务器只会发送一条消息给消费者
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body, "utf-8");
System.out.println("[Y2] receive msg: " + msg);
//休眠
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
// 将消费者绑定到队列,并设置自动确认消息(即无需显示确认,如何设置请慎重考虑)
channel.basicConsume(QUEUE_NAME, false, consumer);
}
}
5.3.2 Fanout模式
public class SubscribeSend {
public static final String EXCHANGE_NAME = "exchange-test-fanout-01";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtil.get("producer-a", "producer-a");
Channel channel = connection.createChannel();
// channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
String msg = "hello world";
channel.basicPublish(EXCHANGE_NAME, "", null, msg.getBytes("utf-8"));
System.out.println("[X] send: " + msg);
channel.close();
connection.close();
}
}
public class SubscribeRec2 {
public static final String QUEUE_NAME = "queue-test-fanout-02";
// public static final String EXCHANGE_NAME = "exchange-test-01";
public static void main(String[] args) throws IOException {
Connection connection = ConnectionUtil.get("comsumer-b", "comsumer-b");
final Channel channel = connection.createChannel();
// channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
channel.basicQos(1);
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body, "utf-8");
System.out.println("[Y2] receive msg: " + msg);
//休眠
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
// 将消费者绑定到队列,并设置自动确认消息(即无需显示确认,如何设置请慎重考虑)
channel.basicConsume(QUEUE_NAME, false, consumer);
}
}
5.3.3 Topic模式
public class TopicSend {
private final static String EXCHANGE_NAME = "exchange-test-topic";
public static void main(String[] argv) throws Exception {
// 获取到连接以及mq通道
Connection connection = ConnectionUtil.get("producer-a", "producer-a");
Channel channel = connection.createChannel();
// 声明exchange
// channel.exchangeDeclare(EXCHANGE_NAME, "topic");
// 消息内容
String message = "Hello World!!";
channel.basicPublish(EXCHANGE_NAME, "routekey.1", null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
channel.close();
connection.close();
}
}
public class TopRec1 {
public static final String QUEUE_NAME = "queue-test-topic-01";
// public static final String EXCHANGE_NAME = "exchange-test-topic";
public static void main(String[] args) throws IOException {
Connection connection = ConnectionUtil.get("comsumer-b", "comsumer-b");
final Channel channel = connection.createChannel();
// channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "routekey.*");
channel.basicQos(1);
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body, "utf-8");
System.out.println("[Y1] receive msg: " + msg);
//休眠
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
// 将消费者绑定到队列,并设置自动确认消息(即无需显示确认,如何设置请慎重考虑)
channel.basicConsume(QUEUE_NAME, false, consumer);
}
}
5.3.4 ACK消息确认(推荐使用方式)
public class MultMqSend {
private final static String EX_NAME = "exchange-test-ack-01";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtil.get("producer-a", "producer-a");
Channel channel = connection.createChannel();
// channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String msg = "hello";
for (int i = 0; i < 100; i++) {
channel.basicPublish(EX_NAME, "rk", null, (msg + i).getBytes("UTF-8"));
System.out.println("[X] send " + (msg + i));
}
channel.close();
connection.close();
}
}
public class MultiMqRecManualConfirm1 {
private final static String QUEUE_NAME = "queue-test-ack-01";
public static void main(String[] args) throws IOException {
Connection connection = ConnectionUtil.get("comsumer-b", "comsumer-b");
final Channel channel = connection.createChannel();
// channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 同一时刻服务器只会发一条消息给消费者
channel.basicQos(1);
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body, "utf-8");
System.out.println("[Y1] receive msg: " + msg);
//休眠
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
// 将消费者绑定到队列,并设置自动确认消息(即无需显示确认,如何设置请慎重考虑)
channel.basicConsume(QUEUE_NAME, false, consumer);
}
}
public class MultiMqRecManualConfirm2 {
private final static String QUEUE_NAME = "queue-test-ack-01";
public static void main(String[] args) throws IOException {
Connection connection = ConnectionUtil.get("comsumer-b", "comsumer-b");
final Channel channel = connection.createChannel();
// channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 同一时刻服务器只会发一条消息给消费者
channel.basicQos(1);
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body, "utf-8");
System.out.println("[Y1] receive msg: " + msg);
//休眠
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
// 将消费者绑定到队列,并设置自动确认消息(即无需显示确认,如何设置请慎重考虑)
channel.basicConsume(QUEUE_NAME, false, consumer);
}
}
confirm模式解决了公平轮训的问题,哪个消费者处理更快,处理的消息更多(能者多劳)。这个案例解决了消费者消费消息可靠性问题,但是没有解决发送者发送消息可靠性问题。
5.3.5 basicQo和basicAck关系
两者是配套使用的。
// channel.basicQos(1)指该消费者在接收到队列里的消息但没有返回确认结果之前,
// 队列不会将新的消息分发给该消费者。队列中没有被消费的消息不会被删除,还是存在于队列中。
channel.basicQos(1);
// 确认消息
channel.basicAck(envelope.getDeliveryTag(), false);
网友评论