RabbitMQ
安装
- 一般来说安装 RabbitMQ 之前要安装 Erlang ,可以去Erlang官网下载。接着去RabbitMQ官网下载安装包,之后解压缩即可。根据操作系统不同官网提供了相应的安装说明:Windows、Debian / Ubuntu、RPM-based Linux、Mac
如果是Mac 用户,个人推荐使用 HomeBrew 来安装,安装前要先更新 brew:
brew update
brew install rabbitmq
- 启动
启动很简单,找到安装后的 RabbitMQ 所在目录下的 sbin 目录,可以看到该目录下有6个以 rabbitmq 开头的可执行文件,直接执行 rabbitmq-server 即可,下面将 RabbitMQ 的安装位置以 . 代替,启动命令就是:
./sbin/rabbitmq-server
#如果电脑已安装查看目录
[root@VM-0-2-centos mq]# find / -name rabbitmq-server
/etc/logrotate.d/rabbitmq-server
/usr/sbin/rabbitmq-server
/usr/lib/rabbitmq/bin/rabbitmq-server
/usr/lib/rabbitmq/lib/rabbitmq_server-3.7.18/sbin/rabbitmq-server
/usr/lib/ocf/resource.d/rabbitmq/rabbitmq-server
#后台启动
./sbin/rabbitmq-server -detached
- 浏览器查看 http://ip:15672/ 登录即可 默认账户密码:guest
- 设置用户和密码(https://www.cnblogs.com/whs123/p/14184317.html)
[root@VM-0-2-centos sbin]# ./rabbitmqctl add_user xx admin123
Adding user "xxx" ...
[root@VM-0-2-centos sbin]# ./rabbitmqctl set_user_tags xxxadministrator
Setting tags for user "xxx" to [administrator] ...
rabbitmq使用
六种消费模型:
- P - Q - C(基本消息模型)
P:生产者,也就是要发送消息的程序
C:消费者:消息的接受者,会一直等待消息到来。
queue:消息队列,图中红色部分。可以缓存消息;生产者向其中投递消息,消费者从其中取出消息。
- pom
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.7.2</version>
</dependency>
- 工具类
public class RabbitMqUtils {
private static ConnectionFactory connectionFactory;
static {
//重量级类常见,在类加载创建一次
connectionFactory = new ConnectionFactory();
}
// 定义创建连接
public static Connection getConnection() {
//获取连接对象
try {
//创建mq连接工厂对象
connectionFactory.setHost("x.y.z.j");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/gzsz");//设置虚拟机,一个mq服务可以设置多个虚拟机,每个虚拟机就相当于一个独立的mq
connectionFactory.setUsername("lg123");
connectionFactory.setPassword("lg123");
Connection connection = connectionFactory.newConnection();
return connection;
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
return null;
}
//关闭通道,关闭连接
public static void closeChannel(Channel channel, Connection connection) {
try {
channel.close();
connection.close();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
//
}
- 生产者
public class Provider {
private final static String QUEUE_NAME = "simple_queue";
public void sendMessage() throws IOException, TimeoutException {
// 1. 获取链接
Connection connection = RabbitMqUtils.getConnection();
// 2. 获取通道对象
Channel channel = connection.createChannel();
// 3. 通道绑到对应消息队列
//参数:String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
/**
* 参数明细
* 1、queue 队列名称
* 2、durable 是否持久化,如果持久化,mq重启后队列还在
* 3、exclusive 是否独占连接,队列只允许在该连接中访问,如果connection连接关闭队列则自动删除,如果将此参数设置true可用于临时队列的创建
* 4、autoDelete 自动删除,队列不再使用时是否自动删除此队列,如果将此参数和exclusive参数设置为true就可以实现临时队列(队列不用了就自动删除)
* 5、arguments 参数,可以设置一个队列的扩展参数,比如:可设置存活时间
*/
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
// 4.通过通道向指定的队列发布消息
//参数:String exchange, String routingKey, BasicProperties props, byte[] body
/**
* 参数明细:
* 1、exchange,交换机,如果不指定将使用mq的默认交换机(设置为"")
* 2、routingKey,路由key,交换机根据路由key来将消息转发到指定的队列,如果使用默认交换机,routingKey设置为队列的名称
* 3、props,消息的属性
* 4、body,消息内容
*/
String msg = "hello rabbitmq";
channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());
System.out.println("发送消息:" + msg.getBytes());
// 5. 关闭通道和链接
RabbitMqUtils.closeChannel(channel,connection);
}
public static void main(String[] args) throws IOException, TimeoutException {
Provider provider = new Provider();
provider.sendMessage();
}
}
- 消费者
public class Customer {
private final static String QUEUE_NAME = "simple_queue";
public void acceptMessage() throws IOException, TimeoutException {
Connection connection = RabbitMqUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
//消费消息
//参数1: 队列名称;参数2:开启消息自动确认机制
//参数3: 消费完成回调接口
channel.basicConsume(QUEUE_NAME, true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("body =" + new String(body));
}
});
// channel.close();
// connection.close();
}
public static void main(String[] args) throws IOException, TimeoutException {
Customer customer = new Customer();
customer.acceptMessage();
}
}
消息确认机制(ACK)
通过刚才的案例可以看出,消息一旦被消费者接收,队列中的消息就会被删除。
那么问题来了:RabbitMQ怎么知道消息被接收了呢?
如果消费者领取消息后,还没执行操作就挂掉了呢?或者抛出了异常?消息消费失败,但是RabbitMQ无从得知,这样消息就丢失了!
因此,RabbitMQ有一个ACK机制。当消费者获取消息后,会向RabbitMQ发送回执ACK,告知消息已经被接收。不过这种回执ACK分两种情况:
自动ACK:消息一旦被接收,消费者自动发送ACK
手动ACK:消息接收后,不会发送ACK,需要手动调用
大家觉得哪种更好呢?
这需要看消息的重要性:
如果消息不太重要,丢失也没有影响,那么自动ACK会比较方便
如果消息非常重要,不容丢失。那么最好在消费完成后手动ACK,否则接收消息后就自动ACK,RabbitMQ就会把消息从队列中删除。如果此时消费者宕机,那么消息就丢失了。
我们之前的测试都是自动ACK的,如果要手动ACK,需要改动我们的代码:
消费者:
public class Customer {
private final static String QUEUE_NAME = "simple_queue";
public void acceptMessage() throws IOException, TimeoutException {
Connection connection = RabbitMqUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
//消费消息
//参数1: 队列名称;参数2:开启消息自动确认机制
//参数3: 消费完成回调接口
channel.basicConsume(QUEUE_NAME, false, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者: body =" + new String(body));
// 手动进行ACK
/*
* void basicAck(long deliveryTag, boolean multiple) throws IOException;
* deliveryTag:用来标识消息的id
* multiple:是否批量.true:将一次性ack所有小于deliveryTag的消息。
*/
channel.basicAck(envelope.getDeliveryTag(), false);
}
});
// channel.close();
// connection.close();
}
public static void main(String[] args) throws IOException, TimeoutException {
Customer customer = new Customer();
customer.acceptMessage();
}
}
- p -q- [c1, c2 ] (work消息模型,竞争消息模型)
P:生产者:任务的发布者
C1:消费者1:领取任务并且完成任务,假设完成速度较慢(模拟耗时)
C2:消费者2:领取任务并且完成任务,假设完成速度较快
- 生产者
public class Provider {
private final static String QUEUE_NAME = "work_queue";
public static void main(String[] args) throws IOException {
Connection connection = RabbitMqUtils.getConnection();
Channel channel = connection.createChannel();
//通道声明队列
channel.queueDeclare(QUEUE_NAME,true,false,false,null);
//发布消息
for (int i = 1; i <= 10; i++) {
channel.basicPublish("",QUEUE_NAME, null, ("task..." + i).getBytes());
System.out.println("生产者-发送消息:"+ "task..." + i);
}
RabbitMqUtils.closeChannel(channel,connection);
}
}
- 消费者1
public class Customer1 {
private final static String QUEUE_NAME = "work_queue";
public void acceptMessage() throws IOException, TimeoutException {
Connection connection = RabbitMqUtils.getConnection();
Channel channel = connection.createChannel();
// channel.basicQos(1);
channel.queueDeclare(QUEUE_NAME,true,false,false,null);
channel.basicConsume(QUEUE_NAME, false, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者1:" + new String(body));
//手动确认
channel.basicAck(envelope.getDeliveryTag(), false);
}
});
// channel.close();
// connection.close();
}
public static void main(String[] args) throws IOException, TimeoutException {
Customer1 customer1 = new Customer1();
customer1.acceptMessage();
System.out.println("消费者1已启动");
}
}
- 消费者2
public class Customer2 {
private final static String QUEUE_NAME = "work_queue";
public void acceptMessage() throws IOException, TimeoutException {
Connection connection = RabbitMqUtils.getConnection();
Channel channel = connection.createChannel();
// 不平均分配
// channel.basicQos(1);
channel.queueDeclare(QUEUE_NAME,true,false,false,null);
//参数2:消息自动确认
channel.basicConsume(QUEUE_NAME, true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("消费者2:" + new String(body));
// channel.basicAck(envelope.getDeliveryTag(), false);
}
});
// channel.close();
// connection.close();
}
public static void main(String[] args) throws IOException, TimeoutException {
Customer2 customer2 = new Customer2();
customer2.acceptMessage();
System.out.println("消费者2已启动");
}
}
能者多劳
刚才的实现有问题吗?
消费者1比消费者2的效率要低,一次任务的耗时较长
然而两人最终消费的消息数量是一样的
消费者1大量时间处于空闲状态,消费者2一直忙碌
现在的状态属于是把任务平均分配,正确的做法应该是消费越快的人,消费的越多。
怎么实现呢?
通过 BasicQos 方法设置prefetchCount = 1。这样RabbitMQ就会使得每个Consumer在同一个时间点最多处理1个Message。换句话说,在接收到该Consumer的ack前,他它不会将新的Message分发给它。相反,它会将其分派给不是仍然忙碌的下一个Consumer。
值得注意的是:prefetchCount在手动ack的情况下才生效,自动ack不生效。
public class Customer1 {
private final static String QUEUE_NAME = "work_queue";
public void acceptMessage() throws IOException, TimeoutException {
Connection connection = RabbitMqUtils.getConnection();
Channel channel = connection.createChannel();
//prefetchCount在手动ack的情况下才生效,自动ack不生效
channel.basicQos(1);
channel.queueDeclare(QUEUE_NAME,true,false,false,null);
channel.basicConsume(QUEUE_NAME, false, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者1:" + new String(body));
//手动确认
channel.basicAck(envelope.getDeliveryTag(), false);
}
});
// channel.close();
// connection.close();
}
public static void main(String[] args) throws IOException, TimeoutException {
Customer1 customer1 = new Customer1();
customer1.acceptMessage();
System.out.println("消费者1已启动");
}
}
订阅模型分类
说明下:
1、一个生产者多个消费者
2、每个消费者都有一个自己的队列
3、生产者没有将消息直接发送给队列,而是发送给exchange(交换机、转发器)
4、每个队列都需要绑定到交换机上
5、生产者发送的消息,经过交换机到达队列,实现一个消息被多个消费者消费
例子:注册->发邮件、发短信
X(Exchanges)
:交换机一方面:接收生产者发送的消息。另一方面:知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。
Exchange类型有以下几种:
Fanout:广播,将消息交给所有绑定到交换机的队列
Direct:定向,把消息交给符合指定routing key 的队列
Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列
Header:header模式与routing不同的地方在于,header模式取消routingkey,使用header中的 key/value(键值对)匹配队列。
Header模式不展开了,感兴趣可以参考这篇文章https://blog.csdn.net/zhu_tianwei/article/details/40923131
Exchange(交换机)
只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!
- P-X-[q1,q2]-[c1,c2] (Publish/subscribe(交换机类型:Fanout,也称为广播 ))
和前面两种模式不同:
1) 声明Exchange,不再声明Queue
2) 发送消息到Exchange,不再发送到Queue
- 生产者
public class Provider {
private final static String EXCHANGE_NAME = "test_fanout_exchange";
public static void main(String[] args) throws IOException {
Connection connection = RabbitMqUtils.getConnection();
Channel channel = connection.createChannel();
// 通道声明到交互机
//参数1: 交互机名称; 参数2: 交换机类型: fanout
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
String msg = "注册成功";
channel.basicPublish(EXCHANGE_NAME,"", null, msg.getBytes());
//发布消息
// for (int i = 1; i <= 10; i++) {
// channel.basicPublish("logs","", null, (i + "fanout").getBytes());
//
// }
System.out.println("生产者:发送消息:" + msg.getBytes());
RabbitMqUtils.closeChannel(channel,connection);
}
}
- 消费者1
public class Customer1 {
private final static String EXCHANGE_NAME = "test_fanout_exchange";// 交换机
private final static String QUEUE_NAME = "fanout_exchange_queue_sms";// 发送短信队列
public void acceptMessage() throws IOException, TimeoutException {
Connection connection = RabbitMqUtils.getConnection();
Channel channel = connection.createChannel();
// channel.exchangeDeclare("logs","fanout");
// String queueName = channel.queueDeclare().getQueue();
//声明队列名称
channel.queueDeclare(QUEUE_NAME, false,false,false,null);
//绑定队列到交换机
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"");
channel.basicConsume(QUEUE_NAME, true, new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者1【短信服务】:" + new String(body));
}
});
}
public static void main(String[] args) throws IOException, TimeoutException {
Customer1 customer1 = new Customer1();
customer1.acceptMessage();
}
}
- 消费者2
public class Customer2 {
private final static String EXCHANGE_NAME = "test_fanout_exchange";// 交换机
private final static String QUEUE_NAME = "fanout_exchange_queue_email";// 发送邮件队列
public void acceptMessage() throws IOException, TimeoutException {
Connection connection = RabbitMqUtils.getConnection();
Channel channel = connection.createChannel();
// channel.exchangeDeclare("logs","fanout");
// String queueName = channel.queueDeclare().getQueue();
//声明队列名称
channel.queueDeclare(QUEUE_NAME, false,false,false,null);
//绑定队列到交换机
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"");
channel.basicConsume(QUEUE_NAME, true, new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者2【邮件服务】:" + new String(body));
}
});
}
public static void main(String[] args) throws IOException, TimeoutException {
Customer2 customer1 = new Customer2();
customer1.acceptMessage();
}
}
思考
1、publish/subscribe与work queues有什么区别。
区别:
1)work queues不用定义交换机,而publish/subscribe需要定义交换机。
2)publish/subscribe的生产方是面向交换机发送消息,work queues的生产方是面向队列发送消息(底层使用默认交换机)。
3)publish/subscribe需要设置队列和交换机的绑定,work queues不需要设置,实际上work queues会将队列绑定到默认的交换机 。
相同点:
所以两者实现的发布/订阅的效果是一样的,多个消费端监听同一个队列不会重复消费消息。
2、实际工作用 publish/subscribe还是work queues。
建议使用 publish/subscribe,发布订阅模式比工作队列模式更强大(也可以做到同一队列竞争),并且发布订阅模式可以指定自己专用的交换机。
分享两道面试题:
面试题:
避免消息堆积?
1) 采用workqueue,多个消费者监听同一队列。
2)接收到消息以后,而是通过线程池,异步消费。
如何避免消息丢失?
1) 消费者的ACK机制。可以防止消费者丢失消息。
但是,如果在消费者消费之前,MQ就宕机了,消息就没了?
2)可以将消息进行持久化。要将消息持久化,前提是:队列、Exchange都持久化
网友评论