什么是消息中间件?
- 消息代理规范
-
JMS(Java Message Service)JAVA消息服务:
基于JVM消息代理的规范。ActiveMQ、HornetMQ是JMS实现 -
AMQP(Advanced Message Queuing Protocol):
高级消息队列协议,也是一个消息代理的规范,兼容JMS
RabbitMQ是AMQP的实现
-
- 作用:(异步处理、应用解耦、流量削峰、分布式事务管理)
- 通过消息服务中间件来提升系统异步通信、扩展解耦能力
- 当消息发送者发送消息以后,将由消息代理接管,消息代理保证消息传递到指定目的地
RabbitMQ
-
RabbitMQ是一个由erlang开发的AMQP(Advanved Message Queue Protocol)的开源实现。
-
核心概念
-
Message
-
消息,消息是不具名的,它由消息头和消息体组成
-
消息头,包括routing-key(路由键)、priority(相对于其他消息的优先权)、delivery-mode(指出该消息可能需要持久性存储)等
-
Publisher
-
消息的生产者,也是一个向交换器发布消息的客户端应用程序
-
Exchange
-
交换器,将生产者消息路由给服务器中的队列
-
类型有direct(默认),fanout, topic, 和headers,具有不同转发策略
-
Queue
-
消息队列,保存消息直到发送给消费者
-
Binding
-
绑定,用于消息队列和交换器之间的关联
-
Connection
-
网络连接,比如一个TCP连接
-
Consumer
-
消息的消费者,表示一个从消息队列中取得消息的客户端应用程序
-
Virtual Host
-
虚拟主机,表示一批交换器、消息队列和相关对象。
-
vhost 是 AMQP 概念的基础,必须在连接时指定
-
RabbitMQ 默认的 vhost 是
/
-
Broker
-
消息队列服务器实体
-
-
运行机制
消息路由:AMQP 中增加了Exchange 和 Binding 的角色, Binding 决定交换器的消息应该发送到那个队列。
- Exchange 类型
-
direct
点对点模式,消息中的路由键(routing key)如果和 Binding 中的 binding
key 一致, 交换器就将消息发到对应的队列中。 -
fanout
广播模式,每个发到 fanout 类型交换器的消息都会分到所有绑定的队列上去 -
topic
将路由键和某个模式进行匹配,此时队列需要绑定到一个模式上。它将路由键和绑定键的字符串切分成单词,这些单词之间用点隔开。
识别通配符: # 匹配 0 个或多个单词, *匹配一个单词
SpringBoot 整合 RabbitMQ
- 导入依赖(前提得安装好RabbitMQ)
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!--自定义消息转化器Jackson2JsonMessageConverter所需依赖-->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
- 配置文件
# 指定rebbitmq服务器主机
spring.rabbitmq.host=192.168.31.162
spring.rabbitmq.username=guest #默认值为guest
spring.rabbitmq.password=guest #默认值为guest
spring.rabbitmq.port=5672 #默认端口号是5672
spring,rabbitmq.virtual-host=/ #配置RabbitMQ虚拟主机路径 /,默认可以省略
- RabbitMQ的使用
RabbitAutoConfiguration中有内部类RabbitTemplateConfiguration,在该类中向容器中分别导入了RabbitTemplate
和AmqpAdmin
在测试类中分别注入:
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private AmqpAdmin amqpAdmin;
- RabbitTemplate消息发送处理组件, 可用来发送和接收消息
//发送消息,利用的是点对点的exchange模式
rabbitTemplate.convertAndSend("amq.direct","ustc","aaaa");
Book book = new Book();
book.setName("西游记");
book.setPrice(23.2f);
//Book要实现Serializable接口
rabbitTemplate.convertAndSend("amq.direct","ustc",book);
//convertAndSend(String exchange,String rutingkey,Object object)
//接收消息
Object o = rabbitTemplate.receiveAndConvert("ustc");
System.out.println(o.getClass()); //class cn.edu.ustc.springboot.bean.Book
System.out.println(o); //Book{name='西游记', price=23.2}
- 由于默认的消息转化器是SimpleMessageConverter,对于对象以jdk序列化方式存储,若要以Json方式存储对象,就要自定义消息转换器。
@Configuration
public class AmqpConfig {
@Bean
public MessageConverter messageConverter() {
//在容器中导入Json的消息转换器
return new Jackson2JsonMessageConverter();
}
}
- AmqpAdmin管理组件,可以利用可视化工具,对RabbitMQ增删改exchange、binding、queue.,当然也可以利用代码进行编写。
//创建Direct类型的Exchange
amqpAdmin.declareExchange(new DirectExchange("admin.direct"));
//创建Queue
amqpAdmin.declareQueue(new Queue("admin.test"));
//将创建的队列与Exchange绑定
amqpAdmin.declareBinding(new Binding("admin.test", Binding.DestinationType.QUEUE,"admin.direct","admin.test",null));
-
消息的监听:
在回调方法上标注@RabbitListener
注解,并设置其属性queues
,注册监听队列,当该队列收到消息时,标注方法遍会调用
可分别使用Message和保存消息所属对象进行消息接收,若使用Object对象进行消息接收,实际上接收到的也是Message
@Service
public class BookService {
@RabbitListener(queues = {"admin.test"})
public void receive1(Book book){
System.out.println("收到消息:"+book);
}
@RabbitListener(queues = {"admin.test"})
public void receive1(Object object){
System.out.println("收到消息:"+object.getClass());
//收到消息:class org.springframework.amqp.core.Message
}
@RabbitListener(queues = {"admin.test"})
public void receive2(Message message){
System.out.println("收到消息"+message.getHeaders()+"---"+message.getPayload());
}
}
网友评论