从某个时间段开始发现MQ消费变的不再均匀,一批消息过来之后,最终消费的消息曲线如下图,根据短板效应这次任务完成的时间以最后一条消息消费完成为主,这就导致任务的处理时间由于部分机器被整体拉长。业务上希望任务处理时间越短越好。
问题
为什么出现部分机器消费这么慢?

MQ原理
MQ官方文档:https://yuque.alibaba-inc.com/mqdevops/metaq_wiki/osk16a
生产端
目前MetaQ的在默认情况下生产者发送消息的时候采用轮询方式均匀分发到队列上,每个队列的数据基本一致
生产端选择队列:com.alibaba.rocketmq.client.producer.MessageQueueSelector

消费端
MetaQ是一个分布式消息队列,consumer按队列数分配消费,不是按消息总数
消费不感知具体发送方式,从现存队列上读数据(一个队列只会被一个消费集群中一台机器处理)
一个队列只会被一个消费集群中一台机器处理
队列与机器的分配策略:com.alibaba.rocketmq.client.consumer.AllocateMessageQueueStrategy,里面逻辑都比较简单,可以自行看下。
有四个参数:
- String consumerGroup 消费组,在创建消费者的时候会指定消费者属于哪个组
- String currentCID 当前消费者的ID,一般是IP:数字格式。 在MetaQ控制台可以看到消费者客户端Id
- List<MessageQueue> mqAll 等待分配的所有的消费队列
- List<String> cidAll 所有的消费者

问题原因
队列数量不是消费者数量的整数倍,有200个队列,但是有80台机器,那么有40台机器会分配到3个队列,40台机器分配2个队列,而每个队列的消息量是相同的,导致其中40台机器要比其他机器消费更的消息数量。

问题处理
想了3种处理方案:
方案一 | 方案二 | 方案三 | |
---|---|---|---|
复杂度 | 高 | 中 | 低(没有代码改动) |
适用场景 | 消息数量少 | 跟随MetaQ,无限制 | 无 |
长期性 | 可持续使用 | 可持续使用 | 扩容时需联系MetaQ的人员 |
稳定性 | 中 | 高 | 高 |
方式 | 自己实现微型MQ | 实现MetaQ接口 | 联系MetaQ值班 |
依赖 | Redis,广播机制 | 无 | 无 |
综合考虑,选择方案二,复杂度上较低,依赖较少,比较容易实现。
方案一
自己弄一个队列,所有的机器只消费者一台队列,这样处理快的机器处理更多的消息,队列空的时候所有机器都停止消费,避免部分机器在消费,部门机器是空闲的状态。

方案二
实现当前的MetaQ生产端与消费端队列分配逻辑。取余后的队列不再使用,只使用当前机器倍数的队列。
- MessageQueueSelector: 让生产端发送消息的时候跳过某些队列。
- AllocateMessageQueueStrategy 跳过的队列不进行分配,平均分配剩余队列
需要准备信息:
- 机器数量
- 机器数量 * (mqCount / 机器数量) = 准备使用的队列数量X,index / X >= 1不使用

生产端代码
new MessageQueueSelector() {
private Random random = new Random(System.currentTimeMillis());
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
Collections.sort(mqs);
int serverCount = SwitchConfig.serverCount;
int useMqCount = serverCount * (mqs.size() / serverCount);
int pos = random.nextInt(useMqCount);
MessageQueue messageQueue = mqs.get(pos);
log.info("Send MessageQueueSelector {}",messageQueue);
return messageQueue;
}
}
消费端代码
new AllocateMessageQueueStrategy() {
@Override
public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
List<String> cidAll) {
Collections.sort(mqAll);
if (currentCID == null || currentCID.length() < 1) {
throw new IllegalArgumentException("currentCID is empty");
}
if (mqAll == null || mqAll.isEmpty()) {
throw new IllegalArgumentException("mqAll is null or mqAll empty");
}
if (cidAll == null || cidAll.isEmpty()) {
throw new IllegalArgumentException("cidAll is null or cidAll empty");
}
int serverCount = SwitchConfig.serverCount;
int useMqCount = serverCount * (mqAll.size() / serverCount);
ArrayList<MessageQueue> useMqList = new ArrayList<>(useMqCount);
for (int i = 0; i < useMqCount; i++) {
useMqList.add(mqAll.get(i));
}
List<MessageQueue> result = new ArrayList<MessageQueue>();
if (!cidAll.contains(currentCID)) {
log.info("[BUG] ConsumerGroup: {} The consumerId: {} not in cidAll: {}",
consumerGroup,
currentCID,
cidAll);
return result;
}
int index = cidAll.indexOf(currentCID);
int mod = useMqList.size() % cidAll.size();
int averageSize =
useMqList.size() <= cidAll.size() ? 1 : (mod > 0 && index < mod ? useMqList.size() / cidAll.size()
+ 1 : useMqList.size() / cidAll.size());
int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;
int range = Math.min(averageSize, useMqList.size() - startIndex);
for (int i = 0; i < range; i++) {
result.add(useMqList.get((startIndex + i) % useMqList.size()));
}
if (LogUtils.isDebugEnabled()){
log.info("{} {} {} {} {}",consumerGroup,currentCID,mqAll,cidAll,result);
}
return result;
}
@Override
public String getName() {
return "CUSTOM_DISCARDED";
}
}
方案三
联系MetaQ的人进行队列配置,让队列的数量为机器的整数倍。
最后效果
... 敬请期待
网友评论