如何保存消费端的消费位置
offset的定义
每个topic可以划分多个分区(每个Topic至少有一个分区),同一topic下的不同分区包含的消息是不同的。每个消息在被添加到分区时,都会被分配一个offset(称之为偏移量),它是消息在此分区中的唯一编号,kafka通过offset保证消息在分区内的顺序,offset的顺序不跨分区,即kafka只保证在同一个分区内的消息是有序的; 对于应用层的消费来说,每次消费一个消息并且提交以后,会保存当前消费到的最近的一个offset。
offset的维护位置
在kafka中,提供了一个consumer_offsets_*
的一个topic,把offset信息写入到这个topic中。 consumer_offsets——按保存了每个consumer group某一时刻提交的offset信息。 __consumer_offsets 默认有50个分区。
计算公式
Math.abs(“groupid”.hashCode())%groupMetadataTopicPartitionCount ;
由于默认情况下 groupMetadataTopicPartitionCount有50个分区,计算得到的结果为:x, 意味着当前的 consumer_group的位移信息保存在__consumer_offsets的第x个分区
分区的副本机制
- 我们已经知道kafka的每个topic都可以分为多个partition,并且多个partition会均匀分布在集群的各个节点下。虽然这种方式能够有效的对数据进行分片,但是对于每个partition来说,都是单点的,当其中一个partition不可用的时候,那么这部分消息就没办法消费。所以kafka为了提高partition的可靠性而提供了副本的概念(Replica),通过副本机制来实现冗余备份。
- 每个分区可以有多个副本,并且在副本集合中会存在一个leader的副本,所有的读写请求都是由leader 副本来进行处理。剩余的其他副本都做为follower副本,follower副本会从leader副本同步消息日志。 这个有点类似zookeeper中leader和follower的概念,但是具体的时间方式还是有比较大的差异。所以 我们可以认为,副本集会存在一主多从的关系。
- 一般情况下,同一个分区的多个副本会被均匀分配到集群中的不同broker上,当leader副本所在的 broker出现故障后,可以重新选举新的leader副本继续对外提供服务。通过这样的副本机制来提高 kafka集群的可用性。
创建一个带副本机制的topic(集群情况下操作)
通过下面的命令去创建带2个副本的topic
sh kafka-topics.sh --create --zookeeper 192.168.11.156:2181 --replication-factor 3 --partitions 3 --topic secondTopic
然后我们可以在/tmp/kafka-log路径下看到对应topic的副本信息了。我们通过一个图形的方式来表达。 针对secondTopic这个topic的3个分区对应的3个副本
kafka-17.png
确定leader副本
在zookeeper服务器上,通过如下命令去获取对应分区的信息, 比如下面这个是获取secondTopic第1个分区的状态信息。get /brokers/topics/secondTopic/partitions/1/state得到{"controller_epoch":12,"leader":0,"version":1,"leader_epoch":0,"isr":[0,1]}
或通过这个命令sh kafka-topics.sh --zookeeper 192.168.13.106:2181 --describe --topic test
需要注意的是,kafka集群中的一个broker中最多只能有一个leader副本,leader副本所在的broker节点的分区叫leader节点,follower副本所在的broker节点的分区叫follower节点
副本的leader选举
-
kafka提供了数据复制算法保证,如果leader副本所在的broker节点宕机或者出现故障,或者分区的leader节点发生故障,那么,kafka必须要保证从follower副本中选择一个新的leader副本。
-
Kafka分区下有可能有很多个副本(replica)用于实现冗余,从而进一步实现高可用。副本根据角色的不同可分为3类:
1.leader副本:响应clients端读写请求的副本
2.follower副本:被动地备份leader副本中的数据,不能响应clients端读写请求。
3.ISR副本:包含了leader副本和所有与leader副本保持同步的follower副本——如何判定是否与leader同步后面会提到每个kafka副本对象都有两个重要的属性:LEO和HW。注意是所有的副本,而不只是 leader副本。
LEO
即日志末端位移(log end offset),记录了该副本底层日志(log)中下一条消息的位移值。注意是下一条消息!也就是说,如果LEO=10,那么表示该副本保存了10条消息,位移值范围是[0, 9]。另外, leader LEO和follower LEO的更新是有区别的。
HW
即上面提到的水位值。对于同一个副本对象而言,其HW值不会大于LEO值。小于等于HW值的所有消息都被认为是“已备份”的(replicated)。同理,leader副本和follower副本的HW更新是有区别.
kafka-21.png从生产者发出的一条消息首先会被写入分区的leader副本,不过还需要等待ISR集合中的所有follower副本都同步完之后(当然这边也会有不同的acks 机制)才能被认为已经提交,之后才会更新分区的HW, 进而消费者可以消费 到这条消息。
副本协同机制
- 消息的读写操作都只会由leader节点来接收和处理。follower副本只负责同步数据以及当 leader副本所在的broker挂了以后,会从follower副本中选取新的leader。
- 写请求首先由Leader副本处理,之后follower副本会从leader上拉取写入的消息,这个过程会有一定的延迟,导致follower副本中保存的消息略少于leader副本,但是只要没有超出阈值都可以容忍。但是如果一个follower副本出现异常,比如宕机、网络断开等原因长时间没有同步到消息,那这个时候, leader就会把它踢出去。kafka通过ISR集合来维护一个分区副本信息
kafka-18.png
图解:一个新leader被选举并被接受客户端的消息成功写入。Kafka确保从同步副本列表中选举一个副本为 leader;leader负责维护和跟踪ISR(in-Sync replicas , 副本同步队列)中所有follower滞后的状态。当 producer发送一条消息到broker后,leader写入消息并复制到所有follower。消息提交之后才被成功复制到所有的同步副本。
ISR
ISR表示目前“可用且消息量与leader相差不多的副本集合,这是整个副本集合的一个子集”。怎么去理解可用和相差不多这两个词呢?具体来说,ISR集合中的副本必须满足以下的条件
- 副本所在节点必须维持着与zookeeper的连接
- 副本最后一条消息的offset与leader副本的最后一条消息的offset之间的差值不能超过指定的阈值 (replica.lag.time.max.ms) replica.lag.time.max.ms:如果该follower在此时间间隔内一直没有追上过leader的所有消息,则该follower就会被剔除isr列表
- ISR数据保存在Zookeeper的
/brokers/topics/<topic>/partitions/<partitionId>/state
节点中
follower副本把leader副本LEO之前的日志全部同步完成时,则认为follower副本已经追赶上了leader 副本,这个时候会更新这个副本的lastCaughtUpTimeMs标识,kafk副本管理器会启动一个副本过期检 查的定时任务,这个任务会定期检查当前时间与副本的lastCaughtUpTimeMs的差值是否大于参数replica.lag.time.max.ms 的值,如果大于,则会把这个副本踢出ISR集合
图解
kafka-19.png
副本同步数据原理
初始化状态
初始状态下,leader和follower的HW和LEO都是0,leader副本会保存remote LEO,表示所有follower LEO,也会被初始化为0,这个时候,producer没有发送消息。follower会不断地个leader发送FETCH 请求,但是因为没有数据,这个请求会被leader寄存,当在指定的时间之后会强制完成请求,这个时间 配置是(replica.fetch.wait.max.ms),如果在指定时间内producer有消息发送过来,那么kafka会唤醒 fetch请求,让leader继续处理 kafka-22.png数据的同步处理会分两种情况,这两种情况下处理方式是不一样的
1.第一种是leader处理完producer请求之后,follower发送一个fetch请求过来
2. 第二种是follower阻塞在leader指定时间之内,leader副本收到producer的请求。
第一种情况
leader处理完producer请求之后,follower发送一个fetch请求过来 。状态图如下
kafka-22.pngleader副本收到请求以后,会做几件事情
1.把消息追加到log文件,同时更新leader副本的LEO
2.尝试更新leader HW值。这个时候由于follower副本还没有发送fetch请求,那么leader的remote LEO仍然是0。leader会比较自己的LEO以及remote LEO的值发现最小值是0,与HW的值相同,所以不会更新HW
follower fetch消息
follower 发送fetch请求,leader副本的处理逻辑是:
1.读取log数据、更新remote LEO=0(follower还没有写入这条消息,这个值是根据follower的fetch 请求中的offset来确定的)
2.尝试更新HW,因为这个时候LEO和remoteLEO还是不一致,所以仍然是HW=0
3.把消息内容和当前分区的HW值发送给follower副本
follower副本收到response以后
1.将消息写入到本地log,同时更新follower的LEO
2.更新follower HW,本地的LEO和leader返回的HW进行比较取小的值,所以仍然是0
第一次交互结束以后,HW仍然还是0,这个值会在下一次follower发起fetch请求时被更新
follower发第二次fetch请求,leader收到请求以后
1.读取log数据
2.更新remote LEO=1, 因为这次fetch携带的offset是1.
3.更新当前分区的HW,这个时候leader LEO和remote LEO都是1,所以HW的值也更新为1 4. 把数据和当前分区的HW值返回给follower副本,这个时候如果没有数据,则返回为空
follower副本收到response以后
1.如果有数据则写本地日志,并且更新LEO
2.更新follower的HW值
到目前为止,数据的同步就完成了,意味着消费端能够消费offset=1这条消息。
第二种情况
前面说过,由于leader副本暂时没有数据过来,所以follower的fetch会被阻塞,直到等待超时或者 leader接收到新的数据。当leader收到请求以后会唤醒处于阻塞的fetch请求。处理过程基本上和前面说 的一致
- leader将消息写入本地日志,更新Leader的LEO 2. 唤醒follower的fetch请求
- 更新HW
kafka使用HW和LEO的方式来实现副本数据的同步,本身是一个好的设计,但是在这个地方会存在一个 数据丢失的问题,当然这个丢失只出现在特定的背景下。我们回想一下,HW的值是在新的一轮FETCH 中才会被更新。我们分析下这个过程为什么会出现数据丢失
网友评论