一、消息确认与回调
默认情况下,RabbitMQ发送消息以及接收消息是自动确认的,意思也就是说,消息发送方发送消息的时候,认为消息已经成功发送到了RabbitMQ服务器,而当消息发送给消费者后,RabbitMQ服务器就立即自动确认,然后将消息从队列中删除了。而这样的自动机制会造成消息的丢失,我们常常听到“丢消息”的字眼。
为了解决消息的丢失,RabbitMQ便产生了手动确认的机制:
- 发送者:
- 当消息不能路由到任何队列时,会进行确认失败操作,如果发送方设置了mandatory=true模式,则先会调用basic.return方法,然后调用basic.ack方法;
- 当消息可以路由时,消息被发送到所有绑定的队列时,进行消息的确认basic.ack。
- 接收者:
- 当消息成功被消费时,可以进行消息的确认basic.ack;
- 当消息不能正常被消费时,可以进行消息的反确认basic.nack 或者拒绝basic.reject。
1.1 修改配置
spring:
rabbitmq:
host: 127.0.0.1
port: 5672
username: mitter
password: mitter
virtual-host: mitter_vhost # 注意:这里前面不能带/,默认的“/”理解成字符串就行,和Linux的目录斜杠还不是一回事
publisher-confirms: true # 消息发送到交换机确认机制,是否确认回调
publisher-returns: true # 消息发送到交换机确认机制,是否返回回调
listener:
simple:
acknowledge-mode: manual # 采用手动应答
concurrency: 1 # 指定最小的消费者数量
max-concurrency: 100 # 指定最大的消费者数量
retry:
enabled: true # 是否支持重试
1.2 配置交换机、队列
@Configuration
public class MeassageAckConfig {
public static final String MESSAGE_ACK_EXCHANGE = "direct-message-ack-exchange";
public static final String MESSAGE_ACK_QUEUE = "message-ack-queue";
public static final String MESSAGE_ACK_ROUTE_KEY = "message.ack.key";
@Bean
public Queue messageAckQueue() {
return QueueBuilder.durable(MESSAGE_ACK_QUEUE).build();
}
@Bean
public DirectExchange directMessageAckExchange() {
return (DirectExchange) ExchangeBuilder.directExchange(MESSAGE_ACK_EXCHANGE).durable(true).build();
}
@Bean
public Binding directMessageBinding(DirectExchange directMessageAckExchange, Queue messageAckQueue) {
return BindingBuilder.bind(messageAckQueue).to(directMessageAckExchange).with(MESSAGE_ACK_ROUTE_KEY);
}
}
1.3 消息生产者
/**
* 生产者
*/
@Component
public class MessageAckProducer {
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 给hello队列发送消息
*/
public void send() {
for (int i = 0; i < 100; i++) {
String msg = "hello,序号: " + i;
System.out.println("Producer," + msg);
rabbitTemplate.convertAndSend(MeassageAckConfig.MESSAGE_ACK_EXCHANGE, MeassageAckConfig.MESSAGE_ACK_ROUTE_KEY, msg);
}
}
}
1.4 消息消费者
/**
* 消费者
*/
@Component
public class MessageAckConsumer {
private static final Logger logger = LoggerFactory.getLogger(MessageAckConsumer.class);
@RabbitListener(queues = MeassageAckConfig.MESSAGE_ACK_QUEUE)
public void process(Message message, Channel channel) {
try {
// 采用手动应答模式,手动确认应答更为安全稳定
logger.info("receive: " + new String(message.getBody()));
// 制造异常,向队列回放消息
if ("\"hello,序号: 50\"".equals(new String(message.getBody()))) {
int a = 1/0;
}
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
logger.error("process error by message: {} and channel: {}", message.getMessageProperties().getCorrelationId(),
channel.getConnection().getAddress());
/*try {
// 拒绝消息,multiple=false,值拒绝当前的消息,requeue=true,重新放回队列
// 一般不回放,不然会一致消费,记录日志查找原因
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
} catch (IOException ex) {
ex.printStackTrace();
}*/
}
}
}
1.5 测试
@RestController
public class MessageAckController {
@Autowired
private MessageAckProducer messageAckProducer;
@GetMapping(value = "/messageAck")
public void testMessageAck() {
messageAckProducer.send();
}
}
1604840762465.png
七、消息序列化
- 涉及网络传输的应用序列化不可避免,发送端以某种规则将消息转成 byte 数组进行发送,接收端则以约定的规则进行 byte[] 数组的解析
- RabbitMQ 的序列化是指 Message 的 body 属性,即我们真正需要传输的内容,RabbitMQ 抽象出一个 MessageConvert 接口处理消息的序列化,其实现有 SimpleMessageConverter(默认)、Jackson2JsonMessageConverter 等
- 当调用了 convertAndSend 方法时会使用 MessageConvert 进行消息的序列化
- SimpleMessageConverter 对于要发送的消息体 body 为 byte[] 时不进行处理,如果是 String 则转成字节数组,如果是 Java 对象,则使用 jdk 序列化将消息转成字节数组,转出来的结果较大,含class类名,类相应方法等信息。因此性能较差
- 当使用 RabbitMQ 作为中间件时,数据量比较大,此时就要考虑使用类似 Jackson2JsonMessageConverter 等序列化形式以此提高性能
序列化配置:
@Configuration
public class RabbitConfig {
@Bean
public RabbitTemplate rabbitTemplate() {
RabbitTemplate rabbitTemplate = new RabbitTemplate(this.connectionFactory());
// 生产者端发送消息,使用Jackson2JsonMessageConverter序列化
rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
// 消息发送失败返回到队列中, yml需要配置 publisher-returns: true
rabbitTemplate.setMandatory(true);
// 消息确认, yml需要配置 publisher-confirms: true
// 消息回调
// ConfirmCallback接口用于实现消息发送到RabbitMQ交换器后接收ack回调,即消息发送到exchange ack
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
if (ack) {
logger.debug("消息发送到exchange成功!");
} else {
logger.debug("消息发送到exchange失败,原因: {}", cause);
}
});
// 消息返回, yml需要配置 publisher-returns: true
// ReturnCallback接口用于实现消息发送到RabbitMQ交换器,但无相应队列与交换器绑定时的回调,即消息发送不到任何一个队列中 ack
rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
String correlationId = message.getMessageProperties().getCorrelationId();
logger.debug("消息:{} 发送失败,应答码:{} 原因:{} 交换机: {} 路由键: {}", correlationId, replyCode, replyText, exchange, routingKey);
});
return rabbitTemplate;
}
@Bean
public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory){
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
// 消费者端,接收消息使用Jackson2JsonMessageConverter反序列化
factory.setMessageConverter(new Jackson2JsonMessageConverter());
return factory;
}
}
发送消息:
@RestController
public class MessageSerializableController {
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping(value = "/messageSerializable/{name}")
public void messageSerializable(@PathVariable(value = "name") String name) {
User user = new User(name, 20, new Date());
rabbitTemplate.convertAndSend(QueueConstant.QUEUE_NOTIFY_TEST, user);
}
}
接收消息:
@Component
public class MessageSerilizableConsumer {
/**
* 使用@Payload获取body信息
* @param user 直接反序列化的对象
*/
@RabbitListener(queues = {QueueConstant.QUEUE_NOTIFY_TEST})
public void receiveTestQueue(@Payload User user) {
System.out.println(user.getName());
}
}
测试:
1604844242748.png八、Headers
Headers应用场景,比如对于headers中有某些属性的消息可以选择性处理,对应管理台的Headers:
1604845932396.png
生产者:
@RestController
public class HeadersController {
@Autowired
private RabbitTemplate rabbitTemplate;
private final static MessagePostProcessor MESSAGE_POST_PROCESSOR = message -> {
message.getMessageProperties().setContentType("application/json");
message.getMessageProperties().setContentEncoding("UTF-8");
// 设置Headers
message.getMessageProperties().setHeader("name", "mitter");
return message;
};
@GetMapping(value = "/headers/{name}")
public void headers(@PathVariable(value = "name") String name) {
User user = new User(name, 20, new Date());
rabbitTemplate.convertAndSend(QueueConstant.QUEUE_NOTIFY_TEST, user, MESSAGE_POST_PROCESSOR);
}
}
消费者:
@Component
public class MessageSerilizableConsumer {
/**
* 使用@Payload获取body信息,使用@Headers获取Headers信息
* @param user 直接反序列化的对象
*/
@RabbitListener(queues = {QueueConstant.QUEUE_NOTIFY_TEST})
public void receiveTestQueue(@Payload User user, @Headers Map<String,Object> headers) {
System.out.println(user.getName());
System.out.println(JSON.toJSONString(headers));
}
}
网友评论