各位看官可以关注博主个人博客,了解更多信息。
作者:Surpasser
链接地址:https://surpass.org.cn
前言
上篇我们说到了消息队列RabbitMQ的模式概念,那么这里将会针对模式使用SpringBoot联合RabbitMQ做一个案例,实现消息的生产和消费。
这一篇也是这个主题的最后一篇了,建议配合着看。助于理解。
博主会将Demo工程放在Gitee上,有兴趣的可以拉下来自己试试。
Gitee地址:https://gitee.com/lemon_ant/os.git
正文
准备工作
新建SpringBoot项目
添加配置文件
server.port=8080
spring.application.name=cl
#RabbitMq所在服务器IP
spring.rabbitmq.host=127.0.0.1
#连接端口号
spring.rabbitmq.port=5672
#用户名
spring.rabbitmq.username=root
#用户密码
spring.rabbitmq.password=123456
# 开启发送确认
spring.rabbitmq.publisher-confirm-type=correlated
# 开启发送失败退回
spring.rabbitmq.publisher-returns=true
spring.rabbitmq.virtual-host=/
添加pom文件
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
启动类
@SpringBootApplication
public class OsApplication {
public static void main(String[] args) {
SpringApplication.run(OsApplication.class, args);
}
}
点对点模式
-
队列初始化
//当没有这个队列的时候会自动创建 @Configuration public class PointInitialization { @Bean Queue toPoint(){ Queue queue = new Queue("point.to.point",true); return queue; } }
-
生产者
@Component public class PointProducer { @Autowired private RabbitTemplate rabbitTemplate; public void send(String name){ String sendMsg = "点对点队列:" + name + " " + new Date(); //指定队列 this.rabbitTemplate.convertAndSend("point.to.point",sendMsg); } }
-
消费者
@Component public class PointConsumer { //监听的队列名 @RabbitListener(queues = "point.to.point") public void processOne(String name) { System.out.println("point.to.point:" + name); } }
-
测试类(模仿控制层)
@RestController @RequestMapping("/Point") public class PointController { @Autowired private PointProducer sayProducer; @RequestMapping("/point/{name}") public String send(@PathVariable String name){ sayProducer.send(name); return "发送成功"; } }
-
使用postman模拟请求
- 控制台结果
[图片上传失败...(image-3e7425-1591871192134)]
work模式
-
队列初始化
@Configuration public class WorkInitialization { //当没有这个队列的时候会自动创建 @Bean Queue work(){ Queue queue = new Queue("WorkingMode",true); return queue; } }
-
生产者
@Component public class WorkProducer { @Autowired private RabbitTemplate rabbitTemplate; public void send(String name){ String sendMsg = "工作模式:" + name + " " + new Date(); //指定队列 this.rabbitTemplate.convertAndSend("WorkingMode",sendMsg); } }
-
消费者
//三个队列同时监听 @Component public class WorkConsumer { @RabbitListener(queues = "WorkingMode") public void processOne(String name) { System.out.println("WorkingMode1:" + name); } @RabbitListener(queues = "WorkingMode") public void processTwo(String name) { System.out.println("WorkingMode2:" + name); } @RabbitListener(queues = "WorkingMode") public void processThree(String name) { System.out.println("WorkingMode3:" + name); } }
-
测试类(模仿控制层)
@RestController @RequestMapping("/work") public class WorkController { @Autowired private WorkProducer sayProducer; @RequestMapping("/work/{name}") public String send(@PathVariable String name){ sayProducer.send(name); return "发送成功"; } }
-
使用postman模拟请求
- 控制台结果
[图片上传失败...(image-e9beeb-1591871192134)]
<font color=red>注意看时间,说明消息是轮询分发的,一个消息只由一个消费者消费。</font>
发布/订阅者模式(Publish/Subscribe)
-
队列初始化
//类型为fanout @Configuration public class PublishInitialization { //当没有这个队列的时候会自动创建 @Bean Queue publishOne(){ Queue queue = new Queue("queue.publish.one",true); return queue; } @Bean Queue publishTwo(){ Queue queue = new Queue("queue.publish.two",true); return queue; } @Bean Queue publishThree(){ Queue queue = new Queue("queue.publish.three",true); return queue; } //创建交换器 @Bean FanoutExchange pulishExchange(){ FanoutExchange directExchange = new FanoutExchange("publishExchange"); return directExchange; } //绑定队列(不用指定routing key),参数名字要和bean名字一致 @Bean Binding bindingPublishOne(Queue publishOne,FanoutExchange pulishExchange){ Binding binding = BindingBuilder.bind(publishOne).to(pulishExchange); return binding; } @Bean Binding bindingPublishTwo(Queue publishTwo,FanoutExchange pulishExchange){ Binding binding = BindingBuilder.bind(publishTwo).to(pulishExchange); return binding; } @Bean Binding bindingPublishThree(Queue publishThree,FanoutExchange pulishExchange){ Binding binding = BindingBuilder.bind(publishThree).to(pulishExchange); return binding; } }
-
生产者
@Component public class PublishProducer { @Autowired private RabbitTemplate rabbitTemplate; public void send(String name){ String sendMsg = "发布订阅模式:" + name + " " + new Date(); //指定队列 this.rabbitTemplate.convertAndSend("publishExchange","",sendMsg); } }
-
消费者
@Component public class PublishConsumer { @RabbitListener(queues = "queue.publish.one") public void processOne(String name) { System.out.println("queue.publish.one:" + name); } @RabbitListener(queues = "queue.publish.two") public void processTwo(String name) { System.out.println("queue.publish.two:" + name); } @RabbitListener(queues = "queue.publish.three") public void processThree(String name) { System.out.println("queue.publish.three:" + name); } }
-
测试类(模仿控制层)
@RestController @RequestMapping("/Publish") public class PublishController { @Autowired private PublishProducer sayProducer; @RequestMapping("/publish/{name}") public String send(@PathVariable String name){ sayProducer.send(name); return "发送成功"; } }
-
使用postman模拟请求
- 控制台结果
<font color=red>注意看时间,交换机会将消息推送到所有绑定到它的队列。</font>
路由模式
-
队列初始化
//类型为direct @Configuration public class RoutingInitialization { //当没有这个队列的时候会自动创建 @Bean Queue routingOne(){ Queue queue = new Queue("queue.routing.one",true); return queue; } @Bean Queue routingTwo(){ Queue queue = new Queue("queue.routing.two",true); return queue; } @Bean Queue routingThree(){ Queue queue = new Queue("queue.routing.three",true); return queue; } //创建交换器 @Bean DirectExchange routingExchange(){ DirectExchange directExchange = new DirectExchange("routingExchange"); return directExchange; } //绑定队列 @Bean Binding bindingRoutingOne(Queue routingOne,DirectExchange routingExchange){ Binding binding = BindingBuilder.bind(routingOne).to(routingExchange).with("1"); return binding; } @Bean Binding bindingRoutingTwo(Queue routingTwo,DirectExchange routingExchange){ Binding binding = BindingBuilder.bind(routingTwo).to(routingExchange).with("2"); return binding; } @Bean Binding bindingRoutingThree(Queue routingThree,DirectExchange routingExchange){ Binding binding = BindingBuilder.bind(routingThree).to(routingExchange).with("3"); return binding; } }
-
生产者
@Component public class RoutingProducer { @Autowired private RabbitTemplate rabbitTemplate; public void send(String type){ String sendMsg = "路由模式:" + type + " " + new Date(); //指定队列 if (type.equals("1")){ this.rabbitTemplate.convertAndSend("routingExchange","1",sendMsg); } if (type.equals("2")){ this.rabbitTemplate.convertAndSend("routingExchange","2",sendMsg); } if (type.equals("3")){ this.rabbitTemplate.convertAndSend("routingExchange","3",sendMsg); } } }
-
消费者
@Component public class RoutingConsumer { @RabbitListener(queues = "queue.routing.one") public void processOne(String name) { System.out.println("queue.routing.one:" + name); } @RabbitListener(queues = "queue.routing.two") public void processTwo(String name) { System.out.println("queue.routing.two:" + name); } @RabbitListener(queues = "queue.routing.three") public void processThree(String name) { System.out.println("queue.routing.three:" + name); } }
-
测试类(模仿控制层)
@RestController @RequestMapping("/Routing") public class RoutingController { @Autowired private RoutingProducer sayProducer; @RequestMapping("/routing/{name}") public String send(@PathVariable String name){ sayProducer.send(name); return "发送成功"; } }
-
使用postman模拟请求
[图片上传失败...(image-1c2e63-1591871192134)]
[图片上传失败...(image-c0c993-1591871192134)]
<font color=red>我这里测试传的就是routing key,方便看。</font>
- 控制台结果
[图片上传失败...(image-610dd9-1591871192134)]
[图片上传失败...(image-d8749-1591871192134)]
[图片上传失败...(image-25a6c2-1591871192134)]
<font color=red>这里用时间来区别。</font>
主题模式(Topic)
-
队列初始化
//类型为topic @Configuration public class TopicInitialization { //当没有这个队列的时候会自动创建 @Bean Queue topicOne(){ Queue queue = new Queue("queue.topic.one",true); return queue; } @Bean Queue topicTwo(){ Queue queue = new Queue("queue.topic.two",true); return queue; } @Bean Queue topicThree(){ Queue queue = new Queue("queue.topic.three",true); return queue; } //创建交换器 @Bean TopicExchange topicExchange(){ TopicExchange directExchange = new TopicExchange("topicExchange"); return directExchange; } //绑定队列 @Bean Binding bindingTopicOne(Queue topicOne,TopicExchange topicExchange){ Binding binding = BindingBuilder.bind(topicOne).to(topicExchange).with("#.error"); return binding; } @Bean Binding bindingTopicTwo(Queue topicTwo,TopicExchange topicExchange){ Binding binding = BindingBuilder.bind(topicTwo).to(topicExchange).with("#.log"); return binding; } @Bean Binding bindingTopicThree(Queue topicThree,TopicExchange topicExchange){ Binding binding = BindingBuilder.bind(topicThree).to(topicExchange).with("good.#.timer"); return binding; } }
-
生产者
@Component public class TopicProducer { @Autowired private RabbitTemplate rabbitTemplate; public void send(String routing){ String sendMsg = "主题模式:" + routing + " " + new Date(); //指定队列 this.rabbitTemplate.convertAndSend("topicExchange",routing,sendMsg); } }
-
消费者
@Component public class TopicConsumer { @RabbitListener(queues = "queue.topic.one") public void processOne(String name) { System.out.println("queue.topic.one:" + name); } @RabbitListener(queues = "queue.topic.two") public void processTwo(String name) { System.out.println("queue.topic.two:" + name); } @RabbitListener(queues = "queue.topic.three") public void processThree(String name) { System.out.println("queue.topic.three:" + name); } }
-
测试类(模仿控制层)
@RestController @RequestMapping("/Topic") public class TopicController { @Autowired private TopicProducer sayProducer; @RequestMapping("/topic/{type}") public String send(@PathVariable String type){ sayProducer.send(type); return "发送成功"; } }
-
请求以及对应结果
-
[图片上传失败...(image-7d504-1591871192134)]
[图片上传失败...(image-9566f5-1591871192134)] -
[图片上传失败...(image-c1a54c-1591871192134)]
[图片上传失败...(image-5ee38a-1591871192134)]
-
[图片上传失败...(image-dc3b14-1591871192134)]
[图片上传失败...(image-13d98b-1591871192134)]
<font color=red>注意看请求的key和打印日志的对应关系。</font>
-
尾言
消息队列在这里基本就结束了,结合前面两篇基本就能够了解队列的基本概念和用法了。
网友评论