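What Kafka Uses ZooKeeper For
Everyone knows that Kafka depends on zookeeper, but what exactly does it use zookeeper for? Let's start a Kafka cluster and find out.
- Inspect the zookeeper nodes: list all children of the root node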
kafka $ docker exec -it zoo1 bash
bash-4.4# zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /conf/zoo.cfg
Mode: follower
bash-4.4# zkCli.sh
Connecting to localhost:2181
......
[zk: localhost:2181(CONNECTED) 11] ls /
[cluster, controller_epoch, controller, brokers, zookeeper, feature, admin, isr_change_notification, consumers, log_dir_event_notification, latest_producer_id_block, config]
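- zk records the broker information of the Kafka cluster. Each broker registers itself under /brokers/ids as an ephemeral node (note the non-zero ephemeralOwner in the output below).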
[zk: 172.19.0.11:2181(CONNECTED) 32] ls /brokers
[ids, topics, seqid]
[zk: 172.19.0.11:2181(CONNECTED) 33] ls /brokers/ids
[1003, 1002, 1001]
[zk: 172.19.0.11:2181(CONNECTED) 34] ls /brokers/ids/1003
[]
[zk: 172.19.0.11:2181(CONNECTED) 35] get /brokers/ids/1003
{"features":{},"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},"endpoints":["PLAINTEXT://kafka1:9092"],"jmx_port":-1,"port":9092,"host":"kafka1","version":5,"timestamp":"1615593328267"}
cZxid = 0x10000003c
ctime = Fri Mar 12 23:55:28 GMT 2021
mZxid = 0x10000003c
mtime = Fri Mar 12 23:55:28 GMT 2021
pZxid = 0x10000003c
cversion = 0
dataVersion = 1
aclVersion = 0
ephemeralOwner = 0x2000c76ed810000
dataLength = 196
numChildren = 0
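The registration payload above is plain JSON, so it is easy to inspect programmatically. Here is a minimal sketch, assuming the JSON string has already been read from zk by some client (the fetching is not shown). The helper name `parse_broker_registration` is hypothetical and not part of Kafka.

```python
import json
from datetime import datetime, timedelta, timezone

# Payload copied from the `get /brokers/ids/1003` output above.
RAW = (
    '{"features":{},"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},'
    '"endpoints":["PLAINTEXT://kafka1:9092"],"jmx_port":-1,"port":9092,'
    '"host":"kafka1","version":5,"timestamp":"1615593328267"}'
)

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def parse_broker_registration(raw):
    """Hypothetical helper: summarize a broker registration payload."""
    data = json.loads(raw)
    endpoints = []
    for endpoint in data["endpoints"]:
        # Each endpoint looks like LISTENER://host:port.
        listener, _, host_port = endpoint.partition("://")
        host, _, port = host_port.rpartition(":")
        endpoints.append((listener, host, int(port)))
    # The timestamp is a string holding milliseconds since the Unix epoch.
    registered_at = EPOCH + timedelta(milliseconds=int(data["timestamp"]))
    return {"endpoints": endpoints, "registered_at": registered_at}


info = parse_broker_registration(RAW)
print(info["endpoints"])
print(info["registered_at"].isoformat())
```

The decoded timestamp should line up with the ctime that zk reports for the same node.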
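- It records the controller information. Whichever broker registers this node first becomes the controller. The controller is the broker dedicated to dealing with zk and writing data into it.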
[zk: localhost:2181(CONNECTED) 10] get /controller
{"version":1,"brokerid":1001,"timestamp":"1615612015060"}
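The "first to register wins" rule can be sketched with a tiny in-memory model. This is not Kafka's code and it does not talk to a real zookeeper: `FakeZk`, `NodeExistsError`, and `try_become_controller` are hypothetical names, and the model only assumes that /controller is an ephemeral node that vanishes when its owner's session ends.

```python
import json


class NodeExistsError(Exception):
    """Stand-in for the error zk returns when a node already exists."""


class FakeZk:
    """Hypothetical in-memory stand-in for zk: create, get, session expiry."""

    def __init__(self):
        self.nodes = {}  # path -> (data, owning session id)

    def create_ephemeral(self, path, data, session_id):
        if path in self.nodes:
            raise NodeExistsError(path)
        self.nodes[path] = (data, session_id)

    def get(self, path):
        return self.nodes[path][0]

    def expire_session(self, session_id):
        # Ephemeral nodes disappear together with the session that created them.
        self.nodes = {p: v for p, v in self.nodes.items() if v[1] != session_id}


def try_become_controller(zk, broker_id, session_id):
    """Return True if this broker won the race to create /controller."""
    payload = json.dumps({"version": 1, "brokerid": broker_id})
    try:
        zk.create_ephemeral("/controller", payload, session_id)
        return True
    except NodeExistsError:
        return False


zk = FakeZk()
print(try_become_controller(zk, 1001, "session-a"))  # True: node did not exist yet
print(try_become_controller(zk, 1002, "session-b"))  # False: 1001 already holds it
zk.expire_session("session-a")                        # broker 1001 goes away
print(try_become_controller(zk, 1002, "session-b"))  # True: the node was free again
```

Modeling the node as ephemeral is what lets another broker take over once the current controller's session is gone.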
- Create a topic named userMsg with 4 partitions and a replication factor of 3, then check zk: it has recorded the topic and its partition information.
bash-4.4# /opt/kafka_2.13-2.7.0/bin/kafka-topics.sh --create --topic userMsg --partitions 4 --zookeeper 172.19.0.11:2181 --replication-factor 3
Created topic userMsg.
Looking at zookeeper, the newly created topic and its partition information now appear.
[zk: 172.19.0.11:2181(CONNECTED) 25] ls /brokers/topics
[userMsg, chat, __consumer_offsets]
[zk: 172.19.0.11:2181(CONNECTED) 26] ls /brokers/topics/userMsg
[partitions]
[zk: 172.19.0.11:2181(CONNECTED) 27] ls /brokers/topics/userMsg/partitions
[0, 1, 2, 3]
[zk: 172.19.0.11:2181(CONNECTED) 28] ls /brokers/topics/userMsg/partitions/0
[state]
[zk: 172.19.0.11:2181(CONNECTED) 29] ls /brokers/topics/userMsg/partitions/0/state
[]
[zk: 172.19.0.11:2181(CONNECTED) 30] get /brokers/topics/userMsg/partitions/0/state
{"controller_epoch":1,"leader":1003,"version":1,"leader_epoch":0,"isr":[1003,1001,1002]}
cZxid = 0x1000000c9
ctime = Sat Mar 13 03:30:51 GMT 2021
mZxid = 0x1000000c9
mtime = Sat Mar 13 03:30:51 GMT 2021
pZxid = 0x1000000c9
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 88
numChildren = 0
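The partition state node is also plain JSON. The sketch below reads the leader and ISR from it and checks whether the partition is under-replicated. The helper name `summarize_partition_state` is hypothetical, and the replication factor is passed in by the caller because it is not part of the state payload.

```python
import json

# Payload copied from the `get /brokers/topics/userMsg/partitions/0/state` output above.
RAW_STATE = (
    '{"controller_epoch":1,"leader":1003,"version":1,'
    '"leader_epoch":0,"isr":[1003,1001,1002]}'
)


def summarize_partition_state(raw, replication_factor):
    """Hypothetical helper: report leader, ISR, and replication health."""
    state = json.loads(raw)
    isr = state["isr"]
    return {
        "leader": state["leader"],
        "isr": isr,
        "leader_in_isr": state["leader"] in isr,
        # Fewer in-sync replicas than assigned replicas means under-replicated.
        "under_replicated": len(isr) < replication_factor,
    }


# userMsg was created with --replication-factor 3.
print(summarize_partition_state(RAW_STATE, replication_factor=3))
```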
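- Delete a topic: if delete.topic.enable is set to false at that time, the topic pending deletion is recorded under the /admin/delete_topics node in zk.
- Consumer offsets: the offsets themselves are stored in the internal topic __consumer_offsets; zk only holds the partition metadata of that topic.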
[zk: localhost:2181(CONNECTED) 2] ls /brokers/topics
[test, __consumer_offsets]
[zk: localhost:2181(CONNECTED) 3] ls /brokers/topics/__consumer_offsets
[partitions]
[zk: localhost:2181(CONNECTED) 4] ls /brokers/topics/__consumer_offsets/partitions
[44, 45, 46, 47, 48, 49, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43]
[zk: localhost:2181(CONNECTED) 5] ls /brokers/topics/__consumer_offsets/partitions/45
[state]
[zk: localhost:2181(CONNECTED) 6] ls /brokers/topics/__consumer_offsets/partitions/45/state
[]
[zk: localhost:2181(CONNECTED) 7] get /brokers/topics/__consumer_offsets/partitions/45/state
{"controller_epoch":1,"leader":0,"version":1,"leader_epoch":0,"isr":[0]}
cZxid = 0x87
ctime = Fri Feb 19 16:17:38 CST 2021
mZxid = 0x87
mtime = Fri Feb 19 16:17:38 CST 2021
pZxid = 0x87
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 72
numChildren = 0
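The listing above shows 50 partitions (0 to 49) for __consumer_offsets. To my knowledge, a consumer group is mapped to one of them by taking the Java hashCode of the group id modulo the partition count. The sketch below reproduces that calculation; treat the formula as an assumption to verify against your Kafka version. The function names are hypothetical.

```python
def java_string_hash(text):
    """Reproduce Java's String.hashCode() as a signed 32-bit integer."""
    h = 0
    encoded = text.encode("utf-16-be")
    # Java strings are sequences of UTF-16 code units (2 bytes each).
    for i in range(0, len(encoded), 2):
        unit = int.from_bytes(encoded[i:i + 2], "big")
        h = (31 * h + unit) & 0xFFFFFFFF
    # Convert the unsigned 32-bit value to Java's signed int range.
    return h - 0x100000000 if h >= 0x80000000 else h


def offsets_partition_for(group_id, partition_count=50):
    """Assumed mapping: non-negative hash of the group id modulo partition count."""
    h = java_string_hash(group_id)
    # Mirror a "safe abs" that maps the minimum int to 0 instead of overflowing.
    non_negative = 0 if h == -0x80000000 else abs(h)
    return non_negative % partition_count


# "my-group" is a made-up group id used only for illustration.
print(offsets_partition_for("my-group"))
```

The default of 50 matches the partition count in the listing above. A cluster configured with a different number of offsets partitions would need that value passed in.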
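- Other information
/consumers: in older Kafka versions consumer information was recorded in zk; in newer versions it has been moved into the Kafka brokers, which maintain it internally.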
/controller_epoch
/isr_change_notification
/log_dir_event_notification
/latest_producer_id_block
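What Kafka stores in zookeeper
This figure comes from https://blog.csdn.net/lizhitao/article/details/23744675

The figure is for reference only. In newer Kafka versions, consumer-related information is no longer stored in zookeeper but is managed by Kafka itself.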
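Why Kafka stores information in zookeeper
My personal understanding is that, from a design standpoint, Kafka uses zookeeper so that cluster management, broker leader election, and similar tasks can be handed off to a third party, while the Kafka cluster itself focuses on producing, storing, and consuming messages.
Without zookeeper, the Kafka cluster would have to manage itself and become an autonomous system. Kafka would then need its own management mechanism, and the intra-cluster communication this brings might affect its message-processing performance.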