美文网首页数客联盟程序员
Broker下线引起的partition的leader选举机制

Broker下线引起的partition的leader选举机制

作者: Woople | 来源:发表于2018-02-20 22:19 被阅读115次

Kafka的Broker下线一般有两种情况:

  • 一种是Broker的进程被kill,例如可能Broker所在主机宕机或者故障;
  • 一种就是正常停止Broker,通过发送ControlledShutdownRequest请求的方式,例如使用Kafka提供的脚本停止Broker或者其他引起ControlledShutdownRequest的场景。

在这两种情况下,如果某个topic的partition的leader恰好是这个下线的Broker,那么这个partition就要重新选举leader,这两种情况的选举算法是不同的,本文使用HDP-2.6.0.3基于kafka 0.10.x版本将阐述这两种选举机制的不同。

Partition的leader的5种选举机制

Partition选举leader有5种方式分别是ReassignedPartitionLeaderSelector,PreferredReplicaPartitionLeaderSelector,ControlledShutdownLeaderSelector,NoOpLeaderSelector,OfflinePartitionLeaderSelector。本文只介绍ControlledShutdownLeaderSelectorOfflinePartitionLeaderSelector

Broker的进程被kill

在ReplicaStateMachine中有一个BrokerChangeListener,负责监听ZooKeeper中/brokers/ids下节点的变化,当某个broker被kill的时候会导致这个路径下相应的节点被删除,从而触发BrokerChangeListener中的handleChildChange方法,其中关键代码如下

if(deadBrokerIds.size > 0)
                controller.onBrokerFailure(deadBrokerIdsSorted)

然后执行的是KafkaController.onBrokerFailure,其中关键代码如下

// trigger OnlinePartition state changes for offline or new partitions
partitionStateMachine.triggerOnlinePartitionStateChange()

从而进入PartitionStateMachine.handleStateChange方法,这时候传入的选举策略就是controller.offlinePartitionSelector,即OfflinePartitionLeaderSelector

handleStateChange(topicAndPartition.topic,topicAndPartition.partition, OnlinePartition, 
                            controller.offlinePartitionSelector,
                            (new CallbackBuilder).build)

所以在这种情况下,partition选举leader的策略就是OfflinePartitionLeaderSelector。下面是源码中对这种策略的解释:

Select the new leader, new isr and receiving replicas (for the LeaderAndIsrRequest):

  1. If at least one broker from the isr is alive, it picks a broker from the live isr as the new leader and the live
    isr as the new isr.
  2. Else, if unclean leader election for the topic is disabled, it throws a NoReplicaOnlineException.
  3. Else, it picks some alive broker from the assigned replica list as the new leader and the new isr.
  4. If no broker in the assigned replica list is alive, it throws a NoReplicaOnlineException.
    Replicas to receive LeaderAndIsr request = live assigned replicas
    Once the leader is successfully registered in zookeeper, it updates the allLeaders cache.

已经很清晰了,就不再翻译,而且代码逻辑不复杂,也可以直接阅读源码。这里需要强调一点,就是unclean.leader.election.enable这个参数,简单说,如果ISR为空,那么当这个参数为true的时候,可以在AR(Assigned Replicas)列表中选择一个作为leader。

正常停止Broker

在调用KafkaServer.shutdown方法的时候,会调用controlledShutdown方法,而这个方法中会执行如下代码

// send the controlled shutdown request
val requestHeader = networkClient.nextRequestHeader(ApiKeys.CONTROLLED_SHUTDOWN_KEY)

这个请求会触发KafkaApis中的

ApiKeys.CONTROLLED_SHUTDOWN_KEY => handleControlledShutdownRequest(request)

在这个方法里面会继续调用

val partitionsRemaining = controller.shutdownBroker(controlledShutdownRequest.brokerId)

而在KafkaController.shutdownBroker中会调用如下方法,传入的选举leader的策略就是ControlledShutdownLeaderSelector

