在这一小节,我们来实现一个最简单的消息生产与消费案例,构建生产者和消费者模型。也就是说生产者发送消息,投递到RabbitMQ中,然后消费者去监听队列,获取数据进行消费。
涉及的相关对象或者概念
- ConnectionFactory:获取连接工厂
- Connection:一个连接
- Channel:数据通信信道,可发送和接收消息
- Queue:具体的消息存储队列
- Producer & Consumer:生产和消费者
工程准备
1.首先创建一个Spring Boot工程,这里使用Spring Tool Suite工具,选择导航菜单File --> New --> Spring Starter Project
然后一直下一步就可以了。后续对 RabbitMQ API
的使用都会在这个工程中进行。
2.在pom文件中引入Maven依赖
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>3.6.5</version>
</dependency>
生产者模型构建
- 创建连接工厂ConnectionFactory
- 通过连接工厂创建连接Connection
- 通过连接创建频道Channel
- 通过频道Channel发送数据
- 关闭连接
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class Procuder {
public static void main(String[] args) throws Exception {
//1 创建一个ConnectionFactory, 并进行配置
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.11.76");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
//2 通过连接工厂创建连接
Connection connection = connectionFactory.newConnection();
//3 通过connection创建一个Channel
Channel channel = connection.createChannel();
//4 通过Channel发送数据
for(int i=0; i < 5; i++){
String msg = "Hello RabbitMQ!";
//1 exchange(交换机) 2 routingKey(路由规则) 3 props(修饰消息) 4 body(消息实体)
channel.basicPublish("", "test001", null, msg.getBytes());
}
//5 记得要关闭相关的连接
channel.close();
connection.close();
}
}
消费者模型构建
- 创建连接工厂ConnectionFactory
- 通过连接工厂创建连接Connection
- 通过连接创建频道Channel
- 声明一个队列Queue
- 创建一个消费者Consumer
- 设置Channel,指定消费者监听的队列
- 获取消息
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.QueueingConsumer.Delivery;
public class Consumer {
public static void main(String[] args) throws Exception {
//1 创建一个ConnectionFactory, 并进行配置
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.43.157");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
//2 通过连接工厂创建连接
Connection connection = connectionFactory.newConnection();
//3 通过connection创建一个Channel
Channel channel = connection.createChannel();
//4 声明(创建)一个队列
String queueName = "test001";
//queueDeclare(queue, durable, exclusive, autoDelete, arguments)
//queue:队列名称 durable:是否持久化 exclusive:独占,表示这个队列只有这一个channel能去监听,目的是为了保障顺序消费
//autoDelete:队列如果与其他的exchange都没有绑定关系,那么会自动进行删除 arguments:扩展参数
channel.queueDeclare(queueName, true, false, false, null);
//5 创建消费者
QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
//6 设置Channel
//basicConsume(String queue, boolean autoAck, Consumer callback)
//queue:队列名称 autoAck:是否自动签收 callback:消费者
channel.basicConsume(queueName, true, queueingConsumer);
while(true){
//7 获取消息
//nextDelivery不带参数会一直阻塞,待参数表示超时时间,超过指定时间没有接收到消息就放行
Delivery delivery = queueingConsumer.nextDelivery();
String msg = new String(delivery.getBody());
System.err.println("消费端: " + msg);
//Envelope envelope = delivery.getEnvelope();
}
}
}
先启动consumer端,右键Run As --> Java Application
查看管控台:http://192.168.43.157:15672/
可以看到已经有一个连接、频道、队列和消费者了。
然后启动producer端,此时消费端控制台打印了如下内容,证明消息被成功消费了。
消费端: Hello RabbitMQ!
消费端: Hello RabbitMQ!
消费端: Hello RabbitMQ!
消费端: Hello RabbitMQ!
消费端: Hello RabbitMQ!
刷新管控台,可以看到首页显示出了两个线性图,这里能直观看到生产消费的进度以及队列消息的情况。
之所以需要先启动消费端,是因为需要先声明队列才能接收消息,此时队列已经创建好了,就可以先启动生产者再启动消费者,同样可以完成消费。
Default Exchange
这里有个问题就是在生产者投递消息时需要指定exchange
,但是我们指定的是空,为什么消息可以被正确投递到队列test001
中呢。
这是因为如果生产者在投递消息时不指定exchange
,那么会使用rabbitmq默认的exchange,可以通过管控台看到第一个就是,其他的都是rabbitmq内部的交换机。
这个AMPQ default
交换机的路由规则是按照指定的routingKey
去MQ中查找是否有相同名称的队列,如果有就将消息路由到该队列中,如果没有消息就发送失败。
在下一小节我们会对Exchange
进行详细讲解。
网友评论