Kafka ZooKeeper Structure

Author: onedoc | Published 2019-10-10 14:11

    Layout of Kafka metadata in ZooKeeper

    Start server-0 --> zkCli --> ls /

    [zk: localhost:2181(CONNECTED) 1] ls /
    [cluster, controller, brokers, zookeeper, admin, isr_change_notification, log_dir_event_notification, 
    controller_epoch, consumers, latest_producer_id_block, config]
    

    cluster

    Stores the cluster id and version. On startup a broker reads /cluster/id; if the node does not exist, the broker generates an id and creates it.

    [zk: localhost:2181(CONNECTED) 14] get /cluster/id
    {"version":"1","id":"cw-I6v4cSt2lsNio4v-rUQ"}
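The id shown above has the shape of a 16-byte value written as URL-safe base64 without padding. Assuming it is a random UUID encoded that way (an assumption; the article does not say how the id is generated), a minimal sketch of generating and decoding such an id could look like this. The function names are hypothetical.

```python
import base64
import uuid


def encode_cluster_id(value: uuid.UUID) -> str:
    """Encode 16 UUID bytes as URL-safe base64 and drop the '=' padding."""
    return base64.urlsafe_b64encode(value.bytes).rstrip(b"=").decode("ascii")


def generate_cluster_id() -> str:
    """Generate a fresh id in the same 22-character shape (assumed format)."""
    return encode_cluster_id(uuid.uuid4())


def decode_cluster_id(cluster_id: str) -> uuid.UUID:
    """Restore the padding and decode the id back into a UUID."""
    padded = cluster_id + "=" * (-len(cluster_id) % 4)
    return uuid.UUID(bytes=base64.urlsafe_b64decode(padded))


# The id from the zkCli output above decodes to exactly 16 bytes.
decoded = decode_cluster_id("cw-I6v4cSt2lsNio4v-rUQ")
print(decoded, generate_cluster_id())
```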
    

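    controller (ephemeral node)

    int (broker id of the controller). Stores information about the Kafka broker that currently hosts the controller.
    A Kafka cluster has many brokers, and one of them is elected controller. The controller manages the state of all partitions and replicas in the cluster: when the leader replica of a partition fails, the controller elects a new leader replica for that partition; when the ISR of a partition changes, the controller notifies every broker in the cluster to update its MetadataCache; and when partitions are added to a topic, the controller handles the partition reassignment.
    Every broker creates a KafkaController object on startup, but only one of them can act as the leader. Each broker's KafkaController tries to create an ephemeral node at a fixed ZooKeeper path (/controller); only the one that creates the node first becomes the leader, and all the others are followers. When the leader fails, all followers are notified and compete to create the node again, which elects a new leader.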

    Schema:
    {
        "version": version number, 1 by default,
        "brokerid": unique id of the broker within the Kafka cluster,
        "timestamp": timestamp of the last controller change
    }
    [zk: localhost:2181(CONNECTED) 17] get /controller
    {"version":1,"brokerid":0,"timestamp":"1559195868940"}
    After shutting down broker 0:
    [zk: localhost:2181(CONNECTED) 9] get /controller
    {"version":1,"brokerid":1,"timestamp":"1559232245731"}
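The election described in this section ("first to create the ephemeral node wins") can be sketched without a real ZooKeeper. The `FakeZk` class below is a hypothetical in-memory stand-in written for illustration only; it is not a ZooKeeper client and it ignores watches, sessions timeouts, and concurrency.

```python
import json
import time


class FakeZk:
    """In-memory stand-in for ZooKeeper ephemeral nodes (hypothetical)."""

    def __init__(self):
        self.nodes = {}  # path -> (data, owning session)

    def create_ephemeral(self, path, data, session):
        # Like a ZooKeeper create, this fails if the node already exists.
        if path in self.nodes:
            return False
        self.nodes[path] = (data, session)
        return True

    def close_session(self, session):
        # Ephemeral nodes disappear when the session that owns them ends.
        self.nodes = {p: v for p, v in self.nodes.items() if v[1] != session}


def elect(zk, broker_ids):
    """Every broker tries to create /controller; return the winner's id."""
    for broker_id in broker_ids:
        payload = json.dumps({
            "version": 1,
            "brokerid": broker_id,
            "timestamp": str(int(time.time() * 1000)),
        })
        zk.create_ephemeral("/controller", payload, session=broker_id)
    data, _owner = zk.nodes["/controller"]
    return json.loads(data)["brokerid"]


zk = FakeZk()
first = elect(zk, [0, 1, 2])   # broker 0 gets there first and wins
zk.close_session(0)            # broker 0 shuts down, its node is removed
second = elect(zk, [1, 2])     # the remaining brokers compete again
print(first, second)
```

This mirrors the two zkCli outputs above: the node first names broker 0, and after broker 0 is shut down it names broker 1.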
    

    brokers [ids, topics, seqid]

    ids:/brokers/ids/[0...N] 
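    Every broker must set a numeric id in its configuration file (unique across the cluster). This node is an ephemeral znode.
    Schema:
    {
        "jmx_port": JMX port,
        "timestamp": timestamp of when the broker started,
        "host": host name or IP address,
        "version": version number, 1 by default,
        "port": the broker's server port, set by the port parameter in server.properties
    }
    The output below reports version 4, which also carries endpoints and listener_security_protocol_map.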
    [zk: localhost:2181(CONNECTED) 16] ls /brokers/ids
    [0, 1, 2]
    [zk: localhost:2181(CONNECTED) 17] get /brokers/ids/0
    {"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},
    "endpoints":["PLAINTEXT://192.168.1.103:9092"],
    "jmx_port":-1,"host":"192.168.1.103",
    "timestamp":"1559232289265","port":9092,"version":4}
    
    seqid:/brokers/seqid
    Persistent node; each broker checks on startup that it exists and creates it if it does not.
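As a small illustration of reading a broker registration, the sketch below parses the JSON returned by `get /brokers/ids/0` earlier in this section and resolves each endpoint to its listener name, security protocol, host, and port. The function name `parse_endpoints` is hypothetical, and the string is pasted from the output above rather than fetched from ZooKeeper.

```python
import json

# Registration of broker 0, copied from the zkCli output above.
REGISTRATION = """
{"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},
"endpoints":["PLAINTEXT://192.168.1.103:9092"],
"jmx_port":-1,"host":"192.168.1.103",
"timestamp":"1559232289265","port":9092,"version":4}
"""


def parse_endpoints(raw):
    """Return (listener, protocol, host, port) for every endpoint."""
    info = json.loads(raw)
    protocols = info.get("listener_security_protocol_map", {})
    result = []
    for endpoint in info["endpoints"]:
        listener, _, address = endpoint.partition("://")
        host, _, port = address.rpartition(":")
        # Fall back to the listener name if the map has no entry for it.
        protocol = protocols.get(listener, listener)
        result.append((listener, protocol, host, int(port)))
    return result


endpoints = parse_endpoints(REGISTRATION)
print(endpoints)
```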
    
    topics:/brokers/topics/[xxxx] 
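    Topic information for the Kafka cluster
    Schema:
    {
        "version": "version number, currently fixed at 1",
        "partitions": {
            "partitionId": [
                list of brokerIds assigned as replicas of this partition
            ],
            "partitionId": [
                list of brokerIds assigned as replicas of this partition
            ],
            .......
        }
    }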
    [zk: localhost:2181(CONNECTED) 25] ls /brokers/topics
    [test1]
    [zk: localhost:2181(CONNECTED) 27] get /brokers/topics/test1
    {"version":1,"partitions":{"2":[2,1],"1":[1,0],"3":[0,1],"0":[0,2]}}
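To make the assignment above easier to read, the following sketch parses the `test1` node and derives two views of it: the first replica of each partition and the number of replicas hosted by each broker. Treating the first replica in each list as the preferred leader is a general Kafka convention and is not stated in this article; the variable names are illustrative.

```python
import json
from collections import Counter

# Contents of /brokers/topics/test1, copied from the zkCli output above.
TOPIC_NODE = '{"version":1,"partitions":{"2":[2,1],"1":[1,0],"3":[0,1],"0":[0,2]}}'

assignment = {
    int(partition): replicas
    for partition, replicas in json.loads(TOPIC_NODE)["partitions"].items()
}

# First replica of each partition (the preferred leader by convention).
preferred_leader = {p: replicas[0] for p, replicas in sorted(assignment.items())}

# How many partition replicas each broker hosts.
replicas_per_broker = Counter(
    broker for replicas in assignment.values() for broker in replicas
)

print(preferred_leader)
print(dict(replicas_per_broker))
```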
    
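    /brokers/topics/[topic]/partitions/[0...N]/state, where [0...N] is the partition index
    Schema:
    {
        "controller_epoch": controller election count (controller epoch) recorded when this state was last written,
        "leader": brokerId of the leader elected for this partition,
        "version": version number, 1 by default,
        "leader_epoch": number of leader elections for this partition,
        "isr": [list of brokerIds in the in-sync replica set]
    }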
    [zk: localhost:2181(CONNECTED) 8] get /brokers/topics/test1/partitions/0/state
    {"controller_epoch":14,"leader":0,"version":1,"leader_epoch":0,"isr":[0,2]}
    [zk: localhost:2181(CONNECTED) 11] get /controller_epoch
    14
    After shutting down broker 0, broker 2 is elected leader and the ISR contains only broker 2
    [zk: localhost:2181(CONNECTED) 13] get /brokers/topics/test1/partitions/0/state
    {"controller_epoch":14,"leader":2,"version":1,"leader_epoch":1,"isr":[2]}
    [zk: localhost:2181(CONNECTED) 20] get /controller_epoch
    15
    After restarting broker 0, the controller epoch is unchanged, and the leader epoch of partition 0 increases to 2
    [zk: localhost:2181(CONNECTED) 31] get /brokers/topics/test1/partitions/0/state
    {"controller_epoch":15,"leader":0,"version":1,"leader_epoch":2,"isr":[2,0]}
    [zk: localhost:2181(CONNECTED) 20] get /controller_epoch
    15
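The three state snapshots above can be compared against the replica list of partition 0 ([0, 2], from /brokers/topics/test1) to see when the partition is missing a replica from its ISR. This is only a sketch over the values pasted from the transcript; `missing_from_isr` is a hypothetical helper.

```python
import json

ASSIGNED = [0, 2]  # replicas of test1 partition 0, from /brokers/topics/test1

# The three values of .../partitions/0/state shown above, in order.
SNAPSHOTS = [
    '{"controller_epoch":14,"leader":0,"version":1,"leader_epoch":0,"isr":[0,2]}',
    '{"controller_epoch":14,"leader":2,"version":1,"leader_epoch":1,"isr":[2]}',
    '{"controller_epoch":15,"leader":0,"version":1,"leader_epoch":2,"isr":[2,0]}',
]


def missing_from_isr(assigned, state):
    """Replicas that are assigned to the partition but not currently in sync."""
    return sorted(set(assigned) - set(state["isr"]))


states = [json.loads(raw) for raw in SNAPSHOTS]
report = [
    (s["leader"], s["leader_epoch"], missing_from_isr(ASSIGNED, s))
    for s in states
]

# Each leader change bumps leader_epoch by one in this transcript.
epochs = [s["leader_epoch"] for s in states]
print(report, epochs)
```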
    

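    controller_epoch (persistent node, controller generation)

    An integer. It is 1 when the first broker in the cluster starts for the first time. Whenever the broker hosting the controller changes or goes down, a new controller is elected, and each controller change increments controller_epoch by 1.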

    get /controller_epoch
    7
    

    isr_change_notification

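    In Kafka, replication between the leader and its followers is eventually consistent: followers may lag behind, but they converge on the leader's data. ISR stands for in-sync replicas, the replicas that are caught up with the leader, and it is the set a new leader is chosen from. The isr_change_notification node is used to handle changes to the ISR set.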

    log_dir_event_notification

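    consumer registration

    Every consumer has a unique id (the consumerId can be set in the configuration file or generated by the system). This id identifies the consumer.

    How the consumerId is generated: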
     
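       import java.net.InetAddress;
       import java.util.UUID;

       // Use the configured consumer id if there is one; otherwise build one from
       // the host name, the current time, and the first 8 hex digits of a random UUID.
       String consumerUuid;
       if (config.consumerId != null && !config.consumerId.isEmpty()) {
           consumerUuid = config.consumerId;
       } else {
           UUID uuid = UUID.randomUUID();
           consumerUuid = String.format("%s-%d-%s",
                InetAddress.getLocalHost().getHostName(), System.currentTimeMillis(),
                Long.toHexString(uuid.getMostSignificantBits()).substring(0, 8));
       }
       // The full id is prefixed with the consumer group id.
       String consumerIdString = config.groupId + "_" + consumerUuid;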
     
    Schema:
    {
        "version": version number, 1 by default,
        "subscription": { // subscribed topics
            "topic name": number of consumer threads for this topic
        },
        "pattern": "static",
        "timestamp": "timestamp of when the consumer started"
    }
      
    Example:
    {
        "version":1,
        "subscription":{
            "replicatedtopic":1
        },
        "pattern":"white_list",
        "timestamp":"1452134230082"
    }
    

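    consumer owner

    /consumers/[groupId]/owners/[topic]/[partitionId] -> consumerIdString + threadId index

    When a consumer starts, it does the following:
    
    a) First, it performs the "Consumer id registration".
    
    b) It then sets a watch on the "Consumer id registration" node to observe other consumers in the same group leaving and joining. Any change to the list of children under this znode path triggers a rebalance among the consumers of the group (for example, when one consumer fails, the other consumers take over its partitions).
    
    c) It sets a watch on the "Broker id registration" node to monitor broker liveness. If the broker list changes, the consumers of all groups rebalance.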
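The effect of a rebalance on the owner nodes can be sketched as follows. The split used here is a simple range-style division of sorted partitions over sorted consumer ids; it is an assumption chosen for illustration, not necessarily the exact algorithm the consumer uses, and the consumer ids and group name are made up. The thread index suffix is left out for brevity.

```python
def range_assign(partitions, consumers):
    """Split sorted partitions into contiguous ranges, one per consumer."""
    partitions = sorted(partitions)
    consumers = sorted(consumers)
    per_consumer, extra = divmod(len(partitions), len(consumers))
    owners = {}
    start = 0
    for index, consumer in enumerate(consumers):
        # The first `extra` consumers take one additional partition.
        count = per_consumer + (1 if index < extra else 0)
        for partition in partitions[start:start + count]:
            owners[partition] = consumer
        start += count
    return owners


def owner_paths(group, topic, owners):
    """Map each owner znode path to the consumer that would be written there."""
    return {
        f"/consumers/{group}/owners/{topic}/{partition}": consumer
        for partition, consumer in owners.items()
    }


# Two hypothetical consumers share the four partitions of test1.
before = range_assign([0, 1, 2, 3], ["g1_a", "g1_b"])
# g1_b leaves the group; the watch fires and g1_a takes over everything.
after = range_assign([0, 1, 2, 3], ["g1_a"])
print(owner_paths("g1", "test1", before))
print(owner_paths("g1", "test1", after))
```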
    

    consumer offset

    /consumers/[groupId]/offsets/[topic]/[partitionId] -> long (offset)
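    Tracks the largest offset consumed so far in each partition.
    This znode is persistent. The path shows that offsets are keyed by group id rather than by individual consumer, so when one consumer in a consumer group fails
    and a rebalance is triggered, the other consumers can continue consuming from where it left off.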

    latest_producer_id_block

    On startup a broker pre-allocates a block of producer ids (PIDs). Each block holds 1000 PIDs, so the first block is 0~999.

    [zk: localhost:2181(CONNECTED) 32] get /latest_producer_id_block
    {"version":1,"broker":1,"block_start":"8000","block_end":"8999"}
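Given the block size of 1000 described above, the next block a broker would claim can be derived from the current contents of the node. The sketch below assumes the new block simply starts right after `block_end` of the latest one; `next_block` is a hypothetical helper, and a real broker would also have to write the node back with a conditional update.

```python
import json

BLOCK_SIZE = 1000  # PIDs per block, as described above

# Current contents of /latest_producer_id_block, from the zkCli output above.
LATEST = '{"version":1,"broker":1,"block_start":"8000","block_end":"8999"}'


def next_block(latest_raw, broker_id):
    """Compute the block that follows the latest one (or the first block)."""
    if latest_raw is None:
        start = 0  # no block allocated yet: start at PID 0
    else:
        start = int(json.loads(latest_raw)["block_end"]) + 1
    return {
        "version": 1,
        "broker": broker_id,
        "block_start": str(start),
        "block_end": str(start + BLOCK_SIZE - 1),
    }


print(next_block(None, 0))
print(next_block(LATEST, 2))
```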
    

    config

    Kafka configuration

    [zk: localhost:2181(CONNECTED) 0] ls /config
    [changes, clients, brokers, topics, users]
    

    /config/topics stores per-topic configuration overrides
