前言
- 消息队列(Message Queue)是分布式系统中的重要组件,其通用的使用场景可以简单地描述为:当不需要立即获得结果,但是并发量又需要进行控制的时候,差不多就是需要使用消息队列的时候。消息队列主要解决了应用耦合、异步处理、流量削峰等问题。
- RabbitMQ 是一个由 Erlang 语言开发的 AMQP 的开源实现。AMQP(Advanced Message Queuing Protocol)即高级消息队列协议,它是应用层协议的一个开放标准,为面向消息的中间件设计,基于此协议的客户端与消息中间件可传递消息,并不受产品、开发语言等条件的限制。RabbitMQ 最初起源于金融系统,用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。
一、RabbitMQ安装
由于RabbitMQ是基于erlang的,所以,在正式安装RabbitMQ之前,需要先安装一下erlang。具体安装过程可参考windows10环境下的RabbitMQ安装步骤(图文)。
二、pom.xml文件
这里是通过idea直接勾选得到的。
<!-- Build properties: Java language level used by the project. -->
<properties>
    <java.version>1.8</java.version>
</properties>

<dependencies>
    <!-- Web starter: provides the REST controllers used to trigger the senders. -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>

    <!-- AMQP starter: brings in the Spring AMQP / RabbitMQ client support. -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>

    <!-- Developer tooling; runtime-only and optional. -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-devtools</artifactId>
        <scope>runtime</scope>
        <optional>true</optional>
    </dependency>

    <!-- Test starter, with the JUnit Vintage engine excluded. -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
        <exclusions>
            <exclusion>
                <groupId>org.junit.vintage</groupId>
                <artifactId>junit-vintage-engine</artifactId>
            </exclusion>
        </exclusions>
    </dependency>

    <!-- RabbitMQ test support for Spring AMQP (test scope only). -->
    <dependency>
        <groupId>org.springframework.amqp</groupId>
        <artifactId>spring-rabbit-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>
三、配置文件
配置rabbitMQ的地址。
# HTTP port of the embedded web server.
server:
  port: 8080

# Connection settings for the RabbitMQ broker.
# NOTE(review): guest/guest is presumably the broker's out-of-the-box account;
# replace it for anything other than a local test broker.
spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
四、使用RabbitMQ
-
简单使用
- 队列配置
/**
 * Queue configuration for the basic example.
 */
@Configuration
public class RabbitConfig {

    /** Name of the queue used by the basic sender/receiver pair. */
    private static final String HELLO_QUEUE_NAME = "hello";

    /**
     * Registers a {@link Queue} bean named {@code helloQueue}.
     *
     * @return a queue whose name is {@code hello}
     */
    @Bean
    public Queue helloQueue() {
        final Queue queue = new Queue(HELLO_QUEUE_NAME);
        return queue;
    }
}
- 发送者
/**
 * Publishes a timestamped greeting to {@code hello}.
 */
@Component
public class HelloSender {

    @Autowired
    private AmqpTemplate rabbitTemplate;

    /**
     * Builds the text {@code "hello " + <current date>}, echoes it to standard
     * output and then hands it to the template with {@code "hello"} as the
     * destination.
     */
    public void send() {
        final Date now = new Date();
        final String payload = "hello " + now;
        System.out.println("Sender : " + payload);
        rabbitTemplate.convertAndSend("hello", payload);
    }
}
- 接收者
/**
 * Listener attached to the {@code hello} queue.
 */
@Component
@RabbitListener(queues = "hello")
public class HelloReceiver {

    /**
     * Prints a received text message to standard output.
     *
     * @param hello the message body
     */
    @RabbitHandler
    public void process(String hello) {
        final String line = "Receiver : " + hello;
        System.out.println(line);
    }
}
- controller类
/**
 * HTTP entry point that triggers the basic example.
 */
@RestController
public class TestController {

    /** Fixed response body returned once the message has been handed off. */
    private static final String OK = "success";

    @Autowired
    private HelloSender helloSender;

    /**
     * Handles {@code GET hello}: sends one greeting message.
     *
     * @return the literal {@code success}
     */
    @GetMapping("hello")
    public String helloTest() {
        this.helloSender.send();
        return OK;
    }
}
-
一对多
- 队列配置
/**
 * Queue configuration for the one-sender / many-receivers example.
 */
@Configuration
public class RabbitConfig {

    /** Name of the queue shared by both receivers. */
    private static final String ONE_QUEUE_NAME = "oneQueue";

