RocketMQ进阶3_订阅关系一致性与消息幂等
1. 消息过滤
消息标签tag,用来标记 Topic 下的不同用途的消息。
在 RocketMQ 中,消费者是可以按照 Tag 对消息进行过滤。举个电商交易场景的例子,用户下完订单之后,会产生一系列的消息,比如说订单消息、支付消息和物流消息。假设这些消息都发送到 Topic 为 Trade 中,同时用 tag 为 order 来标记订单消息,用 tag 为 pay 来标记支付消息,用 tag 为 logistics 来标记物流消息。需要支付消息的支付系统(相当于一个 consumer)订阅 Trade 中 tag 为 pay 的消息,此时,broker 则只会把 tag 为 pay 的消息投递给支付系统。而如果是一个实时计算系统,它可能需要接收所有和交易相关的消息,那么只要它订阅 Trade 中 tag 为 order、pay、logistics 的消息,broker 就会把带有这些 tag 的消息投递给实时计算系统。
对于消息分类,可以选择创建多个 Topic 来区分,也可以选择在同一个 Topic 下创建多个 tag 来区分。这两种方式都是可行的,但是一般情况下,不同的 Topic 之间的消息是没有什么必然联系的,使用 tag 来区分同一个 Topic 下相互关联的消息则更加合适一些。
2. 订阅关系一致性
在 RocketMQ 中,订阅关系由 Topic和 Tag 组成,因此要保证订阅关系一致性,就必须同时保证:
- 订阅的 Topic 必须一致
- 订阅的 Topic 中的 tag 必须一致
在实际使用中,切记同一个消费者集群内的所有消费者实例务必要保证订阅关系的一致性。
保证订阅关系一致性是非常重要的,一旦订阅关系不一致,消息消费的逻辑就会混乱,甚至导致消息丢失。
在实际使用中,在 producer 端要做好消息的分类,便于 consumer 可以使用 tag 进行消息的准确订阅,而在 consumer 端,则要保证订阅关系一致性。
3. 消息重试
消息重试就是当消费者消费消息失败后,broker 会重新投递该消息,直到消费成功。
消息重试只针对集群消费模式。广播消费没有消息重试的特性,消费失败之后,只会继续消费下一条消息。
在 RocketMQ 中,当消费者使用集群消费模式时,消费者接收到消息并进行相应的逻辑处理之后,最后都要返回一个状态值给 broker。这样 broker 才知道是否消费成功,需不需要重新投递消息。也就是说,可以通过设置返回的状态值来告诉 broker 是否重新投递消息。
如果这条消息本身就是一条脏数据,就算你消费 100 次也不会消费成功,难道还是一直去重试吗?其实 RocketMQ 并不会无限制地重试下去,默认每条消息最多重试 16 次,而每次重试的间隔时间如下表所示:
第几次重试 | 每次重试间隔时间 |
---|---|
1 | 10 秒 |
2 | 30 秒 |
3 | 1 分钟 |
4 | 2 分钟 |
5 | 3 分钟 |
6 | 4 分钟 |
7 | 5 分钟 |
8 | 6 分钟 |
9 | 7 分钟 |
10 | 8 分钟 |
11 | 9 分钟 |
12 | 10 分钟 |
13 | 20 分钟 |
14 | 30 分钟 |
15 | 1 小时 |
16 | 2 小时 |
如果消息重试 16 次之后还是消费失败,那么消息就不会再投递给消费者,而是将消息放到相对应的死信队列中。这时候我们就需要对死信队列的消息做一些人工补偿处理,因为这些消息可能本身就有问题,也有可能和消费逻辑调用的服务有关等,所以需要人工判断之后再进行处理。
消费失败的情况:
- 返回 ConsumeConcurrentlyStatus.RECONSUME_LATER
- 返回 null
- 抛出异常
消费者返回 ConsumeConcurrentlyStatus.RECONSUME_LATER 或者 null,就相当于告诉 broker 说,这条消息我消费失败了,你给我重新投递一次。对于抛出异常这种情况,只要在处理消费逻辑的地方抛出了异常,那么 broker 也重新投递这条消息。如果异常被捕获,则不会进行消息重试。
4. 消息幂等
消费幂等就是对于一条消息的处理结果,不管这条消息被处理多少次,最终的结果都一样。比如说,收到一条消息是要更新一个商品的价格为 6.8 元,那么当这条消息执行 1 次,还是执行 100 次,最终在数据库里的该商品价格就是 6.8 元。
为什么消费需要幂等呢?因为在实际使用中,尤其在网络不稳定的情况下,RocketMQ 的消息有可能会出现重复,包括两种情况:
- 发送时消息重复。
- 投递时消息重复。
第一种情况是生产者发送消息的场景。消息已成功发送到 broker,但是此时可能发生网络闪断或者生产者宕机,导致 broker 发回的响应失败。这时候生产者由于没有收到响应,认为消息发送失败,于是尝试再次发送消息给 broker。这样一来,broker 就会再收到一条一模一样内容的消息,最终造成了消费者也收到两条内容一模一样的消息。
第二种情况是消费者消费消息的场景。消息已投递到消费者并完成消费逻辑处理,当消费者给 broker 反馈消费状态时可能发生网络闪断。broker 收不到消费者的消费状态,为了保证至少消费一次的语义,broker 将在网络恢复后再次尝试投递之前已经被处理过的消息,最终造成消费者收到两条内容一模一样的消息。
对于那些不允许消息重复的业务场景来说,处理建议是通过业务上的唯一标识来作为幂等处理的依据。
网友评论