持久化:
pom文件
<!--rabbitmq-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!--rabbitmq-->
yml
mqIp: 106.2.17.XXX
spring:
rabbitmq:
addresses: ${mqIp}:5672
username: xx
password: xx
virtual-host: /
publisher-confrems: true
publisher-returns: true
listener:
simple:
acknowledge-mode: manual
retry:
enabled: true #是否开启重试
initial-interval: 3000ms #重试时间间隔
max-attempts: 3 #重试次数
max-interval: 15000ms #重试最大时间间隔
multiplier: 2 #倍数
配置config
package org.rcisoft.internal.core.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitMQClusterConfig {
/*
* @Autowired RabbitTemplate rabbitTemplate;
*
* @Autowired AmqpTemplate amqpTemplate;
*/
public static final String QUEUE_NAME = "node3Queue";
public static final String EXCHANGE_NAME="fyExchange";
// 创建持久化的node3Queue
@Bean
Queue queue() {
return QueueBuilder.durable(QUEUE_NAME).build();
}
// 创建持久化的Exchange
@Bean
org.springframework.amqp.core.Exchange exchange() {
return ExchangeBuilder.directExchange(EXCHANGE_NAME).durable(true).build();
}
// 创建绑定
@Bean
Binding bindings(){
return BindingBuilder.bind(queue()).to(exchange()).with("").and(null);
}
}
工具类
package org.rcisoft.internal.core.util;
import cn.hutool.json.JSONUtil;
import org.rcisoft.internal.core.config.RabbitMQClusterConfig;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
import java.io.UnsupportedEncodingException;
import java.util.Map;
@Component
public class RabbitClusterUtil {
@Autowired
RabbitTemplate rabbitTemplate;
//发送消息方法调用: 构建Message消息
public void sendMsg(Object message) {
// 发布持久化消息
rabbitTemplate.setConfirmCallback(confirmCallback);
rabbitTemplate.setReturnCallback(returnCallback);
//id + 时间戳 全局唯一
CorrelationData correlationData = new CorrelationData("1234567890");
rabbitTemplate.convertAndSend(RabbitMQClusterConfig.EXCHANGE_NAME, "", JSONUtil.toJsonStr(JSONUtil.parseObj(message)).getBytes(), correlationData);
}
public Object getMsg() throws UnsupportedEncodingException {
// 发布持久化消息
Object message = rabbitTemplate.receiveAndConvert(RabbitMQClusterConfig.QUEUE_NAME, 2000L);
if(message==null) {
return "没有任何数据";
}
String msg = null;
if (message instanceof Message) {
msg = new String(((Message) message).getBody(), "utf-8");
}
msg = new String((byte[])message, "utf-8");
return msg;
}
}
消费者
package com.isoft.common.utils;
import com.alibaba.fastjson.JSON;
import com.alibaba.nacos.client.utils.JSONUtils;
import com.isoft.framework.config.RabbitMQClusterConfig;
import com.isoft.project.monitor.domain.SysOperLog;
import com.isoft.project.monitor.service.ISysOperLogService;
import com.rabbitmq.client.Channel;
import com.rabbitmq.tools.json.JSONUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
@Slf4j
@Component
public class RabbitMQReceiver {
@Autowired
RabbitTemplate rabbitTemplate;
@Autowired
private ISysOperLogService operLogService;
List<SysOperLog> sysOperLogList = new ArrayList<>();
@RabbitListener(queues = RabbitMQClusterConfig.ZT_QUEUE_NAME)
public void directConsumer(String msg, Channel channel, Message message) throws IOException {
try{
//这里表示消息消费成功
msg = new String(((Message) message).getBody(), "utf-8");
SysOperLog sysOperLog = JSON.parseObject(msg,SysOperLog.class);
sysOperLogList.add(sysOperLog);
if (sysOperLogList.size() >=10){
operLogService.insertBatchOperlog(sysOperLogList);
}
//确认消息,false表示不批量处理
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}catch (Exception e){
//这里表示消息消费失败,当消息消费失败时,如果没有进行处理,会导致MQ中Unacked的消息越来越多,最终占用内存越来越大
//这里也返回ACK,记录报错日志,后续根据日志进行恢复处理
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
log.error("消息消费失败:{}", e.getMessage());
}
}
}
### @RabbitListener 监听写在配置文件里
~~~
// 交换器名称和队列
@Value("${spring.rabbitmq.operLogQueues}")
public String[] operLogQueues;
@Bean
public String[] operLogQueues(){
return operLogQueues;
}
@RabbitListener(queues = {"#{operLogQueues}"})
public void directConsumer(String msg, Channel channel, Message message) throws IOException {
}
@RabbitListener(queues = "${spring.rabbitmq.msgQueue}")
public void msgConsumer(String msg, Channel channel, Message message) throws IOException {
try{
//这里表示消息消费成功
synchronized (msgLock) {
msg = new String(((Message) message).getBody(), "utf-8");
MessageInstanceDTO messageInstance = JSON.parseObject(msg,MessageInstanceDTO.class);
msgList.add(messageInstance);
if (msgList.size() >= 20){
messageInstanceMapper.saveBatch(msgList);
msgList.clear();
}
}
//确认消息,false表示不批量处理
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}catch (Exception e){
//这里表示消息消费失败,当消息消费失败时,如果没有进行处理,会导致MQ中Unacked的消息越来越多,最终占用内存越来越大
//这里也返回ACK,记录报错日志,后续根据日志进行恢复处理
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
log.error("消息消费失败:{}", e.getMessage());
}
}
~~~
在使用RabbitMQ的时候,我们可以通过消息持久化操作来解决因为服务器的异常奔溃导致的消息丢失,除此之外我们还会遇到一个问题,当消息的发布者在将消息发送出去之后,消息到底有没有正确到达broker代理服务器呢?如果不进行特殊配置的话,默认情况下发布操作是不会返回任何信息给生产者的,也就是默认情况下我们的生产者是不知道消息有没有正确到达broker的,如果在消息到达broker之前已经丢失的话,持久化操作也解决不了这个问题,因为消息根本就没到达代理服务器,你怎么进行持久化呢?
但是持久化会占用
RabbitMQ为我们提供了两种方式:
方式一:通过AMQP事务机制实现,这也是从AMQP协议层面提供的解决方案;
方式二:通过将channel设置成confirm模式来实现;
网友评论