美文网首页Kafka文字欲
无镜--kafka之服务端处理读写请求

无镜--kafka之服务端处理读写请求

作者: 绍圣 | 来源:发表于2018-11-14 13:29 被阅读1次

    消费者发送加入组请求和同步组请求给服务端,服务端将请求的处理交给消费组的协调者(GroupCoordinator)。客户端发送生产请求和拉取请求给服务端,服务端将请求的处理交给副本管理器(RelicaManager)。与日志存储相关的业务组件是副本管理器,负责日志的底层类是日志管理器,副本管理器通过日志管理器间接的操作底层日志。

    服务端创建日志,读取日志,管理日志都只能通过日志管理器完成,日志管理器对象会传递给副本管理器,而副本管理器管理所有的分区,分区管理所有的副本,每个副本对应一个日志。分区在创建副本时才会创建日志。每个分区都需要持有副本管理器的引用,才能通过日志管理器创建日志。

    副本管理器

    追加消息时,生产者客户端会发送每个分区以及对应的消息集;拉取消息时,客户端会发送每个分区以及对应的拉取信息。服务端返回给客户端的响应结果是按照分区分别返回,生产请求的响应结果包含追加消息集到分区后的起始偏移量。拉取请求的响应结果包含每个分区的最高水位(HW),每个分区的消息集。

    追加和读取本地日志

    服务端接收客户端发送的生产请求和拉取请求都会包含多个分区。追加消息集和读取消息集,首先都需要获取到分区,然后再获取分区的主副本。追加消息集时,在分区中获取主副本,并写入本地日志文件;读取消息集时,在副本管理器中获取分区主副本,并读取本地日志文件。追加消息集和读取消息集都会获取到副本的日志对象(Log),然后分别调用日志的追加方法(append())和读取方法(read())。

    生产者用同步和异步的模式发送生产请求给服务端:同步模式下,生产者发送一条消息后,必须阻塞等到收到响应结果后,才会接着发送下一条消息;异步模式下,生产者发送一条消息后,不用等待收到上一条消息的响应结果,就可以接着发送下一条消息。生产者发送生产请求通过设置:request.required.acks的值来控制是否需要等待服务端的应答,应答值表示:生产者要求主副本收到指定数量(备份副本)的应答,才会认为生产请求完成。

    应答值为0:表示生产者不会等待服务端的任何应答,将消息发送到网络通道后就认为生产请求完成了。客户端发送生产请求后,对应的响应结果需要返回每条消息的在服务端偏移量,应答值等于0时,每条消息的偏移量都是-1。

    生产者设置的应答值等于0,可能会丢失数据:

    1,生产者将消息发送到网络通道后就认为生产请求完成。此时向网络通道发送3条数据:消息1,2,3.

    2,消息1和消息2通过网络通道发送出去了,但消息3没有被网络通道发送出去。消息3就不算完成。

    3,主副本只收到了消息1,但不保证备份副本复制了消息1。

    4,主副本挂掉了,它接收的消息1还没有备份,消息1就丢失了。

    5,备份副本变为主副本后,客户端认为成功的消息1和消息2实际上都丢失了。

    应答值为1:表示生产者会等待主副本收到一个应答后,认为生产请求完成。这个应答其实是主副本自己的应答:主副本收到客户端发送的消息集,并存储到本地日志后,生产请求就算完成,服务端可以返回响应结果给客户端(其实就是主副本写入日志成功)。这种情况也没有收到备份副本发送的应答,仍然有可能丢失消息:主副本写入本地日志后,服务端发送响应给客户端,生产者认为请求完成,但这时主副本挂掉,备份副本还没有来得及同步主副本写入日志的信息。

    应答值为-1:表示生产者发送生产请求后,所有处于同步中的备份副本(ISR)都向主副本发送了应答后,生产请求才算完成。如果这些ISR中的副本只要有一个没有向主副本发送应答,主副本就会阻塞等待,生产请求就不能完成。这种情况下只要ISR中有一个副本存活,消息就不会丢失。

    针对应答值为-1的情况,服务端为了等待备份副本发送应答,可以采用阻塞的方式,但这种方式对服务端的性能影响较大,所以针对这种情况kafka采用了延迟的操作。

    创建延迟的生产和延迟的拉取

    创建延迟的生产必须同时满足三个条件:

    1,request.required.acks=-1:生产者等待所有ISR中的备份副本都向主副本发送应答。

    2,生产者发送的消息集有数据。

    3,至少要有一个分区写入到主副本本地日志文件成功(一个请求中可能会包含多个分区的消息集,如果没有一个成功写入日志文件,创建延迟的生产就没有意义)。

    如果以上情况,有任意一条不满足,服务端就会立即返回响应结果给客户端。如果服务端没有创建延迟的生产,并不意味着备份副本不会向主副本拉取数据,只是生产者客户端不关心而已。

    创建延迟的拉取必须同时满足四个条件:

    1,拉取请求设置等待时间大于0。

    2,拉取请求必须设置拉取的分区。

    3,本次拉取还没有收集到足够的数据。

    4,拉取分区时不能发送错误。

    延迟的生产和延迟的拉取的超时时间都是客户端设置;延迟的生产和延迟的拉取的键都是分区。

    服务端的延迟对象完成都有两种方式:超时或者外部事件。延迟的生产的外部事件:ISR的所有备份副本都向主副本发送了应答;服务端处理备份副本的拉取请求,向主副本的本地日志读取消息集后(最高水位有关(HW))。延迟的拉取的外部事件:读取到足够数量的消息集;服务端处理生产请求,追加消息集到主副本的本地日志后。

    服务端处理的拉取请求可以来自消费者和备份副本,备份副本拉取主副本的消息,会尝试完成延迟的生产,而消费者拉取主副本消息则不会去尝试完成延迟的生产。

    生产者追加消息创建延迟的生产,它的限制条件是:所有ISR备份副本发送应答给主副本。当备份副本拉取消息时,表示备份副本会发送应答给主副本,就会尝试完成延迟的生产。备份副本拉取消息创建延迟的拉取,它的限制条件是:拉取到足够的消息集。当生产者追加到消息集到主副本后,表示有新的信息,就会尝试去完成延迟的拉取。

    来之《Kafka技术内幕:图文详解Kafka源码设计与实现》:延迟操作的限制条件和外部事件的关系

    分区与副本

    日志管理器是副本管理器的全局变量,副本管理器管理消息代理节点上的分区,副本管理器将日志管理器传递给所管理的分区,分区通过日志管理器为每个副本创建对应的日志文件。日志管理器对日志进行管理,副本管理器对副本进行管理。日志管理器(Logmanager)通过每个日志对象(log)管理日志的所有分段(LogSegment),副本管理器(ReplicaManager)通过每个分区(Partition)管理每个分区的所有副本(Replica)。

    副本机制:同一个分区的副本会存在于多个消息代理节点上,并被对应节点的副本管理器所管理。节点上的每个分区都有多个副本,但只有本地副本才有对应的日志文件。(多个消息代理节点管理了同一个分区,分区的不同副本,只有分区副本编号与代理节点编号相同的,被称为本地副本,才会有对应的日志文件,剩下的副本只是在当前节点上保留信息:其实是分区和副本的对应关系)

    来之《Kafka技术内幕:图文详解Kafka源码设计与实现》:多个消息代理节点的副本管理器管理了同一个分区

    备份副本会向主副本拉取消息保持数据的同步,主副本所在节点服务端处理备份副本的拉取请求,也会更新对应的备份副本信息(注意是主副本)。以上图中节点2和节点3上的备份副本向节点1上的主副本发送了拉取请求,节点1在返回主副本的数据给备份副本之前,分别是更新副本2和副本3的信息。这样如果需要获取分区的所有副本信息时,就不需要和备份副本所在的节点进行网络通信;还有即使主副本所在的节点挂掉,其他副本所在的节点也保存着分区和副本的关系。

    分区

    每个分区都是只有一个主副本和多个备份副本,不同节点上的分区对象,针对同一个分区,这个分区对象的主副本对象是同一个。还维护了所有副本集合(AR)和同步的副本(ISR)。

    例如:分区P1有三个副本编号【1,2,3】,分别存储在对应编号的节点上,每个节点的副本管理器都会管理分区P1。不同节点上的每个分区对象除了本地节点编号不一样,其他成员变量都一样:AR等于1,2,3,主副本等于2。

    副本

    分区创建副本分成本地副本和远程副本,节点编号和副本编号相同的副本叫做本地副本,编号不同的叫远程副本。

    本地副本和远程副本的区别:本地副本有日志(log),远程副本没有日志。

    数据目录下有三个检查点文件:恢复点,清理点,最高水位,最高水位检查点文件表示备份副本的数据同步位置。

    每个副本对象都定义了两个元数据:最高水位元数据(HW)和偏移量元数据(LEO),创建副本对象时从最高水位检查点文件中读取分区的HW作为初始的最高水位。获取副本偏移量元数据:本地副本通过读取日志文件的偏移量元数据。

    以下两种情况:

    1,消息集追加到主副本的本地日志,更新日志的下一个偏移量元数据(nextOffsetMetadata)。

    2,备份副本同步主副本的数据,将拉取结果写到本地日志,更新日志的下一个偏移量元数据(nextOffsetMetadata)。

    注意都只是更新的日志的下一个偏移量元数据(nextOffsetMetadata),没有操作副本的偏移量元数据。这是因为针对本地副本,当需要获取副本偏移量元数据,可以直接获取日志的偏移量元数据。

    具体步骤:

    1,生产者客户端将消息集追加到分区的主副本。

    2,消息集刷写到主副本的本地日志,会更新日志的偏移量元数据。

    3,其他消息代理节点上的备份副本向主副本所在的消息代理节点同步数据。

    4,主副本所在的副本管理器读取本地日志,更新对应拉取的备份副本信息。

    5,主副本所在的服务端将拉取结果返回给发起拉取请求的备份副本。

    6,备份副本接收到服务端饭会的拉取结果,将消息集追加到本地日志,更新日志的偏移量元数据。

    总结:

    某一个分区的所有副本,都在与之编号相同的消息代理节点有对应的本地日志文件,同时消息代理节点上的副本管理会管理这个分区的所有副本的信息。主副本和备份副本信息的更新情况不同:主副本:消息集追加到主副本,刷写到日志文件后更新日志的偏移量元数据,主副本在处理备份副本拉取请求时所在节点的副本管理器读取本地日志,更新对应拉取的备份副本在主副本节点上的信息。备份副本:发起拉取请求,接收来之主副本节点的拉取请求结果,并追加到本地日志中,并更新日志的元数据偏移量。

    来之《Kafka技术内幕:图文详解Kafka源码设计与实现》:更新本地日志和备份副本的偏移量元数据

    注意:这里只有主副本所在的副本管理器读取本地日志更新其主副本的分区的备份副本信息。在备份副本所在的节点的副本管理器,备份副本接收到服务端返回的拉取结果,将消息集追加到本地日志,更新日志的偏移量元数据后,并没有读取备份副本的本地日志更新对应的副本信息。

    我的理解这是因为:备份副本本身的目的是提供消息的备份,备份副本所在的节点保留分区与副本的对应关系。在主副本所在的节点挂掉后,选择出来的备份副本可以继续提供读写服务,并且数据不丢失。并且保留的分区和副本的对应关系可以知道还有哪些备份副本,并且当剩下的备份副本发送拉取请求后,当下的副本(已经变成主副本)所有在节点的副本管理器就会读取本地日志,更新对应拉取的备份副本信息。这样原本备份副本节点上面的副本信息就会更新到最近的副本信息。

    备份副本同步数据

    备份副本向主副本所在的消息代理节点发送拉取请求,会指定备份副本编号。服务端处理备份副本请求,会读取主副本的本地日志文件,然后更新备份副本的偏移量元数据;扩展分区的ISR集合;是否增加主副本最高水位;如果主副本的最高水位增加还会尝试完成延迟的生产请求和延迟的拉取请求。

    扩展ISR的集合必须满足三个条件:

    1,备份副本之前不在ISR集合中。2,备份副本必须在分区的AR中。3,备份副本的偏移量必须大于或者等于主副本的最高水位,因为这样才能表示备份副本已经跟上了主副本的脚步。

    主副本的最高水位是ISR集合中偏移量最小的值。

    为什么说主副本的最高水位增加了就可以尝试完成延迟的生产请求和延迟的拉取请求?

    1,消费者最多只能消费到主副本的最高水位。如果消费者已经消费到最高水位,但是主副本的最高水位一直没有增加,服务端就不会返回拉取结果给消费者。而一旦主副本的最高水位增加了,就有可能满足“拉取到足够的消息”的限制条件,服务端就可以返回拉取结果给消费者。

    2,主副本等待ISR集合中的所有备份副本都向它发生应答,在这之前,服务端不会返回生产请求给生产者,主副本的最高水位会选择ISR集合中所有备份副本的最小偏移量值。服务端处理备份副本的拉取请求,会更新备份副本的偏移量,那么就有可能增加主副本的最高水位,一旦主副本的最高水位增加了,那么ISR集合中的所有备份副本一定都发送了应答,服务器就可以返回生产请求应答给生产者了。

    偏移量,最高水位,复制点

    备份副本向主副本同步数据过程中,备份副本自己会更新本地的日志偏移量,主副本所在的服务端也会更新对应备份副本的偏移量和最高水位。

    具体如下:

    1,生产者向主副本发送消息集,主副本的偏移量增加,初始时所有的副本的最高水位都是0.

    2,备份副本拉取到数据,更新本地的偏移量。拉取响应带有主副本的最高水位,但是主副本的最高水位还是0,因此备份副本的最高水位也是0。

    3,备份副本再次拉取数据,会更新主副本的最高水位。主副本返回给备份副本的拉取响应中包含最新的最高水位。

    4,备份副本拉取到数据,更新本地的偏移量,也会更新备份副本的最高水位。

    通过以上的操作,主副本记录的备份副本的偏移量与备份副本自己记录的偏移量是一致的。

    更新副本偏移量:

    1,追加消息到主副本的本地日志 、备份副本拉取消息写到自己的本地日志,都会更新日志的偏移量。

    2,主副本所在的服务端处理备份副本的拉取请求,也会更新分区中备份副本对应的偏移量。

    更新副本最高水位:

    1,主副本的最高水位取决于ISR中所有副本的最小偏移量。最小值没有变化,最高水位也不会变化。

    2,备份副本的最高水位取决于主副本的最高水位和它自己的偏移量,它会选择这两者的最小值。

    日志管理器会定时将所有分区的副本偏移量,刷写到恢复点检查点文件;副本管理器会定时将分区副本的最高水位,刷写到复制点文件(最高水位检查点文件)。

    同一个分区在不同的消息代理节点上,它们的本地副本都有偏移量和最高水位。主副本所在的节点会记录所有副本的偏移量和主副本的最高水位。备份副本所在的节点只会记录自己的偏移量和最高水位,不会记录其他副本的偏移量和最高水位。

    消费者客户端最多只会读取到主副本的最高水位。但因为主副本可能会出现故障,所以备份副本也需要记录最高水位。当主副本出现故障时,备份副本成为主副本,它的最高水位和之前主副本的最高水位保持一致,消费者客户端就不会丢失数据。

    参考资料:

    Kafka技术内幕:图文详解Kafka源码设计与实现

    相关文章

      网友评论

        本文标题:无镜--kafka之服务端处理读写请求

        本文链接:https://www.haomeiwen.com/subject/dzewxqtx.html