rabbitmq可靠发送的自动重试机制

作者: littlersmall | 来源:发表于2016-09-07 16:42 被阅读14911次

http://www.jianshu.com/p/4112d78a8753

接这篇

在上文中,主要实现了可靠模式的consumer。而可靠模式的sender实现的相对简略,主要通过rabbitTemplate来完成。
本以为这样的实现基本是没有问题的。但是前段时间做了一个性能压力测试,但是发现在使用rabbitTemplate时,会有一定的丢数据问题。

当时的场景是用30个线程,无间隔的向rabbitmq发送数据,但是当运行一段时间后发现,会出现一些connection closed错误,rabbitTemplate虽然进行了自动重连,但是在重连的过程中,丢失了一部分数据。当时发送了300万条数据,丢失在2000条左右。
这种丢失率,对于一些对一致性要求很高的应用(比如扣款,转账)来说,是不可接受的。

在google了很久之后,在stackoverflow上找到rabbitTemplate作者对于这种问题的解决方案,他给的方案很简单,单纯的增加connection数:

connectionFactory.setChannelCacheSize(100);

修改之后,确实不再出现connection closed这种错误了,在发送了3000万条数据后,一条都没有丢失。
似乎问题已经完美的解决了,但是我又想到一个问题:当我们的网络在发生抖动时,这种方式还是不是安全的?
换句话说,如果我强制切断客户端和rabbitmq服务端的连接,数据还会丢失吗?

为了验证这种场景,我重新发送300万条数据,在发送过程中,在rabbitmq的管理界面上点击强制关闭连接:


然后发现,仍然存在丢失数据的问题。

看来这个问题,没有想象中的那么简单了。

在阅读了部分rabbitTemplate的代码之后发现:
1 rabbitTemplate的ack确认机制是异步的
2 这种确认机制是一种事后发现机制,并不能同步的发现问题
也就是说,即便打开了

connectionFactory.setPublisherConfirms(true);
rabbitTemplate.setMandatory(true);

并且实现了:

rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
            if (!ack) {
                log.info("send message failed: " + cause + correlationData.toString());
            } 
        });

依旧是不安全的。
rabbitTemplate的发送流程是这样的:
1 发送数据并返回(不确认rabbitmq服务器已成功接收)
2 异步的接收从rabbitmq返回的ack确认信息
3 收到ack后调用confirmCallback函数
注意:在confirmCallback中是没有原message的,所以无法在这个函数中调用重发,confirmCallback只有一个通知的作用

在这种情况下,如果在2,3步中任何时候切断连接,我们都无法确认数据是否真的已经成功发送出去,从而造成数据丢失的问题。

最完美的解决方案只有1种:
使用rabbitmq的事务机制。
但是在这种情况下,rabbitmq的效率极低,每秒钟处理的message在几百条左右。实在不可取。
第二种解决方式,使用同步的发送机制,也就是说,客户端发送数据,rabbitmq收到后返回ack,再收到ack后,send函数才返回。代码类似这样:

创建channel
send message
wait for ack(or 超时)
close channel
返回成功or失败

同样的,由于每次发送message都要重新建立连接,效率很低。

基于上面的分析,我们使用一种新的方式来做到数据的不丢失。
在rabbitTemplate异步确认的基础上
1 在本地缓存已发送的message
2 通过confirmCallback或者被确认的ack,将被确认的message从本地删除
3 定时扫描本地的message,如果大于一定时间未被确认,则重发

当然了,这种解决方式也有一定的问题
想象这种场景,rabbitmq接收到了消息,在发送ack确认时,网络断了,造成客户端没有收到ack,重发消息。(相比于丢失消息,重发消息要好解决的多,我们可以在consumer端做到幂等)。
自动重试的代码如下:

public class RetryCache {
    private MessageSender sender;
    private boolean stop = false;
    private Map<String, MessageWithTime> map = new ConcurrentHashMap<>();
    private AtomicLong id = new AtomicLong();

    @NoArgsConstructor
    @AllArgsConstructor
    @Data
    private static class MessageWithTime {
        long time;
        Object message;
    }

    public void setSender(MessageSender sender) {
        this.sender = sender;
        startRetry();
    }

    public String generateId() {
        return "" + id.incrementAndGet();
    }

    public void add(String id, Object message) {
        map.put(id, new MessageWithTime(System.currentTimeMillis(), message));
    }

    public void del(String id) {
        map.remove(id);
    }

