RabbitMQ消息确认(消费者端消息确认)
// 该代码针对消费者
autoAck = true;
channel.basicConsume(QUEUE_NAME, autoAck , deliverCallback, consumerTag -> { });
当我们设置autoAck参数为true时,RabbitMQ一旦向消费者发送了一条消息,便立即将其标记为删除,而不会管消费者是否正确处理该消息。当消费者在处理该消息途中挂掉,那么当前处理的消息便会丢失,并且还会丢失所有发送给该消费者且未处理的消息。
为确保消息不丢失,RabbitMQ支持消息确认。消费者消费结束后返回确认,以告知RabbitMQ已经接受并处理了该消息,RabbitMQ在收到确认信息后,可以自由的删除它。
如果RabbitMQ发送一条消息给消费者后,没有收到确认信息。RabbitMQ会有两种处理方案:
第一种,如果消费者在不发送确认的情况下死亡(其通道已关闭,连接已关闭或TCP链接丢失),RabbitMQ会将该消息重新排队。
第二种,消费者挂掉,但通道未关闭,RabbitMQ收不到消息超时。RabbitMQ会重新发送消息,直至通道关闭或成功收到确认消息为止。
需要注意的是,RabbitMQ无法释放任何未确认的消息,这将导致RabbitMQ消耗越来越多的内存。可以通过以下命令调试该错误。
# linux
sudo rabbitmqctl list_queues name message_ready messages_unacknowledged
# window
rabbitmqctl.bat list_queues name messages_ready messages_unacknowledged
RabbitMQ的消息确认(生产者端消息确认,事务)
public class Send {
private static final String QUEUE_NAME = "simple_queue";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String msgString = "hello tx message";
try{
channel.txSelect(); // 启动事务
channel.basicPublish("", QUEUE_NAME, null, msgString.getBytes());
channel.txCommit(); // 事务提交
}catch (Exception e){
channel.txRollback(); // 回滚
}
}
}
类似于mysql中的事务,当出现异常时,会执行回滚,该消息不会发送。这种方式会极大的影响消息队列吞吐量,
RabbitMQ消息确认Confirm(同步)
单条消息confirm
public class send {
private static final String QUEUE_NAME = "confirm";
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 将channel设置为confirm模式
channel.confirmSelect();
// 单条消息发送
String msgString = "hello confirm message";
channel.basicPublish("", QUEUE_NAME, null, msgString.getBytes());
if(!channel.waitForConfirms()){
System.out.println("message send failed");
}else{
System.out.println("message send ok");
}
channel.close();
connection.close();
}
}
批量消息confirm
public class send {
private static final String QUEUE_NAME = "confirm";
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 将channel设置为confirm模式
channel.confirmSelect();
// 批量消息发送
String msgString = "hello confirm message";
for(int i = 0; i < 20; i++){
channel.basicPublish("", QUEUE_NAME, null, msgString.getBytes());
}
if(!channel.waitForConfirms()){
System.out.println("message send failed");
}else{
System.out.println("message send ok");
}
channel.close();
connection.close();
}
}
RabbitMQ消息确认Confirm(异步)
public class AsynConfirm {
private static String QUEUE_NAME = "confirm_queue";
public static void main(String[] args) throws IOException, TimeoutException {
Connection conn = ConnectionUtils.getConnection();
Channel channel = conn.createChannel();
// 创建map用于记录发送的消息
ConcurrentNavigableMap<Long, String> map = new ConcurrentSkipListMap<>();
// 启用confirm
channel.confirmSelect();
// 消息成功发送后的回调函数
ConfirmCallback successCallBack = (sequenceNumber, multiple) -> {
if(multiple){
ConcurrentNavigableMap<Long, String> confirmed = map.headMap(sequenceNumber, true);
confirmed.clear();
}else{
map.remove(sequenceNumber);
}
};
// 消息失败后的回调函数
ConfirmCallback errorCallBack = (sequenceNumber, multiple) -> {
String body = map.get(sequenceNumber);
System.err.format(
"Message with body %s has been nack-ed. Sequence number: %d, multiple: %b%n",
body, sequenceNumber, multiple
);
};
// 添加监听器
channel.addConfirmListener(successCallBack, errorCallBack);
// 消息发送
while(true){
String msg = "hello confirm";
// 消息的ID
long no = channel.getNextPublishSeqNo();
channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
map.put(no, msg);
}
}
}
RabbitMQ消息持久化
上面提到的消息确认机制,可以防止消费者意外死亡时不丢失消息。但如果RabbitMQ服务器意外死亡了,该如何保证消息队列中的消息不丢失。
这时候就需要将消息队列中的消息持久化到本地,当RabbitMQ意外挂掉后再重启,会从本地读取数据到消息队列中(内存)。
开启队列持久化,需要同时修改消费者及生产者队列声明代码
// 开启消息队列持久化
boolean durable = true;
channel.queueDeclare("queue_name", durable, false, false, null);
注意:
该方法并不能完全保证不会丢失消息。尽管它告诉RabbitMQ将消息保存到磁盘,但是RabbitMQ接受消息到将其写入磁盘,途中仍有一小段时间。另外,RabbitMQ不会对每条消息都执行异步写入磁盘的操作,但是对于简单队列而言,这已经绰绰有余了。
网友评论