    /**
     * Registers a {@link Queue} bean named {@code oneQueue}.
     *
     * @return a queue whose name is {@code oneQueue}
     */
    @Bean
    public Queue oneQueue() {
        final Queue queue = new Queue(ONE_QUEUE_NAME);
        return queue;
    }
}
- 发送者
/**
 * Sender for the one-to-many example; publishes numbered messages to
 * {@code oneQueue}.
 */
@Component
public class OneSender {

    @Autowired
    private AmqpTemplate amqpTemplate;

    /**
     * Logs and sends a single numbered message.
     *
     * @param i sequence number appended to the message text
     */
    public void send(int i) {
        // The text (including its original spelling) is reproduced unchanged
        // because it is the message body that receivers print.
        final String payload = "spirng boot oneQueue queue ****** " + i;
        System.out.println("Sender1" + payload);
        amqpTemplate.convertAndSend("oneQueue", payload);
    }
}
- 接收者
两个接收者。
/**
 * First of the two listeners attached to {@code oneQueue}.
 */
@Component
@RabbitListener(queues = "oneQueue")
public class OneReceiver1 {

    /**
     * Prints a received text message with the {@code Receiver1:} prefix.
     *
     * @param Str the message body
     */
    @RabbitHandler
    public void process(String Str) {
        final String line = "Receiver1:" + Str;
        System.out.println(line);
    }
}
/**
 * Second of the two listeners attached to {@code oneQueue}.
 */
@Component
@RabbitListener(queues = "oneQueue")
public class OneReceiver2 {

    /**
     * Prints a received text message with the {@code Receiver2:} prefix.
     *
     * @param Str the message body
     */
    @RabbitHandler
    public void process(String Str) {
        final String line = "Receiver2:" + Str;
        System.out.println(line);
    }
}
- controller类
/**
 * HTTP entry point that triggers the one-to-many example.
 */
@RestController
public class TestController {

    /** Number of messages published per request. */
    private static final int MESSAGE_COUNT = 10;

    @Autowired
    private OneSender oneSender;

    /**
     * Handles {@code GET oneToMany}: sends messages numbered 0 through 9.
     *
     * @return the literal {@code success}
     */
    @GetMapping("oneToMany")
    public String oneToManyTest() {
        int seq = 0;
        while (seq < MESSAGE_COUNT) {
            oneSender.send(seq);
            seq++;
        }
        return "success";
    }
}
- 总结
一个发送者,N 个接收者。经过测试,消息会被均匀地分发到 N 个接收者中,每条消息只会被其中一个接收者消费。
-
多对多
- 队列配置
/**
 * Queue configuration for the many-senders / many-receivers example.
 */
@Configuration
public class RabbitConfig {

    /** Name of the queue shared by all senders and receivers. */
    private static final String MANY_QUEUE_NAME = "manyQueue";

    /**
     * Registers a {@link Queue} bean named {@code manyQueue}.
     *
     * @return a queue whose name is {@code manyQueue}
     */
    @Bean
    public Queue manyQueue() {
        final Queue queue = new Queue(MANY_QUEUE_NAME);
        return queue;
    }
}
- 发送者
/**
 * First sender of the many-to-many example; publishes to {@code manyQueue}.
 */
@Component
public class ManySender1 {

    @Autowired
    private AmqpTemplate amqpTemplate;

    /**
     * Logs and sends the decimal text of {@code i}.
     *
     * @param i number to send as the message body
     */
    public void send(int i) {
        final String payload = String.valueOf(i);
        final String logLine = "Sender1: " + payload + "--send:";
        System.out.println(logLine);
        this.amqpTemplate.convertAndSend("manyQueue", payload);
    }
}
/**
 * Second sender of the many-to-many example; publishes to {@code manyQueue}.
 */
@Component
public class ManySender2 {

    @Autowired
    private AmqpTemplate amqpTemplate;

    /**
     * Logs and sends the decimal text of {@code i}.
     *
     * @param i number to send as the message body
     */
    public void send(int i) {
        String context = i + "";
        // Prefix is "Sender2" so this sender's log lines can be told apart
        // from ManySender1's (the original copy-pasted "Sender1").
        System.out.println("Sender2: " + context + "--send:");
        amqpTemplate.convertAndSend("manyQueue", context);
    }
}
- 接收者
/**
 * First of the two listeners attached to {@code manyQueue}.
 */
@Component
@RabbitListener(queues = "manyQueue")
public class ManyReceiver1 {

