kafka 自定义的总结
controller
1.kafka的broker端选取一个broker作为controller用来管理partition的分配以及partition leader的选举
2.controller的选举是所有broker都去ZK上面注册临时节点注册成功的就是controller
3.当controller奔溃,其他的broker就继续注册临时节点竞选controller
4.当broker宕机且该broker上有对应的partition leader 那么controller会去zk上面读取该partition的ISR 选取一个作为
partition leader。然后controller将刚选出来地leader 通过rpc 发送给其他地broker。
5.上述leader选举过程中如果ISR列表为空则根据配置随机选一个副本作为leader,如果不为空但是对应的broker也是宕机则
等待机器恢复。或选择第一个“活”过来的Replica(不一定是ISR中的)作为Leader
coordinator
生产过程中broker要分配partition,消费过程这里,也要分配partition给消费者。
类似broker中选了一个controller出来,消费也要从broker中选一个coordinator,用于分配partition。
1.选coordinator
看offset保存在那个partition
该partition leader所在的broker就是被选定的coordinator
2.交互流程
consumer启动、或者coordinator宕机了,consumer会任意请求一个broker,发送ConsumerMetadataRequest请求,broker会按照上面说的方法,选出这个consumer对应coordinator的地址。
consumer 发送heartbeat请求给coordinator,返回IllegalGeneration的话,就说明consumer的信息是旧的了,需要重新加入进来,进行reblance。返回成功,那么consumer就从上次分配的partition中继续执行。
3.Consumer Rebalance的触发条件
(1)Consumer增加或删除会触发 Consumer Group的Rebalance
(2)Broker的增加或者减少都会触发 Consumer Rebalance
多副本同步
基本的策略是服务端这边的处理是follower从leader批量拉取数据来同步。但是具体的可靠性,是由生产者来决定的。
通过request.required.acks参数设置
ack=0,发过去就完事了,不关心broker是否处理成功,可能丢数据
ack=1,当写Leader成功后就返回,其他的replica都是通过fetcher去同步的,所以kafka是异步写,主备切换可能丢数据。
ack=-1, 要等到isr里所有机器同步成功,才能返回成功,延时取决于最慢的机器。强一致,不会丢数据。
replica副本数目不能大于kafka broker节点的数目,否则报错。
这里的replica数其实就是partition的副本总数,其中包括一个leader,其他的就是copy副本
partition leader的选举
将所有Broker(假设共n个Broker)和待分配的Partition排序
将第i个Partition分配到第(i mod n)个Broker上 (这个就是leader)
将第i个Partition的第j个Replica分配到第((i + j) mode n)个Broker上
offset
当consumer刚启动的时候会去对应的partition拿offset,后期读取过程中offset 由consumer维护,consumer可以定期的
传递已经消费的offset。
broker对于offset是采用compact模式,只保留最新的key(key=groupid+topic+partition,value=offset)保存在topic
_consumer_offsets
消息投递语义
1.At least once 最多一次,消息可能会丢失,但不会重复
2.At least once:最少一次,消息不会丢失,可能会重复
3.Exactly once:只且一次,消息不丢失不重复,只且消费一次(0.11中实现,仅限于下游也是kafka)
log日志(segment一般都是按照partition来存放)
日志有三个分别如下,文件名以这个segment中最小的offset命名,文件扩展名是.log;segment对应的索引的文件名字一样,扩展名是.index。
有两个index文件,一个是offset index用于按offset去查message,一个是time index用于按照时间去查。
文件类型如下:
1.index
2.timeIndex
3.log
一、为了减少索引文件的大小,降低空间使用,方便直接加载进内存中,这里的索引使用稀疏矩阵。
索引包含baseOffset以及position
baseOffset:这条索引对应segment文件中的第几条message。这样做方便使用数值压缩算法来节省空间(如果使用原来的方式则baseOffset值很大),这样做方便使用数值压缩算法来节省空间。
position:在segment中的绝对位置。即我们该offset消息从哪个文件的哪个位置开始读取
image.png其中.log文件数据由许多message组成,下面详细说明message物理结构如下:
image.png解释如下:
image.png
在partition中如何通过offset查找message
例如读取offset=368776的message,需要通过下面2个步骤查找。
第一步查找segment file
上述图2为例,其中00000000000000000000.index表示最开始的文件,起始偏移量(offset)为0.第二个文件 00000000000000368769.index的消息量起始偏移量为368770 = 368769 + 1.同样,第三个文件00000000000000737337.index的起始偏移量为737338=737337 + 1,其他后续文件依次类推,以起始偏移量命名并排序这些文件,只要根据offset 二分查找文件列表,就可以快速定位到具体文件。
当offset=368776时定位到00000000000000368769.index|log
第二步通过segment file查找message通过第一步定位到segment file,当offset=368776时,依次定位到00000000000000368769.index的元数据物理位置和 00000000000000368769.log的物理偏移地址,然后在index文件中通过二分法找到相邻(想等)的值,再去00000000000000368769.log顺序查找直到 offset=368776为止。
timeIndex和index的区别
timeIndex:它是映射时间戳和相对offset, 时间戳和相对offset作为entry,供占用12字节,时间戳占用8字节,相对offset占用4字节,这个索引也是稀疏索引,没有保存全部的消息的entry
index:每一个索引项为8字节,其中相对offset占用4字节,消息的物理地址(position)占用4个字节
通过timeIndex 可以支持时间戳索引来访问消息的方法
网友评论