    private void startRetry() {
        new Thread(() ->{
            while (!stop) {
                try {
                    Thread.sleep(Constants.RETRY_TIME_INTERVAL);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

                long now = System.currentTimeMillis();

                for (String key : map.keySet()) {
                    MessageWithTime messageWithTime = map.get(key);

                    if (null != messageWithTime) {
                        if (messageWithTime.getTime() + 3 * Constants.VALID_TIME < now) {
                            log.info("send message failed after 3 min " + messageWithTime);
                            del(key);
                        } else if (messageWithTime.getTime() + Constants.VALID_TIME < now) {
                            DetailRes detailRes = sender.send(messageWithTime.getMessage());

                            if (detailRes.isSuccess()) {
                                del(key);
                            }
                        }
                    }
                }
            }
        }).start();
    }
}

在client端发送之前,先在本地缓存message,代码如下:

@Override
public DetailRes send(Object message) {
    try {
        String id = retryCache.generateId();
        retryCache.add(id, message);
        rabbitTemplate.correlationConvertAndSend(message, new CorrelationData(id));
    } catch (Exception e) {
        return new DetailRes(false, "");
    }

    return new DetailRes(true, "");
}

在收到ack时删除本地缓存,代码如下:

rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
    if (!ack) {
        log.info("send message failed: " + cause + correlationData.toString());
    } else {
        retryCache.del(correlationData.getId());
    }
});

再次验证刚才的场景,发送300w条数据,在发送的过程中过一段时间close一次connection,发送结束后,实际发送数据301.2w条,有一些重复,但是没有丢失数据。
同时需要验证本地缓存的内存泄露问题,程序连续发送1.5亿条数据,内存占用稳定在900M,并没有明显的波动。

最后贴一下rabbitmq的性能测试数据:
1 300w条1k的数据,单机部署rabbitmq(8核,32G)
在ack确认模式下平均发送效率为1.1w条/秒
非ack确认模式下平均发送效率为1.6w条/秒

2 300w条1k的数据,cluster模式部署3台(8核*3, 32G*3)
在ack确认模式下平均发送效率为1.3w条/秒
非ack确认模型下平均发送效率为1.7w条/秒

3 300w条1k的数据,单机部署rabbitmq(8核,32G)
在ack确认模式下平均消费效率为9000条/秒

4 300w条1k的数据,cluster模式部署3台(8核*3, 32G*3)
在ack确认模式下平均消费效率为1w条/秒


代码地址:

https://github.com/littlersmall/rabbitmq-access

帮忙点个星星,谢谢-_-

相关文章