    /**
     * Prints a received text message with the {@code Receiver1:} prefix.
     *
     * @param Str the message body
     */
    @RabbitHandler
    public void process(String Str) {
        final String line = "Receiver1:" + Str;
        System.out.println(line);
    }
}
/**
 * Second of the two listeners attached to {@code manyQueue}.
 */
@Component
@RabbitListener(queues = "manyQueue")
public class ManyReceiver2 {

    /**
     * Prints a received text message with the {@code Receiver2:} prefix.
     *
     * @param Str the message body
     */
    @RabbitHandler
    public void process(String Str) {
        final String line = "Receiver2:" + Str;
        System.out.println(line);
    }
}
- controller类
/**
 * HTTP entry point that triggers the many-to-many example.
 */
@RestController
public class TestController {

    /** Number of rounds; each round sends one message per sender. */
    private static final int ROUNDS = 10;

    @Autowired
    private ManySender1 manySender1;

    @Autowired
    private ManySender2 manySender2;

    /**
     * Handles {@code GET manyToMany}: for each number 0 through 9, sends it
     * first through {@code manySender1} and then through {@code manySender2}.
     *
     * @return the literal {@code success}
     */
    @GetMapping("manyToMany")
    public String manyToManyTest() {
        int round = 0;
        while (round < ROUNDS) {
            manySender1.send(round);
            manySender2.send(round);
            round++;
        }
        return "success";
    }
}
- 总结
和一对多一样,接收端仍然会均匀接收到消息。
-
对象的支持
- 队列配置
/**
 * Queue configuration for the object-payload example.
 */
@Configuration
public class RabbitConfig {

    /** Name of the queue that carries {@code User} objects. */
    private static final String OBJECT_QUEUE_NAME = "object_queue";

    /**
     * Registers a {@link Queue} bean named {@code queue3}.
     *
     * @return a queue whose name is {@code object_queue}
     */
    @Bean
    public Queue queue3() {
        final Queue queue = new Queue(OBJECT_QUEUE_NAME);
        return queue;
    }
}
- 实体类
/**
 * Serializable value holder with a user name and a password, used as the
 * message payload of the object example.
 */
public class User implements Serializable {

    private String username;
    private String password;

    /**
     * Creates a user.
     *
     * @param username the user name
     * @param password the password
     */
    public User(String username, String password) {
        this.username = username;
        this.password = password;
    }

    /** @return the current user name */
    public String getUsername() {
        return this.username;
    }

    /** @param username the new user name */
    public void setUsername(String username) {
        this.username = username;
    }

    /** @return the current password */
    public String getPassword() {
        return this.password;
    }

    /** @param password the new password */
    public void setPassword(String password) {
        this.password = password;
    }

    /**
     * Renders the object as {@code User{username='...', password='...'}}.
     * A {@code null} field is rendered as the text {@code null}.
     */
    @Override
    public String toString() {
        final StringBuilder text = new StringBuilder("User{");
        text.append("username='").append(username).append('\'');
        text.append(", password='").append(password).append('\'');
        text.append('}');
        return text.toString();
    }
}
- 发送者
/**
 * Sender for the object example; publishes {@code User} instances to
 * {@code object_queue}.
 */
@Component
public class ObjectSender {

    @Autowired
    AmqpTemplate amqpTemplate;

    /**
     * Logs the user's string form and then sends the object itself.
     *
     * @param user the payload; its {@code toString()} is called for logging
     */
    public void sendUser(User user) {
        final String description = user.toString();
        System.out.println("Send object:" + description);
        amqpTemplate.convertAndSend("object_queue", user);
    }
}
- 接收者
/**
 * Listener attached to {@code object_queue}.
 */
@Component
@RabbitListener(queues = "object_queue")
public class ObjectReceiver {

    /**
     * Prints the string form of a received {@code User}.
     *
     * @param user the received payload
     */
    @RabbitHandler
    public void objectReceiver(User user) {
        final String description = user.toString();
        System.out.println("Receiver object:" + description);
    }
}
- controller类
/**
 * HTTP entry point that triggers the object example.
 */
@RestController
public class TestController {

    @Autowired
    private ObjectSender objectSender;

    /**
     * Handles {@code GET objectSender}: sends a fixed sample user
     * ({@code admin} / {@code 123456}).
     *
     * @return the literal {@code success}
     */
    @GetMapping("objectSender")
    public String objectSenderTest() {
        final User sample = new User("admin", "123456");
        this.objectSender.sendUser(sample);
        return "success";
    }
}
-
Topic Exchange
- 队列配置
/**
 * Declares two queues, one topic exchange and the bindings between them.
 */
