在rabbitmq中,我们可以通过持久化数据,解决rabbitmq的服务器异常 的数据丢失问题。问题:生产者将消息发送出去后,是否到达rabbitmq服务器?默认的情况下是不知道的。
- 两种方式解决:
- AMQP 实现了事物机制
- confirm 模式
-
AMQP 事物机制
- txSelect 用户将当前channel设置成transation模式
- txCommit 用于提交事物
- txRollback 回滚事物
这种模式比较耗时,降低了rabbitmq的吞吐量
-
confirm模式
- 生产者将信道设置成confirm模式,一旦信道进入confirm模式,所有在该信道上面发布的消息都将会被指派一个唯一的ID(从1开始),一旦消息被投递到所有匹配的队列之后,broker就会发送一个确认给生产者(包含消息的唯一ID),这就使得生产者知道消息已经正确到达目的队列了,如果消息和队列是可持久化的,那么确认消息会在将消息写入磁盘之后发出,broker回传给生产者的确认消息中delivery-tag域包含了确认消息的序列号,此外broker也可以设置basic.ack的multiple域,表示到这个序列号之前的所有消息都已经得到了处理;
- confirm模式最大的好处是 异步
- rabbitmq如果服务器异常或者崩溃,就会发送一个nack消息
- 开启confirm模式 `channel.confirmSelect();
- 编程模式
- 普通 发一条 waitForConfirm()
- 批量 发一批 waitForConfirms()
- 异步confirm模式 提供一个回调方法
- confirm单条
- image.png
- confirm多条
public class SendMany {
private static final String QUEUE_NAME = "confirm_test_1";
public static void main(String[] args) throws IOException, InterruptedException {
Connection connections = ConnectionUtils.getConnections();
Channel channel = connections.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 生产者调用confirmSelect 将channel设置为confirm模式
channel.confirmSelect();
String msg = "hello confirm msg";
System.out.println("send--confirm---" + msg);
for (int i = 0; i < 10; i++) {
channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
}
if (!channel.waitForConfirms()) {
System.out.println("massage send failed");
} else {
System.out.println("massage send ok");
}
connections.close();
}
}
- 异步confirm模式
- Channel对象提供的ConfirmListener()回调方法只包含deliveryTag(当前Chanel发出的消息序号),我们需要自己为每一个Channel维护一个unconfirm的消息序号集合,每publish一条数据,集合中元素加1,每回调一次handleAck方法,unconfirm集合删掉相应的一条(multiple=false)或多条(multiple=true)记录。从程序运行效率上看,这个unconfirm集合最好采用有序集合SortedSet存储结构。实际上,SDK中的waitForConfirms()方法也是通过SortedSet维护消息序号的。
public class Send3 {
//下面代码并没有处理handleAck和handleNack
private static final String QUEUE_NAME = "confirm_test_1";
public static void main(String[] args) throws IOException, InterruptedException {
Connection connections = ConnectionUtils.getConnections();
Channel channel = connections.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 生产者调用confirmSelect 将channel设置为confirm模式
channel.confirmSelect();
// 存放未确认的消息标识
final SortedSet<Long> confirmSet = Collections.synchronizedNavigableSet(new TreeSet<Long>());
// 添加通道监听
channel.addConfirmListener(new ConfirmListener() {
public void handleAck(long deliverTag, boolean mutiple) throws IOException {
if (mutiple) {
System.out.println("---handleAck-------mutiple----");
confirmSet.headSet(deliverTag + 1).clear();
} else {
System.out.println("---handleAck-------mutiple---false");
confirmSet.remove(deliverTag);
}
}
public void handleNack(long deliverTag, boolean mutiple) throws IOException {
if (mutiple) {
System.out.println("---handleNack-------mutiple----");
confirmSet.headSet(deliverTag + 1).clear();
} else {
System.out.println("---handleNack-------mutiple---false");
confirmSet.remove(deliverTag);
}
}
});
String msg = "hello confirm msg";
while (true) {
long seqNo = channel.getNextPublishSeqNo();
channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
confirmSet.add(seqNo);
}
}
}
网友评论