接这篇
在上文中,主要实现了可靠模式的consumer。而可靠模式的sender实现的相对简略,主要通过rabbitTemplate来完成。
本以为这样的实现基本是没有问题的。但是前段时间做了一个性能压力测试,但是发现在使用rabbitTemplate时,会有一定的丢数据问题。
当时的场景是用30个线程,无间隔的向rabbitmq发送数据,但是当运行一段时间后发现,会出现一些connection closed错误,rabbitTemplate虽然进行了自动重连,但是在重连的过程中,丢失了一部分数据。当时发送了300万条数据,丢失在2000条左右。
这种丢失率,对于一些对一致性要求很高的应用(比如扣款,转账)来说,是不可接受的。
在google了很久之后,在stackoverflow上找到rabbitTemplate作者对于这种问题的解决方案,他给的方案很简单,单纯的增加connection数:
connectionFactory.setChannelCacheSize(100);
修改之后,确实不再出现connection closed这种错误了,在发送了3000万条数据后,一条都没有丢失。
似乎问题已经完美的解决了,但是我又想到一个问题:当我们的网络在发生抖动时,这种方式还是不是安全的?
换句话说,如果我强制切断客户端和rabbitmq服务端的连接,数据还会丢失吗?
为了验证这种场景,我重新发送300万条数据,在发送过程中,在rabbitmq的管理界面上点击强制关闭连接:

然后发现,仍然存在丢失数据的问题。
看来这个问题,没有想象中的那么简单了。
在阅读了部分rabbitTemplate的代码之后发现:
1 rabbitTemplate的ack确认机制是异步的
2 这种确认机制是一种事后发现机制,并不能同步的发现问题
也就是说,即便打开了
connectionFactory.setPublisherConfirms(true);
rabbitTemplate.setMandatory(true);
并且实现了:
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
if (!ack) {
log.info("send message failed: " + cause + correlationData.toString());
}
});
依旧是不安全的。
rabbitTemplate的发送流程是这样的:
1 发送数据并返回(不确认rabbitmq服务器已成功接收)
2 异步的接收从rabbitmq返回的ack确认信息
3 收到ack后调用confirmCallback函数
注意:在confirmCallback中是没有原message的,所以无法在这个函数中调用重发,confirmCallback只有一个通知的作用
在这种情况下,如果在2,3步中任何时候切断连接,我们都无法确认数据是否真的已经成功发送出去,从而造成数据丢失的问题。
最完美的解决方案只有1种:
使用rabbitmq的事务机制。
但是在这种情况下,rabbitmq的效率极低,每秒钟处理的message在几百条左右。实在不可取。
第二种解决方式,使用同步的发送机制,也就是说,客户端发送数据,rabbitmq收到后返回ack,再收到ack后,send函数才返回。代码类似这样:
创建channel
send message
wait for ack(or 超时)
close channel
返回成功or失败
同样的,由于每次发送message都要重新建立连接,效率很低。
基于上面的分析,我们使用一种新的方式来做到数据的不丢失。
在rabbitTemplate异步确认的基础上
1 在本地缓存已发送的message
2 通过confirmCallback或者被确认的ack,将被确认的message从本地删除
3 定时扫描本地的message,如果大于一定时间未被确认,则重发
当然了,这种解决方式也有一定的问题:
想象这种场景,rabbitmq接收到了消息,在发送ack确认时,网络断了,造成客户端没有收到ack,重发消息。(相比于丢失消息,重发消息要好解决的多,我们可以在consumer端做到幂等)。
自动重试的代码如下:
public class RetryCache {
private MessageSender sender;
private boolean stop = false;
private Map<String, MessageWithTime> map = new ConcurrentHashMap<>();
private AtomicLong id = new AtomicLong();
@NoArgsConstructor
@AllArgsConstructor
@Data
private static class MessageWithTime {
long time;
Object message;
}
public void setSender(MessageSender sender) {
this.sender = sender;
startRetry();
}
public String generateId() {
return "" + id.incrementAndGet();
}
public void add(String id, Object message) {
map.put(id, new MessageWithTime(System.currentTimeMillis(), message));
}
public void del(String id) {
map.remove(id);
}
private void startRetry() {
new Thread(() ->{
while (!stop) {
try {
Thread.sleep(Constants.RETRY_TIME_INTERVAL);
} catch (InterruptedException e) {
e.printStackTrace();
}
long now = System.currentTimeMillis();
for (String key : map.keySet()) {
MessageWithTime messageWithTime = map.get(key);
if (null != messageWithTime) {
if (messageWithTime.getTime() + 3 * Constants.VALID_TIME < now) {
log.info("send message failed after 3 min " + messageWithTime);
del(key);
} else if (messageWithTime.getTime() + Constants.VALID_TIME < now) {
DetailRes detailRes = sender.send(messageWithTime.getMessage());
if (detailRes.isSuccess()) {
del(key);
}
}
}
}
}
}).start();
}
}
在client端发送之前,先在本地缓存message,代码如下:
@Override
public DetailRes send(Object message) {
try {
String id = retryCache.generateId();
retryCache.add(id, message);
rabbitTemplate.correlationConvertAndSend(message, new CorrelationData(id));
} catch (Exception e) {
return new DetailRes(false, "");
}
return new DetailRes(true, "");
}
在收到ack时删除本地缓存,代码如下:
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
if (!ack) {
log.info("send message failed: " + cause + correlationData.toString());
} else {
retryCache.del(correlationData.getId());
}
});
再次验证刚才的场景,发送300w条数据,在发送的过程中过一段时间close一次connection,发送结束后,实际发送数据301.2w条,有一些重复,但是没有丢失数据。
同时需要验证本地缓存的内存泄露问题,程序连续发送1.5亿条数据,内存占用稳定在900M,并没有明显的波动。
最后贴一下rabbitmq的性能测试数据:
1 300w条1k的数据,单机部署rabbitmq(8核,32G)
在ack确认模式下平均发送效率为1.1w条/秒
非ack确认模式下平均发送效率为1.6w条/秒
2 300w条1k的数据,cluster模式部署3台(8核*3, 32G*3)
在ack确认模式下平均发送效率为1.3w条/秒
非ack确认模型下平均发送效率为1.7w条/秒
3 300w条1k的数据,单机部署rabbitmq(8核,32G)
在ack确认模式下平均消费效率为9000条/秒
4 300w条1k的数据,cluster模式部署3台(8核*3, 32G*3)
在ack确认模式下平均消费效率为1w条/秒
代码地址:
帮忙点个星星,谢谢-_-
网友评论
“在confirmCallback中是没有原message的,",没有message,但是有CorrelationData啊,完全可以自己的实现CorrelationData,然后rabbitTemplate.convertAndSend()作为参数一起发送,这样confirmCallback时不就有message了吗。。
我想问, exchange、queue持久化了吗?官网有一段描述 和你说的完全不一致。
```
When Will Published Messages Be Confirmed by the Broker?
For unroutable messages, the broker will issue a confirm once the exchange verifies a message won't route to any queue (returns an empty list of queues). If the message is also published as mandatory, the basic.return is sent to the client before basic.ack. The same is true for negative acknowledgements (basic.nack).
For routable messages, the basic.ack is sent when a message has been accepted by all the queues. For persistent messages routed to durable queues, this means persisting to disk. For mirrored queues, this means that all mirrors have accepted the message.
```
对于持久化消息,是在持久化到磁盘时才ack,楼主可以解释一下吗
emmm。。异步应该是指生产者client不必等待ack响应,可以一直向broker发送消息吧
我想问一下,我通过ReturnCallback和ConfirmCallback 来重发 消息 不好的地方在哪呢
我们通过ack只是来保证这个消息被rabbitmq收到了,不会再丢失了,仅此而已。至于这个消息是落到内存就ok,还是说写入page cache,写入物理磁盘,这些只是rabbitmq的实现,不需要关心。
if (!ack) {
log.info("send message failed: " + cause + correlationData.toString());
} else {
System.out.println("retryCache.del(correlationData.getId());");
}
});
大神,请问这个方法的ack值为FALSE的情况下,
try {
rabbitTemplate.correlationConvertAndSend(message, new CorrelationData("generateId()"));/*generateId()*/
} catch (Exception e) {
能否捕捉到
}
if (!ack) {
log.info("send message failed: " + cause + correlationData.toString());
}
中throw出来就好。
一般exchange写错这种,不会有异常抛出的,异常一般是rabbitmq服务端出了问题,或者出现了网络问题。
你说的这个场景,首先发送 1条,失败,这个时候,map中会增加 1条 重试,1分钟后重试,重试时再次尝试发送,这个发送时间本身,会往map中再放一条,这样map中会有2条。如果本次发送是成功的,这两条都将被删除,否则,map中会有2条。第三次,map中会有3条。。。
14@@2018-05-16 13:17:20######## 3
@@2018-05-16 13:18:13+++ map.size= 1
@@2018-05-16 13:19:21+++ map.size= 1
15@@2018-05-16 13:20:49######## 3
@@2018-05-16 13:22:55+++ map.size= 2
16@@2018-05-16 13:23:03######## 3
@@2018-05-16 13:25:38+++ map.size= 2
17@@2018-05-16 13:25:42######## 3
@@2018-05-16 13:27:03+++ map.size= 2
18@@2018-05-16 13:27:03######## 3
@@2018-05-16 13:28:03+++ map.size= 2
19@@2018-05-16 13:28:03######## 3
20@@2018-05-16 13:28:03######## 3
@@2018-05-16 13:29:03+++ map.size= 4
21@@2018-05-16 13:29:03######## 3
22@@2018-05-16 13:29:03######## 3
23@@2018-05-16 13:29:03######## 3
@@2018-05-16 13:30:44+++ map.size= 6
24@@2018-05-16 13:30:44######## 3
25@@2018-05-16 13:30:44######## 3
26@@2018-05-16 13:30:44######## 3
27@@2018-05-16 13:30:44######## 3
28@@2018-05-16 13:30:44######## 3
@@2018-05-16 13:31:44+++ map.size= 10
29@@2018-05-16 13:31:44######## 3
30@@2018-05-16 13:31:44######## 3
31@@2018-05-16 13:31:44######## 3
32@@2018-05-16 13:31:44######## 3
33@@2018-05-16 13:31:44######## 3
34@@2018-05-16 13:31:44######## 3
35@@2018-05-16 13:31:44######## 3
36@@2018-05-16 13:31:44######## 3
报错代码:Channel channel = connection.createChannel(false);
报错信息:java.lang.IllegalStateException: The ApplicationContext is closed and the ConnectionFactory can no longer create connections.
我的理解是当我关闭应用的时候,您的connctionFactory关闭了,但是线程还在跑,并尝试去建立rabbitMQ连接,但是由于connctionFactory不再提供connction导致这里连接通道失败,不知道是不是这样产生的。
我的所有的改造后的代码:https://github.com/zhaoqicheng/springboot-rabbitmq-learning
如果大神有时间...能不能帮我参考一下这个问题
这里我已经在您的代码里面发现了,3min之后未消费则重新发送该条消息。另外一个方法没有明白:
rabbitTemplate.setReturnCallback((message, replyCode, replyText, tmpExchange, tmpRoutingKey)
这个方法在这里的作用是什么呢...
@Override
public DetailRes send(Object message) {
try {
String id = retryCache.generateId();
retryCache.add(id, message);
rabbitTemplate.correlationConvertAndSend(message, new CorrelationData(id));
} catch (Exception e) {
return new DetailRes(false, "");
}
return new DetailRes(true, "");
}
难道不抛异常就能证明发送成功了么?
connectionFactory.setHost(host);
connectionFactory.setPort(port);
connectionFactory.setUsername(username);
connectionFactory.setPassword(password);
导致mq服务器 connection 太多崩溃,试过很多方法,connection都无法释放,
这个地方应该是增加通道数吧。connectionCacheSize跟channelCacheSize是有区别的。