官网:https://spring.io/guides/gs/messaging-rabbitmq/
RabbitMQ 介绍
RabbitMQ是实现AMQP(高级消息队列协议)消息中间件的一种, kafuka是另外一种, 本文只介绍RabbitMQ的使用方法.
RabbitMQ主要是为了实现系统之间的双向解耦而实现的,消息的发送者无需知道消息使用者的存在,反之亦然.
当生产者大量产生数据时, 消费者无法快速消费, 那么需要一个中间层,保存这个数据.
AMQP的主要特征是面向消息(Message)、队列(Queue)、路由(Exchange包括点对点和发布/订阅)、可靠性、安全
对于RabbitMQ来说, 除了生产者, 消费者, 消息队列以外, 还添加了一个模块,即交换机(Exchange).
它使得生产者和消息队列之间产生了隔离,生产者将消息发送给交换机,而交换机则根据调度策略把相应的消息转发给对应的消息队列.
交换机(Exchange, 有时候又叫路由)的主要作用是接收相应的消息并且绑定到指定的队列.
交换机有四种类型,分别为 Direct ,topic,Fanout,headers(本文只介绍前3种)
3种交换类型的性能比较
fanout > direct >> topic, 比例大约为11:10:6
3种方式的工作过程
3种模式对比.jpg程序讲解
首先需要引入依赖
增加依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
Type A: Direct
没有路由交换的概念, 生产者发送Message到指定的队列, 消费者从指定的队列中获得消息), 最简单的一种模式
是RabbitMQ默认的交换机模式,也是最简单的模式.创建消息队列的时候,指定一个BindingKey.
当发送者发送消息的时候,指定对应的Key.当Key和消息队列的BindingKey一致的时候,消息将会被发送到该消息队列中.
发送端
SenderConf.java
package com.tts.sender;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
//相当于初始化一个queue
@Configuration
public class SenderConf {
@Bean
public Queue queue(){
return new Queue("queue");
}
}
SenderController.java
package com.tts.sender;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class SenderController {
@Autowired
private AmqpTemplate amqpTemplate;
@GetMapping("/send")
public void send(){
amqpTemplate.convertAndSend("queue","hello, rabbit.");
}
}
接收端
MyReceiver.java
@Component
@Slf4j
public class MyReceiver {
@RabbitListener(queues = "queue")
public void Receiver(String msg){
log.info("Recieve:{}", msg);
}
}
Type B: Topic
消息由交换机(Exchange)转发, 在这种交换机下,队列和交换机的绑定会定义一种路由模式,那么,通配符就要在这种路由模式和路由键之间匹配后交换机才能转发消息。
路由键(我理解的是queue的名字)必须是一串字符,用句号(.)隔开,比如说 message.boy.china.children, 或者 message.boy.china等.
星号(*),主要用于匹配路由键指定位置的一个单词,比如说,一个路由模式是这样子:message..children.*,那么就只能匹配路由键是这样子的: 第一个单词是 message, 第四个单词是children。
井号(#)就表示相当于一个或者多个单词,例如一个匹配模式是agreements.eu.berlin.#,那么,以agreements.eu.berlin开头的路由键都是可以的。
发送端
SenderConf.java
package com.tts.sender;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class SenderConf {
//1: 初始化消息队列topic.message
@Bean(name = "message")
public Queue queueMessage(){
return new Queue("topic.message");
}
//2: 初始化消息队列topic.messages
@Bean(name = "messages")
public Queue queueMessages(){
return new Queue("topic.messages");
}
//3: 配置交换机
@Bean
public TopicExchange exchange(){
return new TopicExchange("exchange");
}
//绑定消息到交换机上, 需要匹配topic.message
@Bean
Binding bindingExchangeMessage(@Qualifier("message") Queue queueMessage, TopicExchange exchange){
return BindingBuilder.bind(queueMessage).to(exchange).with("topic.message");
}
//绑定消息到交换机上, 需要匹配topic即可
@Bean
Binding bindingExchangeMessages(@Qualifier("messages") Queue queueMessages, TopicExchange exchange){
return BindingBuilder.bind(queueMessages).to(exchange).with("topic.#");
}
}
SenderController.java
package com.tts.sender;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class SenderController {
@Autowired
private AmqpTemplate amqpTemplate;
//topic.message, topic.messages都可以收到消息
@GetMapping("/send")
public void send(){
amqpTemplate.convertAndSend("exchange","topic.message","hello, rabbit.");
}
//只有topic.messages可以收到消息
@GetMapping("/send")
public void send(){
amqpTemplate.convertAndSend("exchange","topic.messages","hello, rabbit.");
}
}
接收端
MyReceiver.java
package com.tts.receiver;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
@Slf4j
public class MyReceiver {
@RabbitListener(queues = "topic.message")
public void process1(String msg){
log.info("Message:{}", msg);
}
@RabbitListener(queues = "topic.messages")
public void process2(String msgs){
log.info("Messages:{}", msgs);
}
}
Type C: Fanout
Fanout与direct的区别是, direct使用了默认的 Exchange "", fanout可以自己指定, 我理解是direct和topic的中间形式
任何发送到Fanout Exchange的消息都会被转发到与该Exchange绑定(Binding)的所有Queue上。
1.可以理解为路由表的模式
2.这种模式不需要RouteKey(即使使用了, 也不起作用)
3.这种模式需要提前将Exchange与Queue进行绑定,一个Exchange可以绑定多个Queue,一个Queue可以同多个Exchange进行绑定。
4.如果接受到消息的Exchange没有与任何Queue绑定,则消息会被抛弃。
发送端
SenderConf.java
package com.tts.sender;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class SenderConf {
@Bean(name="aMessage")
public Queue aMessage(){
return new Queue("fanout.A");
}
@Bean(name="bMessage")
public Queue bMessage(){
return new Queue("fanout.B");
}
@Bean(name="cMessage")
public Queue cMessage(){
return new Queue("fanout.C");
}
@Bean
FanoutExchange fanoutExchange(){
return new FanoutExchange("fanoutExchange");
}
@Bean
Binding bindingExchangeA(@Qualifier("aMessage") Queue Message, FanoutExchange fanoutExchange){
return BindingBuilder.bind(Message).to(fanoutExchange);
}
@Bean
Binding bindingExchangeB(@Qualifier("bMessage") Queue Message, FanoutExchange fanoutExchange){
return BindingBuilder.bind(Message).to(fanoutExchange);
}
@Bean
Binding bindingExchangeC(@Qualifier("cMessage") Queue Message, FanoutExchange fanoutExchange){
return BindingBuilder.bind(Message).to(fanoutExchange);
}
}
SenderController.java
package com.tts.sender;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class SenderController {
@Autowired
private AmqpTemplate amqpTemplate;
@GetMapping("/send")
public void send(){
//参数2可以随便写
amqpTemplate.convertAndSend("fanoutExchange","xxxxx","fanoutExchange MESSAGE!!");
}
}
接收端
MyReceiver.java
package com.tts.receiver;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
@Slf4j
public class MyReceiver {
@RabbitListener(queues = "fanout.A")
public void process1(String msg){
log.info("fanout.A", msg);
}
@RabbitListener(queues = "fanout.B")
public void process2(String msgs){
log.info("fanout.B", msgs);
}
@RabbitListener(queues = "fanout.B")
public void process3(String msgs){
log.info("fanout.C", msgs);
}
}
网友评论