05 | 如何确保消息不会丢失?
我们可以利用消息队列的有序性来验证是否有消息丢失。原理非常简单,在 Producer 端,我们给每个发出的消息附加一个连续递增的序号,然后在 Consumer 端来检查这个序号的连续性。
如果没有消息丢失,Consumer 收到消息的序号必然是连续递增的,或者说收到的消息,其中的序号必然是上一条消息的序号 +1。如果检测到序号不连续,那就是丢消息了。还可以通过缺失的序号来确定丢失的是哪条消息,方便进一步排查原因。
大多数消息队列的客户端都支持拦截器机制,你可以利用这个拦截器机制,在 Producer 发送消息之前的拦截器中将序号注入到消息中,在 Consumer 收到消息的拦截器中检测序号的连续性,这样实现的好处是消息检测的代码不会侵入到你的业务代码中,待你的系统稳定后,也方便将这部分检测的逻辑关闭或者删除。
07 | 消息积压了该如何处理?
在消息的收发两端,我们的业务代码怎么和消息队列配合,达到一个最佳的性能。
如果说,你的代码发送消息的性能上不去,你需要优先检查一下,是不是发消息之前的业务逻辑耗时太多导致的。
我们把这 1ms 的时间分解开,它包括了下面这些步骤的耗时:
- 发送端准备数据、序列化消息、构造请求等逻辑的时间,也就是发送端在发送网络请求之前的耗时;
- 发送消息和返回响应在网络传输中的耗时;
- Broker 处理消息的时延。
所以,我们在设计系统的时候,一定要保证消费端的消费性能要高于生产端的发送性能,这样的系统才能健康的持续运行。
消费端的性能优化除了优化消费业务逻辑以外,也可以通过水平扩容,增加消费端的并发数来提升总体的消费性能。特别需要注意的一点是,在扩容 Consumer 的实例数量的同时,必须同步扩容主题中的分区(也叫队列)数量,确保 Consumer 的实例数和分区数量是相等的。如果 Consumer 的实例数量超过分区数量,这样的扩容实际上是没有效果的。原因我们之前讲过,因为对于消费者来说,在每个分区上实际上只能支持单线程消费。
我见到过很多消费程序,他们是这样来解决消费慢的问题的:
image.jpeg它收消息处理的业务逻辑可能比较慢,也很难再优化了,为了避免消息积压,在收到消息的 OnMessage 方法中,不处理任何业务逻辑,把这个消息放到一个内存队列里面就返回了。然后它可以启动很多的业务线程,这些业务线程里面是真正处理消息的业务逻辑,这些线程从内存队列里取消息处理,这样它就解决了单个 Consumer 不能并行消费的问题。
能导致积压突然增加,最粗粒度的原因,只有两种:要么是发送变快了,要么是消费变慢了。
你通过监控发现,无论是发送消息的速度还是消费消息的速度和原来都没什么变化,这时候你需要检查一下你的消费端,是不是消费失败导致的一条消息反复消费这种情况比较多
总结
优化消息收发性能,预防消息积压的方法有两种,增加批量或者是增加并发,在发送端这两种方法都可以使用,在消费端需要注意的是,增加并发需要同步扩容分区数量,否则是起不到效果的。
网友评论