@Configuration
public class TopicRabbitConfig {

    final static String message = "topic.message";
    final static String messages = "topic.messages";

    /** Name given to the topic exchange. */
    private static final String EXCHANGE_NAME = "topicExchange";

    // --- the two queues ---

    /** @return a queue named {@code topic.message} */
    @Bean
    public Queue queueMessage() {
        return new Queue(message);
    }

    /** @return a queue named {@code topic.messages} */
    @Bean
    public Queue queueMessages() {
        return new Queue(messages);
    }

    // --- the exchange ---

    /** @return a topic exchange named {@code topicExchange} */
    @Bean
    TopicExchange exchange() {
        return new TopicExchange(EXCHANGE_NAME);
    }

    // --- bindings: attach each queue to the exchange with a routing key ---
    // NOTE(review): two Queue beans exist, so the parameter names below
    // presumably have to match the bean method names for injection to pick
    // the intended queue - do not rename them without checking.

    /**
     * Binds {@code queueMessage} to the exchange with the exact routing key
     * {@code topic.message}.
     */
    @Bean
    Binding bindingExchangeMessage(Queue queueMessage, TopicExchange exchange) {
        final Binding binding = BindingBuilder
                .bind(queueMessage)
                .to(exchange)
                .with("topic.message");
        return binding;
    }

    /**
     * Binds {@code queueMessages} to the exchange with the pattern
     * {@code topic.#}.
     */
    @Bean
    Binding bindingExchangeMessages(Queue queueMessages, TopicExchange exchange) {
        final Binding binding = BindingBuilder
                .bind(queueMessages)
                .to(exchange)
                .with("topic.#");
        return binding;
    }
}
- 发送者
/**
 * Sender for the topic example; publishes to {@code topicExchange} with two
 * different routing keys.
 */
@Component
public class TopicSender {

    @Autowired
    AmqpTemplate amqpTemplate;

    /** Logs and sends a fixed text with routing key {@code topic.message}. */
    public void send1() {
        final String payload = "hi, i am message 1";
        System.out.println("Sender : " + payload);
        this.amqpTemplate.convertAndSend("topicExchange", "topic.message", payload);
    }

    /** Logs and sends a fixed text with routing key {@code topic.messages}. */
    public void send2() {
        final String payload = "hi, i am messages 2";
        System.out.println("Sender : " + payload);
        this.amqpTemplate.convertAndSend("topicExchange", "topic.messages", payload);
    }
}
- 接收者
/**
 * Listener attached to the {@code topic.message} queue.
 */
@Component
@RabbitListener(queues = "topic.message")
public class TopicReceiver1 {

    /**
     * Prints a received text message.
     *
     * @param message the message body
     */
    @RabbitHandler
    public void process(String message) {
        final String line = "Receiver topic.message :" + message;
        System.out.println(line);
    }
}
/**
 * Listener attached to the {@code topic.messages} queue.
 */
@Component
@RabbitListener(queues = "topic.messages")
public class TopicReceiver2 {

    /**
     * Prints a received text message.
     *
     * @param message the message body
     */
    @RabbitHandler
    public void process(String message) {
        final String line = "Receiver topic.messages: " + message;
        System.out.println(line);
    }
}
- controller类
/**
 * HTTP entry point that triggers the topic example.
 */
@RestController
public class TestController {

    /** Fixed response body returned once the message has been handed off. */
    private static final String OK = "success";

    @Autowired
    private TopicSender topicSender;

    /**
     * Handles {@code GET topicSender}: sends one message via
     * {@code send1()}.
     *
     * @return the literal {@code success}
     */
    @GetMapping("topicSender")
    public String topicSenderTest() {
        this.topicSender.send1();
        // Swap in the call below to try the other routing key instead:
        // topicSender.send2();
        return OK;
    }
}
- 总结
队列 queueMessages 绑定的路由键是 “topic.#”,可以同时匹配 “topic.message” 和 “topic.messages” 两种路由键的消息;队列 queueMessage 只匹配路由键为 “topic.message” 的消息。
-
Fanout Exchange
- 队列配置
/**
 * Declares three queues, one fanout exchange and a binding for each queue.
 */
@Configuration
public class FanoutRabbitConfig {

