Apache Kafka 是一种分布式消息系统,由Scala语言编写而成。
Kafak 基本概念
message
message 是 Kafka 中最基本的数据单元,它由 key 和 value 两部分构成,KV 都为字节数组。
Kafka 会按照一定的策略,将消息按照 key 值路由到指定的 partition 中,从而保证 key 相同的 message 全部写入同一 partition 中。
value 是 Kafka真正传递的业务数据。
topic、partition、broker
topic 是 Kafka 中的一个逻辑概念,kafka 集群中可以定义多个 topic。
在使用的时候,producer 和 consumer 会约定好一个 topic 名称,producer 产生的 message 会写入到该指定的 topic 中,consumer 会从该指定的 topic 中读取 message。
topic 可以看做为一个 message 通道,连接多个 producer 和 consumer,如下图所示:
1、topic.png
在 topic 这个逻辑概念中,kafka 做了进一步的拆分,将一个 topic 分为一个或多个 partition(一个 topic 至少有一个 partition),如下图所示:
2、partition.png
从上图可以看出,当 message 在被 producer push 到一个 partition 的时候,都会被分配一个 offset 编号, offset 是这条 message 在这个 partition 中的唯一编号。
通过 offset 编号,kafka 就可以保证一个 partition 内的 message 是有序的,我们可以认为 partition 是一个用来记录有序 message 的存储。
在生产环境中,不同 topic 需要支持的流量大小有所不同,也就是说,topic 需要横向扩展的能力,kafka 就是通过 partition 来实现的。
同一 topic 的不同 partition 会分配到不同的物理机上,partition 是 topic 横向扩展的最小单位,一个 partition 中存储的这一组有序 message 没法存储到多台机器上。
常见的 kafka 架构如下图所示:
3、partition2.png
从上图中我们可以看到,多个 partition 位于同一个 broker 中,broker 其实就是一个 kafka server 服务,kafka 集群是由多个 broker 构成的。
我们可以通过增加 partition 以及 broker 的方式,提高 topic 的吞吐量。
这里的 broker 是 kafka 的核心之一,主要职责有下面核心三个:
- 接收 producer 发送过来的 message,并保存到磁盘中
- 处理 consumer 的请求
- 处理集群其他Broker的请求,根据请求类型进行相应处理并返回响应。
log
了解了 partition、topic、broker 这个三个宏观概念之后,我们向微观方向进一步分析。
partition 在逻辑上对应着一个 log,当 producer 向 partition 推送 message 的时候,kafka 实际上是将 message 写入到 partition 对应的 log 中。
log 由多个 segment 构成,log 和 segment 都是逻辑上的概念。实际上,log 对应 磁盘上的一个文件夹,segment 对应 log 文件夹下的一个 segment 文件和一个 index 文件。
在写入 partition 时,我们只会写入最新的 segment 文件和 index 索引文件,当这个 segment 文件膨胀到一定大小之后,会创建新的 segment 文件继续写入,旧的 segment 文件就不再写入了。
之所以这么设计,为了避免出现超大的单个 segment 文件,也是为了采用顺序 IO 的方式写入,所以只向最新的 segment 文件追加数据。
再来看 index 文件,它是 segment 文件的一个稀疏索引的文件,在 kafka 运行过程中,会将 index 文件内容映射到内存中,提高索引速度。
replica
为了提高分布式系统的可用性,保证的数据完整性和安全性,我们一般会对数据进行备份,kafka 也是这个套路。
在 kafka 中,partition 一般会有多个 replica(副本)(至少一个 replica),同一个 partition 的不同 replica 中的 message 是一模一样的。
partition 中的 replica 虽然数据一样,但是还是有角色上的区分:一个 partition 下的多个 replica 中,有一个是 leader replica,其他的 replica 都是 follower 角色。
所有的读写请求都由 leader replica 进行处理的,其他的 follower replica,仅仅是定期从 leader replica 拉取新写入的 message 到本地,并同步更新到自己的 log中。结构图如下图所示:
5、replica.png
正如上图所示,同一 partition 的不同 replica 会被分配到不同的 broker上。这样,当 leader replica 所在的 broker 宕机之后,kafka 从剩余的 follower replica 中选举出新的 leader replica,然后由新 leader replica 继续对外提供读写服务。
ISR集合
下面我们继续深入聊一下 replica。
kafka replica 中有个叫做 "ISR" 的概念,全称为 "In-Sync Replica",它表示的是一个 replica 集合,该集合中的 replica 必须满足下面两个条件:
- replica 所在 broker 必须与 Zookeeper 集群保持连接。
- replica 中最后一条 message 的 offset 与 leader replica 中的最后一条 message 的 offset 之间的差值,不能超出指定的阈值。
kafka 集群会为每个 partition 维护一个 ISR 集合,kafka 写入 message 的功能与 ISR 集合有非常密切的联系:在 leader replica 收到写请求的时候,首先是由 leader replica 进行处理完成持久化到本地的 log,之后 follower replica 定期请求 leader replica,拉取新写入的 message,并同步到 follower replica 本地的 log 中。
当然,follower replica 的定期同步请求相对于 leader replica 的写入操作来说,会有一定的延迟,这就会导致 follower replica 中存储的 message 略微落后于 leader replica,但是只要未超出指定阈值都是可以容忍的,这些 follower replica 也就是处于 ISR 集合中。
如果一个 follower replica 出现宕机、长时间 GC、网络故障等等异常请求,导致其长时间没有与 leader replica 进行数据同步,也就是,导致 follower replica 不再满足条件,而被踢出 ISR 集合。
在 follower replica 从上述故障中恢复之后,会重新开始与 leader replica 进行同步,当 follower replica 追上 leader replica 时(即两者最后一条 message 的 offset 的差值小于指定阈值),该 follower replica 会被重新加入到 ISR 集合中。
HighWatermark、Log End Offset
HW(HighWatermark)和 LEO(Log End Offset)与上面的 ISR 集合紧密相关。
HW 记录的是一个特殊的 offset 值,在 consumer 拉取 message 的时候,只能拉取到 HW 之前的 message,HW 之后的 message 对 consumer 来说是不可见的。
与 ISR 集合类似,HW 也是由 leader replica 管理的。当 ISR 集合中全部的 follower replica 都同步了 HW 对应的 message 之后,leader replica 会递增 HW 值。
正因为 HW 之前的 message 同时存在于多个 replica 中,即使 leader replica 出现宕机或是磁盘损坏等故障,这些 message 也不会出现数据丢失(follower replica 会重新竞选为 leader replica,并提供读服务),所以 kafka 认为 HW 之前 message 处于 "已提交" 状态。
LEO(Log End Offset)是所有的 replica 都会有的一个 offset 标记,它用来记录追加到当前 replica 中的最后一个 message 的 offset 值:
- 当 leader replica 接收到 producer 发送的 message 时,leader replica 的 LEO 标记会增加。
- 当 follower replica 成功从 leader replica 中拉取 message 并更新到本地 log 时,follower replica 的 LEO 也会增加。
了解了 HW 和 LEO 的基本概念之后,我们这里用一个动态的方式来说明 HW 和 LEO 是如何一起工作的,如下图所示:
6、HW+LEO.png
- 首先来看 producer,它向我们关心的一个 topic 中的一个 partition 发送一条 message。
- 该 message 写入 leader replica 时,被分配的 offset 为 5,同时 leader replica 会将其 LEO 值从 4 修改为 5,此时 leader replica 的 HW 依旧为 4。
- follower replica 从 leader replica 拉取 message 并同步到本地 log,follower replica 会将自身的 LEO 从 4 修改为 5。
- 当 ISR 集合中所有 replica 都完成了对 offset=5 的同步,leader replica 会将 HW 标记从 4 修改为 5。
完成上述操作之后,consumer 再次来拉取 message 的时候,就可以看到 offset 为 5 的 message 了。
复制方案设计
介绍完 replica 的基础知识点之后,我们再深入理解一下 replica 设计思想。如前文所说,冗余备份是分布式存储中常见的方案之一,备份的常用方案有同步复制和异步复制两种:
- 同步复制的含义是所有 follower replica 都要复制完一条 message 之后,该 message 才是 "已提交" 状态。
- 异步复制的含义是 leader replica 收到 producer 发送的 message 之后,会立刻更新 HW,也就是认为该 message 处于"已提交"的状态。之后,follower replica 会异步的从 leader replica 中同步 message。
下面来看说这两个复制方案的问题:
- 对于同步复制方案来说,一旦有一个 follower replica 出现故障或是长时间 Full GC,无法同步 message,就会导致 HW 无法更新,进而导致 message 无法提交,consumer 也就获取不到消息。此时的故障 follower replica 会导致整个分布式系统不可用。
- 对于异步复制方案来说,虽然避免了同步复制方案一个故障点拖垮整个集群的问题,但存在数据丢失的风险。例如,在一个集群中,所有的 follower replica 的同步速度都比较慢,并且其中存储的 message 量都远远落后于 leader replica。如果此时 leader replica 发生宕机,则重新选举出的 leader replica 中没有原 leader replica 的全部 message,就会造成数据丢失,另外,此时的一些 consumer 可能消费了这些丢失的 message,而且没人知道这些丢失的 message 是什么,整个系统的状态就不可控。
kafka 的 replica 和 ISR 集合设计,权衡了同步复制和异步复制两种方案:
- 当某个 follower replica 的同步落后于 leader replica 太多的时候,kafka 将其判定为可能出现故障的 replica,会将其踢出 ISR 集合。由于 message 的提交,只关注 ISR 集合中的 replica,所以这个慢速同步的 replica 并不会影响整个系统的性能。
- 当 leader replica 出现宕机等异常情况的时候,kafka 会优先从 ISR 集合中选举新的 leader replica,新 leader replica 中包含了 HW 标记之前的全部 message。因为 HW 之后的 message 处于"未提交"状态,从 kafka 集群之外看,是感知不到 leader replica 的切换,数据也不会丢失。
Retention Policy & Log compaction
对 kafka 有一定了解的同学可能知道,无论 message 是否已经被 consumer 消费,kafka 都会长时间保留 message 信息,这种设计是为了方便 consumer 回退到某个 offset,并重新开始消费。
但是,kafka 毕竟不是数据库,不应该一直保存历史 message,尤其是那些已经确定不会再使用的历史 message。我们可以通过修改 kafka 的 retention policy 配置(保留策略)来实现周期性清理历史 message 的效果。
kafka 默认提供了有两种 retention policy:
- 根据 message 保留的时间进行清理的策略,其具体含义是:当一条 message 在 kafka 集群中保存的时间超过了指定阈值,就可以被后台线程清理掉
- 根据 topic 占用磁盘大小进行清理的策略,其具体含义是:当 topic 的 log 小大于一个阈值之后,则可以开始由后台线程删除最旧的 message。
kafka 的 retention policy 可以针对全部 topic 进行配置,还可以针对某个 topic 进行特殊的配置。
除了 retention policy 之外,kafka 还提供了 log compaction(日志压缩)来减少磁盘占用量。我们知道 message 由 key 和 value 两部分构成, 如果一个 key 值对应的 value 值不断被更新,且 consumer 只关心最新的 value 值,那么我们就可以开启 log compaction 功能来压缩日志,核心原理是:
kafka 会启动一个后台压缩线程,定期将 key 相同的 message 进行合并,只保留最新的 value 值。
下图展示了一次 log compaction 的工作过程:
7、log_compaction.png
controller
正如前面介绍,kafka 集群是由多个 broker 构成的,kafka 集群会选举出一个 broker 来担任 controller。
controller 主要是负责管理partition 的状态、管理 partition 下 replica 的状态以及监听 Zookeeper 数据的变更等。
controller 是一主多从的实现方式,所有 broker 都会监听 controller leader 的状态,当 controller leader 出现故障时,会重新选举新的 broker 成为 controller。
consumer
consumer 的主要工作是从 topic 中拉取的 message,并消费 message 完成自身的业务逻辑。
consumer 中维护了一个 offset 信息,用来记录当前 consumer 消费到 partition 的哪个位置(offset)。
由 consumer 来维护 offset 是为了减少 kafka broker 维护 consumer 状态的压力,尤其是在 broker 出现故障或延时的时候,就会导致消费状态丢失或是影响 consumer 的消费。另外,这样设计也让 consumer 按照自己的需求指定 offset 进行消费,例如跳过某一部分 message 或是重复消费某些 message。
consumer group
kafka 中的多个 consumer 可以组成一个 consumer group,consumer group 才是消费 kafka message 的基本单位。
一个 consumer 只能属于一个 consumer group。在一个 consumer group 内部,会保证其消费的 topic 的每个 partition,只会被分配给该 consumer group 的一个 consumer 进行消费。当然,不同 consumer group 之间不会相互影响。
根据 consumer group 的这个特性,我们可以将每个 consumer 独立组成一个 consumer group,这样就可以实现广播的效果(即一个 message 被多个 consumer 同时消费)。
如果要实现独占消费的效果,可以将目标 topic 的全部 consumer 放入一个 consumer group 中,这样的话,就可以保证每个 partition 只有一个 consumer 消费。
下图展示了一个 consumer group 中 consumer 与 partition 之间的对应关系,其中的 consumer0 和 consumer1 分别负责消费 partition0 和 partition1 两个 partition,consumer2 负责消费 partition2 和 partition3:
8、consumer_group.png
consumer group 除了提供了独占和广播两种消费模式之外,consumer group 还实现了水平扩展和故障转移。在上图 consumer2 的处理能力不足以消费两个 partition 的时候,我们可以通过向 consumer group 中添加新 consumer 的方式,重新分配 partition 与 consumer 的映射关系,如下图所示:
8、consumer_group2.png
在添加 consumer3 之后,consumer2 只消费 partition2 中的 message,partition3 则重新分配给了 consumer3 来消费。
接下来再看 consumer group 故障转移的场景,在 consumer3 发生故障的时候,consumer group 也会重新分配 consumer 与 partition 的映射关系,如下图所示,consumer2 重新接手 partition3:
8、consumer_group3.png
根据上面描述的 consumer 与 partition 一对多(或一)的原则,consumer group 中 consumer 的数量并不是越多越好,当一个 consumer group 中的 consumer 数量超过 topic 中 partition 的数量时,这会导致有 consumer 分配不到对应的 partition,从而造成这些 consumer 空闲。
总结
本课时我们重点介绍了 kafka 的基本概念,主要涉及到下面几部分:
- message 的构成
- kafka 中 topic、partition、broker 三大核心概念和它们之间的关系
- log 这个逻辑概念以及对应的物理实现,其中包括 segment 文件、index 稀疏索引文件的介绍
- replica 与 partition 的关系,以及 replica 主从设计的相关讨论
- ISR 概念的介绍
- HighWatermark、Log End Offset
- 复制方案设计
- Retention Policy & Log compaction
- controller
- consumer、consumer group
感谢同学们的观看,课程的相关文章和视频还会放到
- 微信:杨四正
- 抖音:杨四正
- B 站:kafka基本概念
网友评论