网友评论

  • 薄飞:还有另外一点是否也有问题呢?
    “在confirmCallback中是没有原message的,",没有message,但是有CorrelationData啊,完全可以自己的实现CorrelationData,然后rabbitTemplate.convertAndSend()作为参数一起发送,这样confirmCallback时不就有message了吗。。
    薄飞:@littlersmall 哦哦 谢楼主解惑 em。。 假如我用reids存储消息 ,但我的redis现在部署在消费端的机器上,这样也怕网络抖动。。那我应该在生产者端另部署一个redis 专用 这样吗?
    littlersmall:@薄飞 这样是可以,但是会增加网络传输量,增加rabbitmq的压力,必要性不大。
  • 薄飞:楼主你好, 感觉有点问题啊。。 关于 “rabbitTemplate的ack确认机制是异步的”。。
    我想问, exchange、queue持久化了吗?官网有一段描述 和你说的完全不一致。
    ```
    When Will Published Messages Be Confirmed by the Broker?
    For unroutable messages, the broker will issue a confirm once the exchange verifies a message won't route to any queue (returns an empty list of queues). If the message is also published as mandatory, the basic.return is sent to the client before basic.ack. The same is true for negative acknowledgements (basic.nack).

    For routable messages, the basic.ack is sent when a message has been accepted by all the queues. For persistent messages routed to durable queues, this means persisting to disk. For mirrored queues, this means that all mirrors have accepted the message.
    ```
    对于持久化消息,是在持久化到磁盘时才ack,楼主可以解释一下吗
    薄飞:@littlersmall 我现在也先把消息存在redis中了
    薄飞:@littlersmall 感谢楼主回复, 我小小的疑惑的地方在于文章中 “ rabbitTemplate的ack确认机制是异步的,依旧是不安全的。 ”
    emmm。。异步应该是指生产者client不必等待ack响应,可以一直向broker发送消息吧
    我想问一下,我通过ReturnCallback和ConfirmCallback 来重发 消息 不好的地方在哪呢
    littlersmall:这个和是否持久化有什么关系呢?
    我们通过ack只是来保证这个消息被rabbitmq收到了,不会再丢失了,仅此而已。至于这个消息是落到内存就ok,还是说写入page cache,写入物理磁盘,这些只是rabbitmq的实现,不需要关心。
  • 4e9b0a134a4c:rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
    if (!ack) {
    log.info("send message failed: " + cause + correlationData.toString());
    } else {
    System.out.println("retryCache.del(correlationData.getId());");

    }
    });
    大神,请问这个方法的ack值为FALSE的情况下,
    try {
    rabbitTemplate.correlationConvertAndSend(message, new CorrelationData("generateId()"));/*generateId()*/
    } catch (Exception e) {

    能否捕捉到
    }
    littlersmall:@BetterMan_2b4f 你想要throw出来的话,可以在
    if (!ack) {
    log.info("send message failed: " + cause + correlationData.toString());
    }
    中throw出来就好。
    一般exchange写错这种,不会有异常抛出的,异常一般是rabbitmq服务端出了问题,或者出现了网络问题。
    4e9b0a134a4c:@littlersmall 大神,我通过故意将Exchange名称写错,回调到了ComfirmCallback,ack为False,并且没有跳到try{}Catch中,我打算将你的代码写到项目里去
    littlersmall:@BetterMan_2b4f 没明白要捕捉啥?能具体描述下吗
  • 2895cc77d5bb:遇到这样一个问题, 当mq挂掉后,发送几条消息失败后,map中的数据会成倍增长,出现大量重复数据堆积,请问这是什么原因造成的.
    littlersmall:超过一定时间,会把这条message丢掉的。
    你说的这个场景,首先发送 1条,失败,这个时候,map中会增加 1条 重试,1分钟后重试,重试时再次尝试发送,这个发送时间本身,会往map中再放一条,这样map中会有2条。如果本次发送是成功的,这两条都将被删除,否则,map中会有2条。第三次,map中会有3条。。。
    2895cc77d5bb:retry 间隔设置的是一分钟
    2895cc77d5bb:这是一条数据发送失败后的数据.
    14@@2018-05-16 13:17:20######## 3
    @@2018-05-16 13:18:13+++ map.size= 1
    @@2018-05-16 13:19:21+++ map.size= 1
    15@@2018-05-16 13:20:49######## 3
    @@2018-05-16 13:22:55+++ map.size= 2
    16@@2018-05-16 13:23:03######## 3
    @@2018-05-16 13:25:38+++ map.size= 2
    17@@2018-05-16 13:25:42######## 3
    @@2018-05-16 13:27:03+++ map.size= 2
    18@@2018-05-16 13:27:03######## 3
    @@2018-05-16 13:28:03+++ map.size= 2
    19@@2018-05-16 13:28:03######## 3
    20@@2018-05-16 13:28:03######## 3
    @@2018-05-16 13:29:03+++ map.size= 4
    21@@2018-05-16 13:29:03######## 3
    22@@2018-05-16 13:29:03######## 3
    23@@2018-05-16 13:29:03######## 3
    @@2018-05-16 13:30:44+++ map.size= 6
    24@@2018-05-16 13:30:44######## 3
    25@@2018-05-16 13:30:44######## 3
    26@@2018-05-16 13:30:44######## 3
    27@@2018-05-16 13:30:44######## 3
    28@@2018-05-16 13:30:44######## 3
    @@2018-05-16 13:31:44+++ map.size= 10
    29@@2018-05-16 13:31:44######## 3
    30@@2018-05-16 13:31:44######## 3
    31@@2018-05-16 13:31:44######## 3
    32@@2018-05-16 13:31:44######## 3
    33@@2018-05-16 13:31:44######## 3
    34@@2018-05-16 13:31:44######## 3
    35@@2018-05-16 13:31:44######## 3
    36@@2018-05-16 13:31:44######## 3
  • 49b8771f4b4e:集群解决不了这个问题?
    littlersmall:@TAKCHINGLO 能具体点吗
  • 7c5d721d3578:另外,我发现一个小问题,不知道是我的理解错误还是本身就是这样,当我在用这里的代码去整合springboot的时候,项目启动和运行都在我的期望之内,不过在项目停止的时候出现了一个错误,而我没有能力去解决...
    报错代码:Channel channel = connection.createChannel(false);
    报错信息:java.lang.IllegalStateException: The ApplicationContext is closed and the ConnectionFactory can no longer create connections.
    我的理解是当我关闭应用的时候,您的connctionFactory关闭了,但是线程还在跑,并尝试去建立rabbitMQ连接,但是由于connctionFactory不再提供connction导致这里连接通道失败,不知道是不是这样产生的。
    我的所有的改造后的代码:https://github.com/zhaoqicheng/springboot-rabbitmq-learning
    如果大神有时间...能不能帮我参考一下这个问题
  • 7c5d721d3578:请问一下,您里面用的注解@PostConstruct配合init方法,和使用rabbitMQ监听的注解@RabbitListener(queues = "xxxx")效果是一样的吗?另外,我在测试您的代码的时候发现,当我不去消费的时候使用您的send方法来发送消息,您的代码里面是不是有这样的机制,当超过一定时间未消费的话,会重新发送消息到通道?如果是的话,这样是否合理?(您写的代码太精简,我真的是有点看不懂。如果在gitlab上面添加一点注释就好了!!!哈哈哈 )。。
    7c5d721d3578:@littlersmall 是的,昨天下午已经研究明白,另外我查阅了资料您使用的@PostConstruct注解来作为消费者运行时机,这个可以完全代替 @RabbitListener(queues = "xxx")和@RabbitHandler注解吗?之前没用过@PostConstruct注解,资料上说@PostConstruct是在这个类被在实例化的时候就会调用,可以完全代替吗?我可以在项目中直接按照您的代码来写吗?
    littlersmall:@Z_3772 不是未消费会重发。而是如果发送到rabbitmq服务器,3分钟没有收到确认ack,会重发,和是否消费没有关系。
    7c5d721d3578:我在测试您的代码的时候发现,当我不去消费的时候使用您的send方法来发送消息,您的代码里面是不是有这样的机制,当超过一定时间未消费的话,会重新发送消息到通道?如果是的话,这样是否合理?

    这里我已经在您的代码里面发现了,3min之后未消费则重新发送该条消息。另外一个方法没有明白:
    rabbitTemplate.setReturnCallback((message, replyCode, replyText, tmpExchange, tmpRoutingKey)
    这个方法在这里的作用是什么呢...
  • 方老司:starretry中确认消息送达的方法
    @Override
    public DetailRes send(Object message) {
    try {
    String id = retryCache.generateId();
    retryCache.add(id, message);
    rabbitTemplate.correlationConvertAndSend(message, new CorrelationData(id));
    } catch (Exception e) {
    return new DetailRes(false, "");
    }

    return new DetailRes(true, "");
    }

    难道不抛异常就能证明发送成功了么?
    方老司:@littlersmall 方便加个qq或者微信么?还是有些地方不太明白
    littlersmall:@方老司 你可以看一下recryCache的实现,那个里面保证的
  • 挥手功名:spring 整合rabbitmq,每次发送都会 CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
    connectionFactory.setHost(host);
    connectionFactory.setPort(port);
    connectionFactory.setUsername(username);
    connectionFactory.setPassword(password);

    导致mq服务器 connection 太多崩溃,试过很多方法,connection都无法释放,
    b6780e949013:@littlersmall 他的意思就是,每发送一次消息到mq,factory都会去连接一次mq,得不到释放,连接太多后导致部署mq的服务崩溃
    littlersmall:没太看明白。。
  • chelsealyn:“他给的方案很简单,单纯的增加connection数:connectionFactory.setChannelCacheSize(100);”

    这个地方应该是增加通道数吧。connectionCacheSize跟channelCacheSize是有区别的。
    littlersmall:@chelsealyn 是的,这里的描述不够严密。没有试过增加connectionCacheSize是否有实际效果,可以试试看。
  • oov:Sender服务器宕机,本地缓存也会丢失吧
    661bbe3c55f0: @oov 确实觉得因为重发机制引入redis有点重,现在参考了作者的思路来实现发送方,考虑到消息发生确认问题,这个重发机制的消息缓存策略怎么做一下比较好呢
    oov:@littlersmall 恩,业务上这样做感觉会重了点,可以考虑根据业务特点做些兜底策略。
    littlersmall:@oov 是的,如果对发送要求特别严格,考虑使用redis代替内存cache

本文标题:rabbitmq可靠发送的自动重试机制

本文链接:https://www.haomeiwen.com/subject/uhbjettx.html