spring-boot-starter-amqp项目对消息各种支持。
可以参阅官方文档
https://docs.spring.io/spring-boot/docs/1.5.15.RELEASE/reference/htmlsingle/#boot-features-rabbitmq
RabbitMQ是一个基于AMQP协议的轻量级,可靠,可扩展且可移植的消息代理。Spring使用RabbitMQ AMQP协议进行通信。
abbitMQ配置由外部配置属性控制 spring.rabbitmq.*。例如,您可以在以下部分声明以下部分 application.properties:
spring.rabbitmq.host = localhost
spring.rabbitmq.port = 5672
spring.rabbitmq.username = admin
spring.rabbitmq.password = secret
有关rabbitmq的属性相关配置可以参照 RabbitProperties 类中的相关源码
默认host是localhost ,port是5672,virtualHost是"/",用户名和密码是guest
一、 SpringBoot集成RabbitMq的最简框架的搭建
1、在pom.xml中添加必要的依赖
<!-- spring boot amqp包 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
当SpringBoot项目中在pom.xml中引入了上面的这个依赖,那么久已经持有了该RabbitTemplate对象了
默认配置就是上述相关的描述了
二、 SpringBoot集成RabbitMQ快速入门
1、新建一个maven项目 springboot-rabbitmq
image.png
2、在pom.xml中引入必要的依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>1.5.6.RELEASE</version>
<relativePath/>
</parent>
<groupId>com.yubin.springboot</groupId>
<artifactId>springboot-rabbitmq</artifactId>
<version>1.0.0-SNAPSHOT</version>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.7</java.version>
</properties>
<dependencies>
<!-- Spring boot web 集成包 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- spring boot 测试包 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
<!-- spring boot amqp包 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
3、在application.properties中配置rabbitmq相关的属性
# ==================== rabbitmq ===========
# rabbitmq 主机地址
spring.rabbitmq.host=127.0.0.1
# rabbitmq 主机端口号
spring.rabbitmq.port=5672
# rabbitmq 用户名
spring.rabbitmq.username=yubin
# rabbitmq 密码
spring.rabbitmq.password=yubin
# rabbitmq 虚拟主机
spring.rabbitmq.virtual-host=/yubin
4、创建启动类
/**
* SpringBoot集成RabbitMq用例启动类
*
* @Author YUBIN
*/
@SpringBootApplication
public class RabbitMqDemoApplication {
public static void main(String[] args) {
SpringApplication.run(RabbitMqDemoApplication.class, args);
}
}
5、定义队列信息
package com.yubin.springboot.rabbitmq.configuration;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* RabbitMq的相关配置
*
* @Author YUBIN
*/
@Configuration // 相当于xml配置文件
public class RabbitMqConfiguration {
@Bean
public Queue helloQueue() {
return new Queue("hello-queue");
}
@Bean
public Queue userQueue() {
return new Queue("user-queue");
}
}
6、定义生产者
package com.yubin.springboot.rabbitmq.producer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* rabbitmq生产者示例
*
* @Author YUBIN
*/
@Component // 将该类交给Spring管理
public class RabbitMqProducerDemo {
private static Logger logger = LoggerFactory.getLogger(RabbitMqProducerDemo.class);
@Autowired // 注入rabbitmq 模板
private AmqpTemplate rabbitTemplate;
/**
* 发送消息的方法 hello-queue
*/
public void sendHelloMessage() {
// 定义消息体
String message = "RabbitMqProducerDemo hello queue send message";
rabbitTemplate.convertAndSend("hello-queue",message);
logger.info("==================RabbitMqProducerDemo hello queue send message success");
}
/**
* 发送消息的方法 user-queue
*/
public void sendUserMessage() {
// 定义消息体
String message = "RabbitMqProducerDemo user queue send message";
rabbitTemplate.convertAndSend("user-queue",message);
logger.info("===================RabbitMqProducerDemo user queue send message success");
}
}
7、定义消费者
package com.yubin.springboot.rabbitmq.consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* RabbitMq消费者示例
*
* @Author YUBIN
* @create 2018-08-08
*/
@Component // 交给Spring管理
//@RabbitListener(queues = {"hello-queue","user-queue"})
public class RabbitMqConsumerDemo {
private static Logger logger = LoggerFactory.getLogger(RabbitMqConsumerDemo.class);
//@RabbitHandler
@RabbitListener(queues = "hello-queue")
public void executeHello(String message) {
logger.info("executeHello================接收到的消息是:" + message);
}
@RabbitListener(queues = "user-queue")
public void executeUser(String message) {
logger.info("executeUser=================接收到的消息是:" + message);
}
}
8、测试类
/**
* SpringBoot集成RabbitMq测试类
*
* @Author YUBIN
*/
@RunWith(SpringRunner.class) //SpringRunner相当于 SpringJUnit4ClassRunner的别名类
@SpringBootTest(classes = RabbitMqDemoApplication.class)
public class RabbitMqDemoTest {
@Autowired
private RabbitMqProducerDemo producerDemo;
@Test
public void test1() {
producerDemo.sendHelloMessage();
producerDemo.sendUserMessage();
}
}
三、 SpringBoot集成RabbitMq各种模式的案例
1、简单队列
image.png
如上述案例所示
2、工作模式(默认是多劳多得的)
image.png
在RabbitMqConfiguration类中增加work-queue
@Bean
public Queue workQueue() {
return new Queue("work-queue");
}
在RabbitMqProducerDemo类中增加一个发送消息的方法
/**
* 发送消息的方法 work-queue
*/
public void sendWorkMessage(String message) {
// 定义消息体
rabbitTemplate.convertAndSend("work-queue",message);
logger.info("==================RabbitMqProducerDemo work queue send success:" + message);
}
在RabbitMqConsumerDemo类中增加两个消费者方法
@RabbitListener(queues = "work-queue")
public void executeWork1(String message) {
logger.info("executeWork1================接收到的消息是:" + message);
}
@RabbitListener(queues = "work-queue")
public void executeWork2(String message) throws InterruptedException {
Thread.sleep(100);
logger.info("executeWork2================接收到的消息是:" + message);
}
测试类的书写
@Test
public void test2() throws InterruptedException {
for (int i = 0; i < 100; i++) {
producerDemo.sendWorkMessage(i + "");
Thread.sleep(i*10);
}
}
3、发布订阅模式 fanount
image.png
在RabbitMqConfiguration中定义队列,交换机以及将交换机和队列进行绑定,当然队列和交换机的绑定也是可以通过在客户端(浏览器上进行绑定)
@Bean
public Queue fanoutQueueA() {
return new Queue("fanout-queue-A");
}
@Bean
public Queue fanoutQueueB() {
return new Queue("fanout-queue-B");
}
@Bean
public Queue fanoutQueueC() {
return new Queue("fanout-queue-C");
}
// 交换机的声明
@Bean
public FanoutExchange fanoutExchange() {
return new FanoutExchange("fanout-exchange");
}
/**
* 将交换机fanoutExchange与队列fanoutQueueA进行绑定 该种模式的路由key是无效的
* @param fanoutQueueA
* @param fanoutExchange
* @return
*/
@Bean
public Binding bindingFanoutExchangeToFanoutQueueA(Queue fanoutQueueA, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(fanoutQueueA).to(fanoutExchange);
}
/**
* 将交换机fanoutExchange与队列fanoutQueueA进行绑定 该种模式的路由key是无效的
* @param fanoutQueueB
* @param fanoutExchange
* @return
*/
@Bean
public Binding bindingFanoutExchangeToFanoutQueueB(Queue fanoutQueueB, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(fanoutQueueB).to(fanoutExchange);
}
/**
* 将交换机fanoutExchange与队列fanoutQueueA进行绑定 该种模式的路由key是无效的
* @param fanoutQueueC
* @param fanoutExchange
* @return
*/
@Bean
public Binding bindingFanoutExchangeToFanoutQueueC(Queue fanoutQueueC, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(fanoutQueueC).to(fanoutExchange);
}
在生产者类中RabbitMqProducerDemo添加
/**
* 发送消息的方法
* @param message 消息体
* @param exchangeName 交换机的名字
* @param routingKey 路由key
*/
public void sendMessage(String message,String exchangeName,String routingKey) {
// 发生消息 这里的 routingKey 是无效的
rabbitTemplate.convertAndSend(exchangeName,routingKey,message);
logger.info("==================RabbitMqProducerDemo " + exchangeName + ":" + routingKey + " send success:" + message);
}
在消费者类中添加
@RabbitListener(queues = "fanout-queue-A")
public void fanoutQueueA(String message) throws InterruptedException {
logger.info("fanoutQueueA================接收到的消息是:" + message);
}
@RabbitListener(queues = "fanout-queue-B")
public void fanoutQueueB(String message) throws InterruptedException {
logger.info("fanoutQueueB================接收到的消息是:" + message);
}
@RabbitListener(queues = "fanout-queue-C")
public void fanoutQueueC(String message) throws InterruptedException {
logger.info("fanoutQueueC================接收到的消息是:" + message);
}
测试
@Test
public void test3() throws InterruptedException {
producerDemo.sendMessage("哈哈A","fanout-exchange","fanout-queue-A");
producerDemo.sendMessage("哈哈B","fanout-exchange","fanout-queue-B");
producerDemo.sendMessage("哈哈C","fanout-exchange","fanout-queue-C");
}
4、路由模式
image.png
在RabbitMqConfiguration中定义队列,交换机以及将交换机和队列进行绑定,当然队列和交换机的绑定也是可以通过在客户端(浏览器上进行绑定)
// 定义队列
@Bean
public Queue directQueueA() {
return new Queue("direct-queue-A");
}
@Bean
public Queue directQueueB() {
return new Queue("direct-queue-B");
}
@Bean
public Queue directQueueC() {
return new Queue("direct-queue-C");
}
// 声明交换机
@Bean
public DirectExchange directExchange() {
return new DirectExchange("direct-exchange");
}
// 交换机与队列进行绑定并定义routingKey
@Bean
public Binding bindingDirectQueueAToDirectExchangeU(Queue directQueueA, DirectExchange directExchange) {
return BindingBuilder.bind(directQueueA).to(directExchange).with("update");
}
@Bean
public Binding bindingDirectQueueAToDirectExchangeI(Queue directQueueA, DirectExchange directExchange) {
return BindingBuilder.bind(directQueueA).to(directExchange).with("insert");
}
@Bean
public Binding bindingDirectQueueAToDirectExchangeD(Queue directQueueA, DirectExchange directExchange) {
return BindingBuilder.bind(directQueueA).to(directExchange).with("delete");
}
@Bean
public Binding bindingDirectQueueBToDirectExchangeD(Queue directQueueB, DirectExchange directExchange) {
return BindingBuilder.bind(directQueueB).to(directExchange).with("delete");
}
@Bean
public Binding bindingDirectQueueCToDirectExchangeI(Queue directQueueC, DirectExchange directExchange) {
return BindingBuilder.bind(directQueueC).to(directExchange).with("insert");
}
在消费者类中添加
@RabbitListener(queues = "direct-queue-A")
public void directQueueA(String message) throws InterruptedException {
logger.info("directQueueA================接收到的消息是:" + message);
}
@RabbitListener(queues = "direct-queue-B")
public void directQueueB(String message) throws InterruptedException {
logger.info("directQueueB================接收到的消息是:" + message);
}
@RabbitListener(queues = "direct-queue-C")
public void directQueueC(String message) throws InterruptedException {
logger.info("directQueueC================接收到的消息是:" + message);
}
测试类
@Test
public void test4() {
producerDemo.sendMessage("哈哈-update","direct-exchange","update");
producerDemo.sendMessage("哈哈-insert","direct-exchange","insert");
producerDemo.sendMessage("哈哈-delete","direct-exchange","delete");
}
5、通配符模式
在RabbitMqConfiguration中定义队列,交换机以及将交换机和队列进行绑定,当然队列和交换机的绑定也是可以通过在客户端(浏览器上进行绑定)
// 声明通配符模式的队列
@Bean
public Queue topicQueueA() {
return new Queue("topic-queue-A");
}
@Bean
public Queue topicQueueB() {
return new Queue("topic-queue-B");
}
// 声明通配符模式交换机
@Bean
public TopicExchange topicExchange() {
return new TopicExchange("topic-exchange");
}
// 通配符模式下交换机与队列进行绑定并定义routingKey
@Bean
public Binding bindingTopicQueueAToTopicExchange(Queue topicQueueA, TopicExchange topicExchange) {
return BindingBuilder.bind(topicQueueA).to(topicExchange).with("topic.update");
}
@Bean
public Binding bindingTopicQueueBToTopicExchangeI(Queue topicQueueB, TopicExchange topicExchange) {
return BindingBuilder.bind(topicQueueB).to(topicExchange).with("topic.#");
}
在消费者类中添加
@RabbitListener(queues = "topic-queue-A")
public void topicQueueA(String message) throws InterruptedException {
logger.info("topicQueueA================接收到的消息是:" + message);
}
@RabbitListener(queues = "topic-queue-B")
public void topicQueueB(String message) throws InterruptedException {
logger.info("topicQueueB================接收到的消息是:" + message);
}
测试类
@Test
public void test5() {
producerDemo.sendMessage("哈哈-update","topic-exchange","topic.update");
producerDemo.sendMessage("哈哈-insert","topic-exchange","topic.insert");
producerDemo.sendMessage("哈哈-delete","topic-exchange","topic.delete");
}
四、 SpringBoot集成RabbitMq扩展
当然如果你对这种形式的配置不习惯的话,你也可以使用外部的配置文件来使用
在启动类上使用@ImportResource注解来引入其他的xml配置文件
image.png
五、 SpringBoot配置多个RabbitMq
有时候由于业务的复杂性,需要配置多个RabbitMq
步骤
1、在application.properties中增加一个rabbitmq配置
image.png
2、在配置类中增加
image.png
3、新建一个生产者
image.png
4、新建一个消费者
image.png
5、测试类
image.png
网友评论