- Spring Boot 对 AMQP 协议的消息队列产品提供了良好的支持,spring-amqp 模块对 AMQP 协议的一个抽象和封装,目的是为了简化 AMQP 协议的消息队列框架的使用
确保 RabbitMQ 正常运行
启动 RabbitMQ
/sbin/rabbitmq-server -detached
添加两个新用户
/sbin/rabbitmqctl add_user producer 123456
/sbin/rabbitmqctl add_user consumer 123456
为用户赋予权限(否则会出现401权限错误)
/sbin/rabbitmqctl set_permissions producer '.*' '.*' '.*'
/sbin/rabbitmqctl set_permissions consumer '.*' '.*' '.*'
结合 Spring Boot
引入 spring-boot-starter-amqp 依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
消息生产者
添加 RabbitMQ 配置属性
server:
port: 8003
spring:
rabbitmq:
host: localhost
username: producer
password: 123456
port: 5672
application:
name: producer
RabbitMQ 配置类
@Configuration
public class RabbitMQConfig {
//声明队列
@Bean
public Queue queue1() {
return new Queue("hello.queue1", true); // true表示持久化该队列
}
@Bean
public Queue queue2() {
return new Queue("hello.queue2", true);
}
//声明交互器
@Bean
public TopicExchange topicExchange() {
return new TopicExchange("topicExchange");
}
//绑定
@Bean
public Binding binding1() {
return BindingBuilder.bind(queue1()).to(topicExchange()).with("key.1");
}
@Bean
public Binding binding2() {
return BindingBuilder.bind(queue2()).to(topicExchange()).with("key.#");
}
}
消息生产者类
@Component
public class Sender {
@Autowired
private RabbitTemplate rabbitTemplate;
//发送消息
public void send(String msg){
CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString());
System.out.println("开始发送消息 : " + msg.toLowerCase());
rabbitTemplate.convertSendAndReceive("topicExchange", "key.1", msg, correlationId);
System.out.println("结束发送消息 : " + msg.toLowerCase());
}
}
测试发送消息
@RunWith(SpringRunner.class)
@SpringBootTest(classes = ProducerApplication.class)
public class SenderTest {
@Autowired
private Sender sender;
@Test
public void sender(){
sender.send("hello");
}
}
RabbitMQ 查看队列与消息
- 访问 RabbitMQ 图形管理界面,可看见已创建两个持久的队列 hello.queue1、hello.queue2
- 点击进 hello.queue1、hello.queue2 队列,通过 Get messages 获取消息
消息消费者
添加 RabbitMQ 配置属性
server:
port: 8004
spring:
rabbitmq:
host: localhost
username: consumer
password: 123456
port: 5672
listener:
simple:
concurrency: 2 #最小消息监听线程数
max-concurrency: 2 #最大消息监听线程数
消息监听类,通过 @RabbitListener 注解监听队列
@Component
public class Receiver {
@RabbitListener(queues = "hello.queue1")
public String processMessage1(String msg) {
System.out.println(Thread.currentThread().getName() + " 接收到来自hello.queue1队列的消息:" + msg);
return msg.toUpperCase();
}
@RabbitListener(queues = "hello.queue2")
public void processMessage2(String msg) {
System.out.println(Thread.currentThread().getName() + " 接收到来自hello.queue2队列的消息:" + msg);
}
}
测试
- 启动消息消费者服务
- 通过消息生产者生产消息
- RabbitMQ 会将消息分发给消息消费者,可在消息消费者服务控制台看到消息
网友评论