partitionStateMachine.handleStateChanges(Set(topicAndPartition), OnlinePartition,
                    controlledShutdownPartitionLeaderSelector)

下面是源码中对这种策略的解释:

New leader = replica in isr that's not being shutdown;
New isr = current isr - shutdown replica;
Replicas to receive LeaderAndIsr request = live assigned replicas

从这个解释或者源码中可以看到,这个策略比较简单,就是选择在ISR中没有下线的第一个Broker作为这partition的新leader。

验证

  • 通过页面Ambari页面正常停止Broker 1001,前后的对比
/usr/hdp/2.6.0.3-8/kafka/bin/kafka-topics.sh --describe --zookeeper hostA:2181 --topic bar
Topic:bar       PartitionCount:3        ReplicationFactor:3     Configs:
        Topic: bar      Partition: 0    Leader: 1001    Replicas: 1001,1003,1002        Isr: 1003,1002,1001
        Topic: bar      Partition: 1    Leader: 1002    Replicas: 1002,1001,1003        Isr: 1002,1003,1001
        Topic: bar      Partition: 2    Leader: 1003    Replicas: 1003,1002,1001        Isr: 1003,1002,1001
/usr/hdp/2.6.0.3-8/kafka/bin/kafka-topics.sh --describe --zookeeper hostA:2181 --topic bar
Topic:bar       PartitionCount:3        ReplicationFactor:3     Configs:
        Topic: bar      Partition: 0    Leader: 1003    Replicas: 1001,1003,1002        Isr: 1003,1002
        Topic: bar      Partition: 1    Leader: 1002    Replicas: 1002,1001,1003        Isr: 1002,1003
        Topic: bar      Partition: 2    Leader: 1003    Replicas: 1003,1002,1001        Isr: 1003,1002

可以在Kafka Controller日志中找到如下打印

DEBUG [ControlledShutdownLeaderSelector]: Partition [bar,0] : current leader = 1001, new leader = 1003 (kafka.controller.ControlledShutdownLeaderSelector)
  • 通过kill Broker的进程停止Broker 1002,前后的对比
/usr/hdp/2.6.0.3-8/kafka/bin/kafka-topics.sh --describe --zookeeper hostA:2181 --topic bar
Topic:bar       PartitionCount:3        ReplicationFactor:3     Configs:
        Topic: bar      Partition: 0    Leader: 1003    Replicas: 1001,1003,1002        Isr: 1003,1001
        Topic: bar      Partition: 1    Leader: 1002    Replicas: 1002,1001,1003        Isr: 1002,1003,1001
        Topic: bar      Partition: 2    Leader: 1003    Replicas: 1003,1002,1001        Isr: 1003,1001
/usr/hdp/2.6.0.3-8/kafka/bin/kafka-topics.sh --describe --zookeeper hostA:2181 --topic bar
Topic:bar       PartitionCount:3        ReplicationFactor:3     Configs:
        Topic: bar      Partition: 0    Leader: 1001    Replicas: 1001,1003,1002        Isr: 1003,1001
        Topic: bar      Partition: 1    Leader: 1001    Replicas: 1002,1001,1003        Isr: 1003,1001
        Topic: bar      Partition: 2    Leader: 1003    Replicas: 1003,1002,1001        Isr: 1003,1001

可以在Kafka Controller日志中找到如下打印

INFO [OfflinePartitionLeaderSelector]: Selected new leader and ISR {"leader":1001,"leader_epoch":3,"isr":[1003,1001]} for offline partition [bar,1] (kafka.controller.OfflinePartitionLeaderSelector)

总结

本文只是分析了在Broker下线的两种场景中,partition重新选举leader的两种机制。当然在实际生产环境中会遇到更多更复杂的情况,所以在遇到partition找不到leader的时候需要根据Controller日志分析当前场景下,使用的是哪种选举机制才能找到问题的根因。

相关文章

网友评论

    本文标题:Broker下线引起的partition的leader选举机制

    本文链接:https://www.haomeiwen.com/subject/pdngtftx.html