美文网首页RocketMQ
Consume Rebalance in RocketMQ &

Consume Rebalance in RocketMQ &

作者: lysu | 来源:发表于2017-06-19 11:59 被阅读0次

    今天的计划看下两个消息中间件RocketMQ和Kafka的Rebalance方式- -

    首先说下Rebalance是做啥...为啥需要rebalance并介绍一些参与rebalance的基本概念~

    Kafka(RocketMQ)在Broker中会将一个topic划分为多个Partition(ConsumeQueue), 消息在生产后会被投递到某个Partition(ConsumeQueue)中.(PS: partition可以被分配不同broker)

    而对于消费者,为了解决一条消息如何消费的问题, 引入了ConsumeGroup并将Consumer分配到某个consumeGroup中, MQ在处理消息时会保证消息,在一个Group中只会被一个Consumer消费(In ClusterMode也是正常大家使用的方式).(所以N台机器如果在Group中只会被一个Consumer收到)

    所以,MQ需要

    • 将Partition(ConsumeQueue)分配给Consumer
    • 并保证一个Partition(ConsumeQueue)只会被分配给一个Group中的一个Consumer(这样就做到一个消息只能被Group中一个Consumer消费了)
    • 一个Consumer可以消费多个ConsumeQueue
    • 在Consume变化时重新分配保证保证ConsumeQueue都有被处理
    • 在Partition数量变化时重新分配保证Consume
    • 同样在Broker部分挂机的情况下分配过程保证正确

    而上面的过程就是今天要讨论的Rebalance

    RocketMQ

    粗看流程

    RocketMQ的Rebalance逻辑实际是发生在Consume客户端的(当然也必须会从Broker或Nameserver获取一些信息), 处理思路简单说是这样:(注意RocketMQ里的ConsumeQueue可以理解为Kafka的Partition,所以这节里都用ConsumeQueue表述)

    reblance核心逻辑可以参看RebalanceImpl#topicSubscribeInfoTable

    • 首先这个rebalance过程是被触发在每个consumer上
    • 在客户端获取到当前topic在所有broker所有ConsumeQueue
    • 在客户端获取到所有当前ConsumeGroup的Consumer列表(也就是知道和自己在一个Group的其他兄弟姐妹)
    • 客户端触发Rebalance时会对所有ConsumeQueue基于QueueID(每个ConsumeQueue的固定的属性)进行排序, 对所有的ConsumeID也进行排序(每个Consumer的标示), 将排序结果.
    • 然后使用当前的分配策略进行分配,分配结果就是分配给当前Consumer的ConsumeQueue列表
    • 因为每个Consumer都会运行分配所以最终结果是所有consumer都各自拿到属于自己的ConsumeQueue

    触发条件这个rebalance的条件有:

    • 每20s定时刷新(准确说上次刷新后等20s, @see RebalanceService#run
    • 收到Broker告知的Consume变化通知时@see ClientRemotingProcessor#notifyConsumerIdsChanged
    • 每次Client启动时@see DefaultMQPushConsumerImpl#start

    上面简单的描述了分配过程,不过我们接下来会看下各个细节~

    获取所有ConsumeQueue信息

    这里我们从设置开始一起走到获取~

    • ConsumeQueue会在创建Topic时指定Topic里Queue的数量(细化说有Write和Read这里不展开- -),最终创建结果会被存储到NameServer上(就如名字所说保存一些元数据的server)
    • 所以Consume会直接从NameServer获取关于当前有多少Queue的信息
    • 获取Queue数量是从本地的``获取的代码位于@see MQClientInstance#updateTopicRouteInfoFromNameServer(java.lang.String)这个函数入口有些多但情况就是会从从NameServer获取并在变化时更新在客户端缓存的对特定topic的Routine信息(这包括所有的broker,每个broker编号queueId为0->x)
    • 有个入口需要关注就是MQClientInstance#startScheduledTask会每10s刷主动刷一次
    • 然后实际就从RebalanceImpl#topicSubscribeInfoTable这个缓存字段获取来rebalance了

    问题:...等等一会儿看

    获取Group中的其他Consume信息

    这里我们倒过来从获取走到数据来源..

    • group信息是保存在broker上的每次分配都会从broker拉取@see MQClientAPIImpl#getConsumerIdListByGroup
    • 这里获取Group中其他ConsumerId请求的是任意一个brokerGroup(一个topic创建时可以指定多个brokerName, 一个brokerName下可以有多台brokerNode,为了便于理解这里把相同brokerName的多个brokerNode假装叫做brokerGroup)
    • 之后会从brokerGroup中选择一台机器获取, 获取会优先获取Master节点, 如果Master没有会乱获取一台(实现是hashmap里iter的第一个- -因为后面看到因为每台consumer都连接所有broker所以理论上可以乱选)
    • 在broker上每个Group现在的consumer信息是保存在内存中的ConsumerManager#consumerTable一个Map
    • 而更新的地方只有一个ClientManageProcessor#heartBeat也就是收到client心跳信息的时候
    • 好了, 我们必须看下心跳咋上报的MQClientInstance#sendHeartbeatToAllBroker, 可以看到果然,只要当前client有consumer信息(即不是纯粹的produer角色的client)就会像所有brokerNode上报心跳, 注意不是brokerGroup是所有node(这里感觉建立心跳连接有些多???不过目前这模式好像没太好优化方法 再想想- -)

    好了这里画个图总结下~

    所有Conume都会向所有broker建立连接并心跳上报,所以所以任意一台broker都有当前group的所有节点信息(正常情况), 客户端想要获取当前group的所有consumer信息直接乱选一台活着的获取就好了

    几个内置的分配策略

    首先前面说过分配前提是已经获取到所有可用Queue和所有当前Group的Consumer,并都做了排序,各个Consumer各自执行分配, 分配逻辑实现是AllocateMessageQueueStrategy的几个实现

    • AllocateMessageQueueAveragely: 平均分配
    • AllocateMessageQueueByMachineRoom: 看注释是什么alipay逻辑机房的逻辑???主要是对brokerName基于逻辑机房进行了筛选- -?不过能否用怎么用就...
    • AllocateMessageQueueAveragelyByCircle: 环形分配
    • AllocateMessageQueueByConfig: 配死(或通过其他机制动起来?)

    总结下最后结果就是能获取到属于当前consumer的ConsumeQueue(代码里叫MessageQueue)

    标记已Queue已被占用

    上面我们看到整个标记过程都是在consumer本地就完成了,各个consumer间通过排序+一个一致的算法就完成了分配,并没有和其他consumer的交互。

    然而这是有问题的,因为rebalance是各自执行,不排除某个时刻两个同一个Group的两个Consumer都怼到一个Queue上,而这个从设计上是绝对不允许的,所以这里需要一个机制保证永远不会出现同Group两个Consume怼到一个Queue上。

    RocketMQ目前是选择在Broker上维护一个LockMap来实现(后面会讨论这个也许有问题??)

    RebalanceImpl#updateProcessQueueTableInRebalance中, 如果是新分配的Queue, 会调用this.lock(mq)

    for (MessageQueue mq : mqSet) {
        if (!this.processQueueTable.containsKey(mq)) {
             if (isOrder && !this.lock(mq)) { // !!! here
                 log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq);
                   continue;
             }
    

    继续往下跟代码(为了避免太长这里不贴代码了),会发现lock会向masterNode(brokerId=0)的节点发LockBatchRequestBody(只有Master?Master挂了的话- -?)

    最后在masterNode内存中会通过RebalanceLockManager#mqLockTable实现加锁占用(带超时默认1分钟超时类似租期), 如果master时这lock信息会丢失掉?当依赖定时rebalance可以恢复,不过那次rebalance如果有冲突之类的情况发生的话...? 好吧 后面再来看这些check特殊场景

    如果加锁失败(别人已经占用或者锁请求失败)会不对这个queue不做处理。。然后等下次rebalance, 再来看别人是否释放锁和masterbroker是否恢复...

    同样在RebalanceImpl#updateProcessQueueTableInRebalance会将无需处理的队列从当前处理中remove掉~这部分逻辑跟下去位于RebalancePushImpl#removeUnnecessaryMessageQueue,会等待当前正在执行的消费并等processQueue处理干净才尝试向maserNode发起unlock(看代码这里好像没处理masterBroker网路不通的情况如果unlock不成功直接算成功了???)

    好了如果按照预期正常unlock,其他consumer可以lock并开始消费,或者等20s下次rebalance可以开始消费(如果本次因次序没竞争lock上)

    Challenge

    最后,我们来假设些场景,看看能否正常work

    1. Consume加入
    • 假设开始有1个aconsumer消费3个队列q1``q2``q3,启动后rebalance消费3个q并在lockMap中都占有
    • bconsumer这时加入,b自己启动触发自己rebalance, a收到b加入的change事件后开始rebalace,
    • b获得q3, 所以尝试lock,但a还占有着lock失败暂时不去消费q3
    • a获取q1, q2, 所以removeq3,并在处理当前消息等一会unlock
    • 定时rebalance运行, b成功lockq3并开始消费

    整体看没啥问题,虽然新加入的consumer要等一阵才能接手消费(有间隙消费小lag), 另外那个等一会儿unlock特殊情况下一会儿小概率会有问题

    2. Consume离开(断线)
    • 假设两个consumera处理q1,q2, b处理q3
    • b因为网络原因断线,broker发出change事件触发在线的a进行rebalance
    • a这时会分配接管q1``q2``q3, 对新加入的q3进行lock, 然后发现是lock不了的因为b已经在lock了
    • 这时候需要等60s后的rebalance, a才有机会解盘q3的消息

    感觉这部分等锁超时有些无奈- -,消费不会乱但会有消费lag增加

    3. Consume同时并发加入

    • 假设开始只有一个consumera处理q1``q2``q3
    • 之后consumerbc“同时”加入
    • 首先各自启动后自己rebalance,blockq2, clockq3可能会失败
    • 然后a收到change消息开始rebalance, 这时可能看到bc 也可能只看到b,rebalance处理change是使用wakeup不会重复唤醒(已醒着不会再来一次),所以本次rebalance是有可能认为只有b只unlockq3..不过没关系还有下次20s的rebalance那次还是可以怼正
    • 同理b,c靠20s运行一次的rebalance也是可以怼正

    所以我们看到可以保证消费不会乱,不过代价是要过一阵新加入的consume才能真正开始接手消费(间隙小lag)

    4. Topic调整Queue数量

    上面提到过Client每10s会从NameServer刷一次TopicRoutine(MQClientInstance#startScheduledTask), 所以Queue变化正常会在这里被收到并更新本地缓存。

    然后,正常情况下等下次rebalance时就会用新的Queue信息进行重新分配,然后基于上面说的lock和定期重rebalance规则,最终可以保证ok且中途不乱

    异常情况下,想到的几个NameServer数据不一致或交换routine刷新和rebalance次序,看好像最终也都能达到期望状态 - -

    5. Broker挂了

    上面提到过,lock信息是放到每个BrokerGroup中的master(id0)上的,所以如果Master挂了的话,lock会用永远不成功,可以理解为新Consume无法加入,老Consume无法退出,必须等待broker活过来,但之前在跑的的还可以正常运行(只要别离开了还没加入然后broker挂了- -这种情况部分queue会有lag)

    (PS背景介绍: 在rocketmq的设计里brokerGroup的master挂了group不可以写入,但可以改写其他brokerGroup来完成写入HA,消费者HA可以通过brokerGroup里的slave消费之前堆在brokerGroup里的内容)

    broker活过来后,因为是内存,所以下次触发rebalance会重新恢复lock的map。。

    不过感觉有个极端情况。。就是master挂了,然后这时消费者有变化或者队列数目有调整。。。因为启动时内存为空等于没占锁,而实际之前consumer已经在跑,在还没来得及rebalance就发生了变更,这时可能出现同group里两个consumer同时消费一个queue????

    RocketMQ小结

    RocketMQ在master不挂的情况下rebalance可以保证消费不乱,虽然可能会有消息lag问题但感觉并不关键;而master挂且同时发生rebalance这个的确有些问题。。此外rebalance完全由客户端控制其他人有没有用上相互之前并不知道;并且各自拉namesrv可能会看到不一致的数据虽然最终通过定期重rebalance可以一致会导致不必要的rebalance的感觉- -

    看似有些问题待解决,如果理解有误欢迎讨论~~哈哈哈

    下面开始看下kafka0.9版本之后的方式,据说kafka在很久之前和rocketmq目前用的很像, 但后来改了..

    Kafka

    kafka还在看, 没按照预期搞完,见下偏文章哈~

    相关文章

      网友评论

        本文标题:Consume Rebalance in RocketMQ &

        本文链接:https://www.haomeiwen.com/subject/ioegqxtx.html