在我之前发的两篇问题整理中,梳理了一下ActiveMQ集群转发问题的分析过程:
ActiveMQ集群消息转发问题整理(一)
ActiveMQ集群消息转发问题整理(二)
这里简要介绍一下问题的发现和分析过程:
问题发现
环境中 8000+ 个 Client 连接8台MQ,使用 non-durable 的方式订阅各自C类网段的TOPIC,例如 192.168.0.1 订阅 TOPIC:VLAN.192.168.0 。环境中大约有160+个C类网段,所以大约有160个TOPIC。
下发针对全量 Client 的任务的时候,常常发现有个别 Client 收不到任务。由于我之前做了个任务流插件,可以跟踪到任务是否到达MQ,见(ActiveMQ插件开发实例-任务日志)表现为任务到达了生产者所在的MQ,但是没有被转发到Client所在的MQ。
问题分析
一、进一步现象
根据现象进一步测试发现如下的现象:
- 假设 Client_A连接MQ_A,消费TOPIC_A。消息下发时,出现Client_A接收不到消息的情况。
- Client_A连接的MQ_A上只有这个Client_A一个消费者消费TOPIC_A上的消息。
- 查看TOPIC的订阅者信息,除了Client_A外,还可能出现该MQ转发到其他MQ上的虚拟消费者,表示发到这个TOPIC的消息需要被转发给其他MQ
- 集群内其他所有MQ上的TOPIC_A上查看订阅者,均未出现类似的虚拟消费者,表示消息不会被转发到MQ_A,所以Client_A无法收到消息。
- 重启Client,Client会自动飘到集群内其他的MQ上,此时可以正常消费。
- 指定MQ_A要求Client_A重启后连接到MQ_A,也可以正常消费。
这些现象表明这是一个偶发的问题,所以可能需要进一步深入到源码级别查找一下问题原因。
二、源码分析
首先要明确一下MQ集群中的转发机制是怎样的。由于是 non-durable 的topic,使用的应该是 DemandForwardBridge 的方式。该方式的原理根据AMQ官网上的介绍,是通过集群中的 MQ 订阅集群中其他 MQ 的 advisory topic 实现的。举个例子:
- Broker A和Broker B为集群中两台MQ。
- A,B启动时,便通过静态配置对方IP的方式得知集群中有另一个MQ存在,所以建立了连接对方的通道,并订阅了对方的 ActiveMQ.Advisory.Consumer.> 的topic。
- 当 A 上连接了一个消费者,订阅topic1时,A便会往自己的 ActiveMQ.Advisory.Consumer.Topic.topic1 发送一条带上了消费者信息(ConsumerInfo) 的消息。
- B接收到这条 Advisory 消息以后,知道了A有一个消费者要消费topic1,就会建立一个专门的订阅通道(DemandSubscription)。
- 当 B 上收到生产者往 topic1 发送的消息时,会同时往 A 上转发一份。A收到以后再发送给自己的消费者。
从订阅 Advisory Topic 到消息转发,所有的动作都在 activemq-broker 的 org.apache.activemq.network.DemandForwardBridgeSupport 类中实现。其中 serviceRemoteConsumerAdvisory() 方法用于处理接收到的Advisory 消息,该方法调用了 addConsumerInfo() 方法,用于建立 DemanSubscription 。
于是我修改了 DemandForwardBridgeSupport 和 AdvisoryBroker(用于发送 Advisory 消息)这两个类,增加了一些日志来分析异常的场景下到底是哪一环节出了问题。
##########正常的日志##########
1. Client-1 连接到 Broker A,Broker A 添加消费者,消费者ID为 Client-1
2. Broker A 发送 Advisory 消息,广播Client-1的ConsumerInfo
3. Broker B 收到 Client-1的 Advisory 消息,添加消费者,消费者ID为 Broker B->Broker A
4. Broker B 发送 Advisory 消息,广播 Broker B->Broker A 的ConsumerInfo
5. Broker C 收到 Client-1的 Advisory 消息,添加消费者,消费者ID为 Broker C->Broker A
6. Broker C 发送 Advisory 消息,广播 Broker C->Broker A 的ConsumerInfo
7. Broker C 收到 Broker B->Broker A 的 Advisory 消息,由于 networkTTL=1 的设置,不添加消费者。
8. Broker B 收到 Broker C->Broker A 的 Advisory 消息,由于 networkTTL=1 的设置,不添加消费者。
9. Broker A 收到 Broker B->Broker A 的 Advisory 消息,由于 networkTTL=1 的设置,不添加消费者。
10. Broker A 收到 Broker C->Broker A 的 Advisory 消息,由于 networkTTL=1 的设置,不添加消费者。
##########异常的日志##########
1. Client-1 连接到 Broker A,Broker A 添加消费者,消费者ID为 Client-1
2. Broker A 发送 Advisory 消息,广播Client-1的ConsumerInfo
3. Broker B 收到 Client-1的 Advisory 消息,添加消费者,消费者ID为 Broker B->Broker A
4. Broker B 发送 Advisory 消息,广播 Broker B->Broker A 的ConsumerInfo
5. Broker C 收到 Broker B->Broker A 的 Advisory 消息,由于 networkTTL=1 的设置,不添加消费者。
6. Broker A 收到 Broker B->Broker A 的 Advisory 消息,由于 networkTTL=1 的设置,不添加消费者。
可以看到,Broker C 异常场景下没有接收到 Broker A 的消息。也就是说,消息从 Broker A 中发出,但是在 Broker C 消费时丢失了。
异常场景只在大量 Client 同时尝试连接,且出现问题的 Topic 网段中 Client 数量较少。
问题解决
我尝试了多种方法,始终无法解决Advisory消息没收到的问题,于是我在默认 Advisory 消息会丢的情况下设计了几种解决方法:
- 设置 networkTTL=2
- 尝试本地接收 Advisory 消息并存在内存中,等到服务器不忙的时候直接再发一次。
前一种方法对一个互相连接的集群来说,有极大的网络负担;而后一个方法逻辑比较复杂,比较难判断 Advisory 消息什么时候该发。一怒之下我提了个 AMQ 严重BUG到JIRA上。一开始有人建议我升级到5.15以上的版本再试试,于是我试了下,问题仍然存在。
在我把我分析问题的流程写到JIRA上以后,有人回复我是我需要去掉AMQ的一个默认配置。
<policyEntry topic=">" >
<pendingMessageLimitStrategy>
<constantPendingMessageLimitStrategy limit="1000"/>
</pendingMessageLimitStrategy>
</policyEntry>
pendingMessageLimitStrategy这个配置项用于处理 Slow Consumer,AMQ官方的解释是这样的(Slow Consumers)
Slow Consumers can cause problems on non-durable topics since they can force the broker to keep old messages in RAM which once it fills up, forces the broker to slow down producers, causing the fast consumers to be slowed down. One option we could implement in the future is spooling to disk - but then spooling to disk could slow down the fast consumers too.
Currently we have a strategy that lets you configure the maximum number of matched messages the broker will keep around for a consumer in addition to its prefetch buffer. Once this maximum is reached, as new messages come in, older messages are discarded. This allows you to keep the RAM for current messages and keep sending messages to a slow consumer but to discard old messages.
也就是是说,可配置一个 non-durable topic 的 consumer 可以让MQ为其保存多少消息,由于 topic=">" 的写法 ,Advisory的Topic也被包含在这个配置的使用范围内。所以,当有大量 Client 连接上来时,MQ会发送大量的 Advisory Messages,如果集群中其他MQ的对 Advisory Messages没有及时处理完,就会导致触发这个机制,新的 Advisory 消息会持续把旧的 Advisory 消息顶出队列(as new messages come in, older messages are discarded)。
综上所述,如果场景中有多个MQ组成集群,有大量的Client尝试连接,就需要调整这个配置,避免出现 advisory 消息被丢弃的情况。
Problem Solved.
山穷水复疑无路,柳暗花明又一村
网友评论