美文网首页
RocketMQ消息不均衡的分析与解决

RocketMQ消息不均衡的分析与解决

作者: 书唐瑞 | 来源:发表于2022-03-06 14:15 被阅读0次





    此篇文章不仅仅在于分析RocketMQ的消息不均衡原因与解决思路, 更在于分析MQ消息不均衡的原因和解决思路, 只是拿RocketMQ作为例子.

    从生产者角度考虑,是不是生产者发送消息的时候,只钟爱某几个Queue,只将消息发送到个别Queue中,才造成消息倾斜呢?
    这种情况可能会发生,但概率比较低. 而且也不是生产者只钟爱某几个Queue, 多半情况是某些条件不满足了, 才导致把消息发送给了某些Queue中. RocketMQ是按照Queue队列轮询的方式将消息发送出去的, 保证生产者(皇帝)能临幸所有的嫔妃(指Queue), 当然RocketMQ也考虑了发送消息延迟的情况, 尽量将消息发送到消息延迟低的broker上. 即便发送顺序消息, 基于key的哈希值, 也基本可以认为消息是均衡发送到各个Queue中的.

    假如我们有2个broker,topic有4个队列, 那么我们看下,作为生产者它拿到的队列信息长啥样.


    在这里插入图片描述

    因为每个broker有4个队列, 生产者获取到了8个队列. 而且队列的顺序不是杂乱的, 是按照相同的broker,不同的queueId的顺序排列的.

    假如不存在消息发送延迟的情况, TOPIC-A有4个队列, 则broker1和broker2各有4个队列, 那么生产者发送消息的时候,先发送给broker1的4个队列,然后再发送给broker2的4个队列,之后再次发送给broker1的4个队列,循环往复进行...

    在这里插入图片描述

    【正常情况】

    如上图所示,生产者依次将消息发送给broker1-0,broker1-1,broker1-2,broker1-3,broker2-0,broker2-1,broker2-2,broker2-3的Queue队列中,循环往复进行...

    【发送失败情况】

    假如向broker2的某个队列发送消息的时候,发送失败了,根据重试机制,那么生产者会从broker1中选择一个队列进行发送,如下图所示. 这种情况,似乎会造成消息倾斜的可能,毕竟生产者把消息发送到了一个broker上,接下来讲解消费端的时候再说如何应对这种情况.

    如果只有一个broker2, 没有broker1,即便发送消息给broker2失败了,也只能从broker2中再选择一个其他的Queue进行发送.

    在这里插入图片描述





    【延迟容错情况】

    生产者每次发送消息之后,都会记录消息发送给每个broker的延迟情况. 假如向broker2的某个队列发送消息的时候, 发现broker2的延迟比较大, 那么生产者会暂时舍弃broker2, 而是从broker1中选择一个队列进行发送, 和上图一样, 这种情况,似乎也会造成消息倾斜的可能,毕竟生产者把消息发送到了一个broker上,接下来讲解消费端的时候再说如何应对这种情况.

    【总结】

    总之,由于生产者导致消息不均衡的概率很低, 相对于消费端, 可以忽略生产者导致消息不均衡的情况了. 可以认为生产者是均匀的将消息发送到不同的Queue中的.

    在排查消息不均衡的情况时, 可以排查一下是否存消息发送失败和发送延迟的情况. 而且即便由于这2个原因造成发送消息时出现消息发送不均衡的情况, 在消费端也可以避免.

    生产者尽量会保证消息发送,从全局的Queue队列来看都是均衡的, 消息会均衡的发送到每个Queue. 即便某个broker出现了问题, 也会保证消息发送到其他broker的队列也是均衡的.

    RocketMQ消息投递的具体策略在 org.apache.rocketmq.client.latency.MQFaultStrategy#selectOneMessageQueue 方法中. 生产者发送消息的时候, 就会调用到这个方法, 选择对应的队列.







    我们假设生产者发送的消息都是均衡的发送到不同的Queue队列中的. 而且我们也假设每个消费者的消费能力没有太大差别.
    接下来分析消费端如何造成消息不均衡的情况的.

    说明一点的是,消息不均衡,即便增加消费者,也无法解决消息不均衡.

    我亲自经历的,以及我知道别人咨询阿里的结论, 阿里工程师针对消息不均衡的情况,是让你增加消费者的消费能力,其实是不对的.

    假如我们有2个broker,topic有4个队列, 有2个消费者,如下图所示

    在这里插入图片描述

    如上图,共有8个队列,那么这8个队列该如何分配给2个消费者呢?
    常规的方案就是平均分配, 8个队列,2个消费者, 那么每个消费者分配4个队列, 如下图所示

    在这里插入图片描述

    上面说的这个策略对应的类是AllocateMessageQueueAveragely

    在RocketMQ的源码中,默认就是指定了AllocateMessageQueueAveragely策略.

    public class DefaultMQPullConsumer extends ClientConfig implements MQPullConsumer {
    
        /**
         * Queue allocation algorithm
         */
        private AllocateMessageQueueStrategy allocateMessageQueueStrategy = new AllocateMessageQueueAveragely();
    
    }
    

    看似完美,而且这也是RocketMQ消费端默认的分配队列的情况. 然而它却有着天生的缺陷. 假如此时生产者有4条消息需要发送, 按照轮询的策略,生产者会把消息发送给broker1的queueId=0,1,2,3 这4个队列, 意味着只有消费者1可以消费到消息, 而消费者2一直空闲. 过了一会, 生产者再发送4条消息, 会把消息发送给broker2的queueId=0,1,2,3 这4个队列, 意味着只有消费者2可以消费到消息, 而消费者1变成了一直空闲. 就会造成一个消费者忙碌一个消费者空闲的情况, 间接出现了消息不均衡情况. 如果出现了上面说的生产者发送失败或发送延迟的情况, 可能消息都会发送到broker1上, 造成消费者1一直忙碌, 而消费者2无所事事, 间接出现了消息不均衡情况.

    AllocateMessageQueueAveragely策略, 从全局看的话, 队列是平均分配给每个消费者的, 但是从局部看的话, 队列没有平均分配给每个消费者. 这就是关键所在.



    其实生产者没有过失, 在队列可用,消息发送延迟时间可控的情况下, 生产者一直是致力于把消息均衡发送给每个队列, 即便broker2已经不可使用了,在局部看来, 生产者依然是将消息均衡的发送到broker1的每个队列上. 之所以导致消息不均衡的罪魁祸首其实是消费端的分配队列的策略. 按照上面的分配队列策略, 是不完美的, 天生会造成消息不均衡. 只要生产者的生产消息的速度足够快, 消费者消费消息的速度明显跟不上, 这种消息不均衡的现象就会明显出现.

    RocketMQ还提供了一个分配队列的策略, 它是AllocateMessageQueueAveragelyByCircle , 这个策略是环形平均分配, 如下图所示

    在这里插入图片描述

    就好像消费者1和消费者2轮流从8个队列里面挨个拿礼物一样, 不管从全局,还是局部来看, 队列都是平均的分配给2个消费者. 即便broker2此时不可用, 消息都被发送到了broker1上, 依然是有2个消费者来消费消息的, 而不像之前的那样, 只有一个消费者消费消息, 避免了部分消费者处于饥饿的状态, 间接避免了消息不均衡的问题.

    RocketMQ中的RebalanceImpl类负责消费端的负载均衡, 用于给消费端分配对应的队列,源码如下

    // 源码位置 : org.apache.rocketmq.client.impl.consumer.RebalanceImpl#rebalanceByTopic
    
    private void rebalanceByTopic(final String topic, final boolean isOrder) {
        switch (messageModel) {
            case BROADCASTING: {
                ...
            }
            case CLUSTERING: {
                if (mqSet != null && cidAll != null) {
    
                    AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;
    
                    List<MessageQueue> allocateResult = null;
                    try {
                        // 调用具体的分配队列策略
                        allocateResult = strategy.allocate(
                            this.consumerGroup,
                            this.mQClientFactory.getClientId(),
                            mqAll,
                            cidAll);
                    } catch (Throwable e) { }
                    ...
                }
                break;
            }
            default:
                break;
        }
    }
    
    
    
    

    【说明】

    1.在原生RocketMQ v4.5.2源码中并没有看到可以手动指定不同分配队列策略的入口, 也许是我没找到

    2.如果使用的是商业版的RocketMQ, 那么使用的ons-client包, 在这个包的1.8.7.4.Final中,如果实例化Consumer的时候指定了AllocateMessageQueueAveragelyByCircle策略的话, 那么就可以使用它.

    ons-client底层代码,根据程序员的配置, 使用不同的分配队列的策略,如下

    在这里插入图片描述

    而我们能做的,就是在实例化Consumer的时候,指定AVG_BY_CIRCLE属性, 那么商业版RocketMQ底层就是使用AllocateMessageQueueAveragelyByCircle策略用来给每个消费者分配队列.

    在这里插入图片描述

    读者朋友只需要知道采用什么策略可以达到什么效果即可, 具体的实现与具体的版本和写法有关,根据实际情况调整下即可.

    3.当然RocketMQ还有其他的分配队列的策略,这里就没有介绍.

    4.脚本工具, 由于工作中使用的是阿里云商业版的RocketMQ, 有时候需要模拟发送和接收消息,基于官方API文档,写了一个HTTP协议的Python版的生产者和消费者, 以及TCP协议的Java版的生产者和消费者, 其实没有啥技术含量,就是把官网的API拿过来,改吧改吧而已.

    虽然我们可以采取环形平均分配的策略, 但是还需要考虑一个问题,如下图

    在这里插入图片描述

    当消费者数量和队列数量不匹配的时候, 总会有一些消费者比其他消费者多分配几个队列, 一旦生产者生产的消息速度比较快, 而消费者消费消息的速度比较慢的话, 那么多分配的几个队列就会造成消息倾斜消息不均衡的悲惨状况.

    导致消费者消费慢的原因,比如RPC调用慢, 数据库查询慢, 硬件等原因

    【总结】
    第一点消费端需要采取合适的队列分配策略,比如按照环形平均分配策略(AllocateMessageQueueAveragelyByCircle),

    第二点消费者的数量和队列的数量尽量保持公平平均分配, 前面这两点就是在说, 不仅每个消费者要分配到相同数量的队列, 而且这些队列还需要按照环形的方式分配给消费者.

    第三点就是消费者的消费速度不能太慢, 尽量把消息尽快消费掉, 而且消费者彼此之间消费能力差别不大.

    欢迎沟通交流

    相关文章

      网友评论

          本文标题:RocketMQ消息不均衡的分析与解决

          本文链接:https://www.haomeiwen.com/subject/wuxwrrtx.html