    /** Name given to the fanout exchange. */
    private static final String EXCHANGE_NAME = "fanoutExchange";

    // --- the three queues ---

    /** @return a queue named {@code fanout.A} */
    @Bean
    public Queue AMessage() {
        final Queue queue = new Queue("fanout.A");
        return queue;
    }

    /** @return a queue named {@code fanout.B} */
    @Bean
    public Queue BMessage() {
        final Queue queue = new Queue("fanout.B");
        return queue;
    }

    /** @return a queue named {@code fanout.C} */
    @Bean
    public Queue CMessage() {
        final Queue queue = new Queue("fanout.C");
        return queue;
    }

    // --- the exchange ---

    /** @return a fanout exchange named {@code fanoutExchange} */
    @Bean
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange(EXCHANGE_NAME);
    }

    // --- bindings ---
    // Queues A, B and C are each bound to the fanout exchange; no routing key
    // is given here, and whatever routing key the sender supplies is ignored.
    // NOTE(review): three Queue beans exist, so the parameter names below
    // presumably have to match the bean method names for injection to pick
    // the intended queue - do not rename them without checking.

    /** Binds queue {@code AMessage} to the fanout exchange. */
    @Bean
    public Binding bindingExchangeA(Queue AMessage, FanoutExchange fanoutExchange) {
        final Binding binding = BindingBuilder.bind(AMessage).to(fanoutExchange);
        return binding;
    }

    /** Binds queue {@code BMessage} to the fanout exchange. */
    @Bean
    public Binding bindingExchangeB(Queue BMessage, FanoutExchange fanoutExchange) {
        final Binding binding = BindingBuilder.bind(BMessage).to(fanoutExchange);
        return binding;
    }

    /** Binds queue {@code CMessage} to the fanout exchange. */
    @Bean
    Binding bindingExchangeC(Queue CMessage, FanoutExchange fanoutExchange) {
        final Binding binding = BindingBuilder.bind(CMessage).to(fanoutExchange);
        return binding;
    }
}
- 发送者
/**
 * Sender for the fanout example; publishes to {@code fanoutExchange}.
 */
@Component
public class FanoutSender {

    @Autowired
    AmqpTemplate amqpTemplate;

    /**
     * Logs and sends a fixed text to the fanout exchange with an empty
     * routing key.
     */
    public void send() {
        final String payload = "hi, fanout msg ";
        System.out.println("Sender : " + payload);
        // Queues A, B and C are bound to the fanout exchange, and any routing
        // key given by the sender is ignored, so an empty one is passed.
        final String ignoredRoutingKey = "";
        this.amqpTemplate.convertAndSend("fanoutExchange", ignoredRoutingKey, payload);
    }
}
- 接收者
/**
 * Listener attached to the {@code fanout.A} queue.
 */
@Component
@RabbitListener(queues = "fanout.A")
public class FanoutReceiverA {

    /**
     * Prints a received text message.
     *
     * @param message the message body
     */
    @RabbitHandler
    public void process(String message) {
        final String line = "Receiver form fanout.A: " + message;
        System.out.println(line);
    }
}
/**
 * Listener attached to the {@code fanout.B} queue.
 */
@Component
@RabbitListener(queues = "fanout.B")
public class FanoutReceiverB {

    /**
     * Prints a received text message.
     *
     * @param message the message body
     */
    @RabbitHandler
    public void process(String message) {
        final String line = "Receiver form fanout.B: " + message;
        System.out.println(line);
    }
}
/**
 * Listener attached to the {@code fanout.C} queue.
 */
@Component
@RabbitListener(queues = "fanout.C")
public class FanoutReceiverC {

    /**
     * Prints a received text message.
     *
     * @param message the message body
     */
    @RabbitHandler
    public void process(String message) {
        final String line = "Receiver form fanout.C: " + message;
        System.out.println(line);
    }
}
- controller类
/**
 * HTTP entry point that triggers the fanout example.
 */
@RestController
public class TestController {

    /** Fixed response body returned once the message has been handed off. */
    private static final String OK = "success";

    @Autowired
    private FanoutSender fanoutSender;

    /**
     * Handles {@code GET fanoutSender}: sends one fanout message.
     *
     * @return the literal {@code success}
     */
    @GetMapping("fanoutSender")
    public String fanoutSenderTest() {
        this.fanoutSender.send();
        return OK;
    }
}
- 总结
绑定到 fanout 交换机上面的队列都收到了消息。
网友评论