基本知识
为什么使用消息中间件
在有多个系统组成的应用中,常常出现A系统会影响B、C、D数据的情况。通常做法是在A中调用其他系统的接口,这样系统之间的依赖太大,如果后续添加新的系统,就需要在A中添加相应的逻辑。这样做耦合程度太大,不利于维护。
加入MQ后,A系统中不用添加其他系统的调用,只需要发送消息,其他系统监听消息,在自己系统中处理。新增或者删除也不需要改动A系统的代码,只需要在自己中取消该类型的消息监听就行。
很多时候涉及多服务之间调用的情况,客户端发起请求,A中回去调用B、C、D的接口,最后再将执行结果返回到客户端,这样一个流程中A接口的执行时间,收到其他服务的影响,是他们执行时间的总和,如果A不关心B、C、D他们的执行情况,就可以使用MQ。A发送消息后直接返回,从而提升接口的响应时间。
当系统面临大量请求时,会对数据库造成很大压力,引入MQ后,你可以根据数据库的实际处理能力,每次从MQ中拿一定数量的数据处理,处理完从中取。
生产者/消费者
RabbitMQ 的安装
普通安装
直接去官网下载安装包。
docker安装
// 拉去镜像
docker pull rabbitmq
// 启动容器
docker run -it --name rabbitmq \
-e RABBITMQ_DEFAULT_USER=admin \
-e RABBITMQ_DEFAULT_PASS=admin -p 15672:15672 -p 5672:5672 \
-d rabbitmq
// 进入容器开启管理界面
docker exec -it rabbitmq sh
//开启管理界面
rabbitmq-plugins enable rabbitmq_management
通过访问 http://localhost:15672/ 即可看到管理界面
在Springboot集成RabbitMQ
引入依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
配置队列和交换机
@Configuration
public class RabbitmqConfig {
@Bean
public Queue msgQueue(){
/**
* name: 队列名称
* durable 是否持久化
* exclusive 是否是排他队列 只有创建者可以使用
* autoDelete 声明此队列为临时队列,最后一个消费者使用完自动删除
*/
return new Queue("MSG_MQ", true, false, false);
}
@Bean
public DirectExchange msgExchange(){
return new DirectExchange("MSG_ECHANGE", true, false);
}
@Bean
public Binding mailBinding(){
return BindingBuilder
.bind(mailQueue())
.to(msgExchange())
.with("MSG_ROUTING");
}
}
生产者发送消息
@RequestMapping("/v1/demo")
@RestController
public class DemoController {
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("/sed_queue")
public void sendMsg(){
rabbitTemplate.convertAndSend("MSG_ECHANGE", "MSG_ROUTING", "你好惨:"+ System.currentTimeMillis());
}
}
消费者接受消息
使用 @RabbitListener 去监听消息队列,队列中有消息了就去消费
@Component
public class RabbitListner {
@RabbitListener(queuesToDeclare = @Queue("MSG_MQ"))
public void handleMsg(String msg){
System.out.println("msg-"+ msg);
}
}
queues 和queuesToDeclare 不同点 :使用queuesToDeclare时,服务启动时回去MQ中检测监听的队列是否存在,没有这个队列会就会去创建
RabbitMQ的组成
- Broker:消息队列服务进程。此进程包括两个部分:Exchange和Queue。
- Exchange:消息队列交换机。按一定的规则将消息路由转发到某个队列。
- Queue:消息队列,存储消息的队列。
- Producer:消息生产者。生产方客户端将消息同交换机路由发送到队列中。
- Consumer:消息消费者。消费队列中存储的消息。
四种交换机
- DirectExchange :直连交换机,需要绑定一个队列,同时需要指定 routeKey 值,类似于点对点发送。上面demo中使用的就是 DirectExchange
- FanoutExchange : 将有需要的队列与此交换机绑定后,一个发送到交换机上的消息会被转发到与这个交换机相连的所有队列上。这种模式类似于发布订阅。
@Bean
public Queue faQueue1(){
/**
* name: 队列名称
* durable 是否持久化
* exclusive 是否是排他队列 只有创建者可以使用
* autoDelete 声明此队列为临时队列,最后一个消费者使用完自动删除
*/
return new Queue("fa.queue1", true, false, false);
}
@Bean
public Queue faQueue2(){
/**
* name: 队列名称
* durable 是否持久化
* exclusive 是否是排他队列 只有创建者可以使用
* autoDelete 声明此队列为临时队列,最后一个消费者使用完自动删除
*/
return new Queue("fa.queue2", true, false, false);
}
@Bean
public FanoutExchange fanoutExchange(){
return new FanoutExchange("fanout.exchange", true, false);
}
@Bean
public Binding bindingFanoutExchange(){
return BindingBuilder
.bind(faQueue1())
.to(fanoutExchange());
}
@Bean
public Binding bindingFanoutExchange1(){
return BindingBuilder
.bind(faQueue2())
.to(fanoutExchange());
}
@RabbitListener(queuesToDeclare = @Queue("fa.queue1"))
public void faQueue1(String msg){
System.out.println("faQueue1-"+msg);
}
@RabbitListener(queuesToDeclare = @Queue("fa.queue2"))
public void faQueue2(String msg){
System.out.println("faQueue2-"+msg);
}
@GetMapping("/sed_fanout")
public void sendFanoutMsg(){
rabbitTemplate.convertAndSend("fanout.exchange", null, "fanoutExchange:"+ System.currentTimeMillis());
}
- TopicExchange : 主题交换机又可以叫通配符交换机。这种交换机通过通配符去匹配,然后路由到对应的队列。通配符 # 和 * 分别代表匹配多个和一个。
@Bean
public Queue topicQueue1(){
/**
* name: 队列名称
* durable 是否持久化
* exclusive 是否是排他队列 只有创建者可以使用
* autoDelete 声明此队列为临时队列,最后一个消费者使用完自动删除
*/
return new Queue("topic.queue1", true, false, false);
}
@Bean
public Queue topicQueue2(){
/**
* name: 队列名称
* durable 是否持久化
* exclusive 是否是排他队列 只有创建者可以使用
* autoDelete 声明此队列为临时队列,最后一个消费者使用完自动删除
*/
return new Queue("topic.queue2", true, false, false);
}
@Bean
public TopicExchange topicExchange1(){
return new TopicExchange("topic.exchange1", true, false);
}
@Bean
public Binding topicBinding1(){
return BindingBuilder
.bind(topicQueue1())
.to(topicExchange1())
.with("top.*");
}
@Bean
public Binding topicBinding2(){
return BindingBuilder
.bind(topicQueue2())
.to(topicExchange1())
.with("top.#");
}
@RabbitListener(queuesToDeclare = @Queue("topic.queue1"))
public void topicQueue1(String msg){
System.out.println("topicQueue1-"+msg);
}
@RabbitListener(queuesToDeclare = @Queue("topic.queue2"))
public void topicQueue2(String msg){
System.out.println("topicQueue2-"+msg);
}
@GetMapping("/sed_topic")
public void sendFanoutMsg(String key){
rabbitTemplate.convertAndSend("topic.exchange1", key, "TopicExchange:"+ System.currentTimeMillis());
}
- HeadersExchange : 这种交换机用的相对没这么多。它跟上面三种有点区别,它的路由不是用routingKey进行路由匹配,而是在匹配请求头中所带的键值进行路由。这种交换机用的不多。
@Bean
public Queue headQueue(){
/**
* name: 队列名称
* durable 是否持久化
* exclusive 是否是排他队列 只有创建者可以使用
* autoDelete 声明此队列为临时队列,最后一个消费者使用完自动删除
*/
return new Queue("head.queue1", true, false, false);
}
@Bean
public Queue headQueue1(){
/**
* name: 队列名称
* durable 是否持久化
* exclusive 是否是排他队列 只有创建者可以使用
* autoDelete 声明此队列为临时队列,最后一个消费者使用完自动删除
*/
return new Queue("head.queue2", true, false, false);
}
@Bean
public HeadersExchange headersExchange(){
return new HeadersExchange("head.exchange", true, false);
}
@Bean
public Binding headBinding(){
Map<String, Object> headers = new HashMap<>();
headers.put("abk", "asd");
return BindingBuilder
.bind(headQueue())
.to(headersExchange())
.whereAll(headers)
.match();
}
@Bean
public Binding headBinding1(){
Map<String, Object> headers = new HashMap<>();
headers.put("abk", "ack");
return BindingBuilder
.bind(headQueue1())
.to(headersExchange())
.whereAll(headers)
.match();
}
@RabbitListener(queuesToDeclare = @Queue("head.queue1"))
public void headQueue1(String msg){
System.out.println("headQueue1-"+msg);
}
@RabbitListener(queuesToDeclare = @Queue("head.queue2"))
public void headQueue2(String msg){
System.out.println("headQueue2-"+msg);
}
@GetMapping("/sed_head_msg")
public void sendHeaderMsg1(@RequestParam String msg,
@RequestBody Map<String, Object> map){
MessageProperties messageProperties = new MessageProperties();
//消息持久化
messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
messageProperties.setContentType("UTF-8");
//添加消息
messageProperties.getHeaders().putAll(map);
Message message = new Message(msg.getBytes(), messageProperties);
rabbitTemplate.convertAndSend("head.exchange", null, message);
}
用postman调用请求
后台打印出
说明匹配到了 head.queue2
同理,设置head的值
消息可靠性
图中显示的是一条消息传递的整个过程,我们大致可以分析出那些环节会导致消息不可靠或者说消息丢失。
- 生产者发送消息到MQ过程中,MQ挂了,这时会出现消息丢失。
- 生产者发送消息到MQ但是没有持久化队列,消费者还没消费,MQ挂了,消息会丢失。
- 消费者消费了消息,但是出现报错或者程序挂了,这时消息也丢失了。
针对于以上的是三种情况,Rabbit为我们提供了对应的解决方案:持久化、confirm机制、ACK事务机制。
消息持久化
配置Exchange持久化和Queue持久化。
在创建Queue 和Exchange时设置 durable 为true
你也可以使用默认值,默认为true
交换机同样如此
消息确认机制
在生产者发送消息到MQ这段过程中,MQ挂了,导致消息丢失。Rabbit提供confirm和returnMessage方法去处理消息丢失。
springboot 添加配置
## 新版中使用 publisher-confirm-type 有三个参数
# none(禁用)
# correlated(触发confirm回调)
# simple(具有correlated的功能 同时可以使rabbitTemplate调用waitForConfirms或waitForConfirmsOrDie)
# 旧版中 publisher-confirms 默认 false
spring.rabbitmq.publisher-confirm-type=simple
# 消息没有匹配到队列 触发returnMessage 回调
spring.rabbitmq.publisher-returns= true
# publisher-returns 和 mandatory 同时使用时优先使用 mandatory
spring.rabbitmq.template.mandatory= true
实现 RabbitTemplate.ConfirmCallback , RabbitTemplate.ReturnCallback
@Component
public class RabbitCallback implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String error) {
if(ack){
System.out.println("消息发送成功");
} else {
System.out.println("消息发送失败");
}
}
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
StringBuilder stringBuilder = new StringBuilder();
stringBuilder.append("replyCode:").append(replyCode).append(",")
.append("replyText:").append(replyText).append(",")
.append("exchange:").append(exchange).append(",")
.append("routingKey:").append(routingKey).append(",");
}
}
没有匹配到路由触发returnMessage
找到交换机触发confirm
没有找到交换机和队列
ACK 事务机制
消息确认机制解决了消息发送MQ这个过程中的问题,ACK则是解决消费者处理过程中消息丢失的问题。
消费者接受消息,在处理过程中出现失败手动拒签,重新放回队列等待再次消费,消费成功后手动签收。
配置手动模式
### 开启手动模式
spring.rabbitmq.listener.simple.acknowledge-mode=manual
## 最小消费者数量
spring.rabbitmq.listener.simple.concurrency=1
## 最大消费者数量
spring.rabbitmq.listener.simple.max-concurrency=1
改造消费者
@RabbitListener(queuesToDeclare = @Queue("MSG_MQ"))
public void handleMsg(String msg, Message message, Channel channel) throws IOException {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
if("success".equals(msg)){
channel.basicAck(deliveryTag, false);
} else if("reply".equals(msg)) {
// basicReject 和 basicNack的区别 basicReject不支持批量 basicNack不支持
// channel.basicReject(deliveryTag, true);
channel.basicNack(deliveryTag, false, true);
} else {
channel.basicNack(deliveryTag, false, false);
}
}
basicAck : 成功确认消息
- deliveryTag: 消息的index
- mutiple: 是否批量确认, 为true时,一次ack所有小于deliveryTag的消息
basicReject: 失败拒绝
- deliveryTag: 消息的index
- requeue: 是否重新放入队列
basicNack: 失败拒绝 - deliveryTag: 消息的index
- mutiple:批量拒绝,一次性拒绝所有小于deliveryTag的消息
- requeue: 是否重新放入队列
ack带来的问题
- nack死循环
reply 的消息重新放入队列后,程序还是处理不了,会出现死循环,不断地消费,放入队列,知道问题解决。 我的想法是用数据库去保存消息信息。然后通过定时任务再去处理或者通过界面反馈通知 - double ack
开启自动ACK的时候又在代码中手动处理,导致一个消息触发两次ack,有一次ack会失败。 - 性能消耗
手动ack模式会比自动模式慢是10倍左右,很多时候使用默认的就行了。 - 手动ack,不及时回复会导致队列异常
网友评论