参考链接:
- javaguide
- https://juejin.cn/post/6844903889003610119
- https://blog.csdn.net/Dongguabai/article/details/86536894
- https://www.cnblogs.com/youngchaolin/p/12543436.html
- https://mp.weixin.qq.com/s/mC3t--xbYQuqeFdxBOp3wQ
- Kafka消息送达语义详解
- Kafka事务特性详解
为什么要使用 kafka?
缓冲和削峰:上游数据时有突发流量,下游可能扛不住,或者下游没有足够多的机器来保证冗余,kafka在中间可以起到一个缓冲的作用,把消息暂存在kafka中,下游服务就可以按照自己的节奏进行慢慢处理。
解耦和扩展性:项目开始的时候,并不能确定具体需求。消息队列可以作为一个接口层,解耦重要的业务流程。只需要遵守约定,针对数据编程即可获取扩展能力。
冗余:可以采用一对多的方式,一个生产者发布消息,可以被多个订阅topic的服务消费到,供多个毫无关联的业务使用。
健壮性:消息队列可以堆积请求,所以消费端业务即使短时间死掉,也不会影响主要业务的正常进行。
异步通信:很多时候,用户不想也不需要立即处理消息。消息队列提供了异步处理机制,允许用户把一个消息放入队列,但并不立即处理它。想向队列中放入多少消息就放多少,然后在需要的时候再去处理它们。
kafka和其他MQ的对比
kafka的优势:
- 吞吐量更大,性能更高
- 兼容性是最好的没有之⼀,尤其在⼤数据和流计算领域。
kafka的数据可靠性怎么保证:ack应答机制
为保证producer发送的数据,能可靠的发送到指定的topic,topic的每个partition收到producer发送的数据后,都需要向producer发送ack(acknowledgement确认收到),如果producer收到ack,就会进行下一轮的发送,否则重新发送数据。所以引出ack机制。ack应答机制Kafka为用户提供了三种可靠性级别,用户根据对可靠性和延迟的要求进行权衡,选择以下的配置。
acks参数配置:
-
0:producer不等待broker的ack,这一操作提供了一个最低的延迟,broker一接收到还没有写入磁盘就已经返回,当broker故障时有可能丢失数据。
-
1(默认):producer等待broker的ack,partition的leader落盘成功后返回ack,如果在follower同步成功之前leader故障,那么将会丢失数据。
-
-1(all):producer等待broker的ack,partition的leader和follower全部落盘成功后才返回ack。但是如果在follower同步完成后,broker发送ack之前,leader发生故障,那么会造成数据重复。
Kafka的数据是放在磁盘上还是内存上,为什么速度会快?
- 顺序写入
- parition大文件分成多个小文件段,容易定期清除或删除已经消费完文件,减少磁盘占用
- 稀疏索引
kafka使用的是磁盘存储。速度快是因为:顺序写入:因为硬盘是机械结构,每次读写都会寻址->写入,其中寻址是一个“机械动作”,它是耗时的。所以硬盘 “讨厌”随机I/O, 喜欢顺序I/O。为了提高读写硬盘的速度,Kafka就是使用顺序I/O。Memory Mapped Files(内存映射文件):64位操作系统中一般可以表示20G的数据文件,它的工作原理是直接利用操作系统的Page来实现文件到物理内存的直接映射。完成映射之后你对物理内存的操作会被同步到硬盘上。
Kafka高效文件存储设计:Kafka把topic中一个parition大文件分成多个小文件段,通过多个小文件段,就容易定期清除或删除已经消费完文件,减少磁盘占用。通过索引信息可以快速定位 message和确定response的 大 小。通过index元数据全部映射到memory(内存映射文件), 可以避免segment file的IO磁盘操作。通过索引文件稀疏存储,可以大幅降低index文件元数据占用空间大小。
注:Kafka解决查询效率的手段之一是将数据文件分段,比如有100条Message,它们的offset是从0到99。假设将数据文件分成5段,第一段为0-19,第二段为20-39,以此类推,每段放在一个单独的数据文件里面,数据文件以该段中 小的offset命名。这样在查找指定offset的Message的时候,用二分查找就可以定位到该Message在哪个段中。为数据文件建 索引数据文件分段 使得可以在一个较小的数据文件中查找对应offset的Message 了,但是这依然需要顺序扫描才能找到对应offset的Message。
为了进一步提高查找的效率,Kafka为每个分段后的数据文件建立了索引文件,文件名与数据文件的名字是一样的,只是文件扩展名为.index。
LEO 和 HW
image.pngLEO:log end offset,指的是每个副本最大的offset。
HW:high water,ISR队列中最小的LEO,指的是消费者能见到的最大的offset。
Kafka为什么不支持读写分离?
kafka 是主写主读模式。
- 主写从读的话,主从数据不一致
- kafka 主从节点数据同步更耗时
在 Kafka 中,生产者写入消息、消费者读取消息的操作都是与 leader 副本进行交互的,从 而实现的是一种主写主读的生产消费模型。Kafka 并不支持主写从读,因为主写从读有 2 个很明显的缺点:数据一致性问题:数据从主节点转到从节点必然会有一个延时的时间窗口,这个时间 窗口会导致主从节点之间的数据不一致。某一时刻,在主节点和从节点中 A 数据的值都为 X, 之后将主节点中 A 的值修改为 Y,那么在这个变更通知到从节点之前,应用读取从节点中的 A 数据的值并不为最新的 Y,由此便产生了数据不一致的问题。
延时问题:类似 Redis 这种组件,数据从写入主节点到同步至从节点中的过程需要经历 网络→主节点内存→网络→从节点内存 这几个阶段,整个过程会耗费一定的时间。而在 Kafka 中,主从同步会比 Redis 更加耗时,它需要经历 网络→主节点内存→主节点磁盘→网络→从节 点内存→从节点磁盘 这几个阶段。对延时敏感的应用而言,主写从读的功能并不太适用。
Kafka 的多分区(Partition)以及多副本(Replica)机制有什么好处呢?
- 并发(负载均衡):Kafka 通过给特定 Topic 指定多个 Partition, ⽽各个 Partition 可以分布在不同的 Broker 上,这样便能提供⽐较好的并发能⼒(负载均衡)。
- 安全(容灾能力):Partition 可以指定对应的 Replica 数, 这也极⼤地提⾼了消息存储的安全性, 提⾼了容灾能⼒,不过也相应的增加了所需要的存储空间。
核心概念
- zookeeper
- Producer 生产者
- Consumer 消费者, ConsumerGroup 消费组
- Broker 代理
kafka以集群的方式运行,可以由一个或多个服务组成,每个服务叫做一个 broker - Topic 主题
- Partition 分区 => 副本集,leader 和 follower
Zookeeper 在 Kafka 中的作⽤知道吗?
image.pngZooKeeper 主要为 Kafka 提供元数据的管理的功能。
从图中我们可以看出,Zookeeper 主要为 Kafka 做了下⾯这些事情:
- Broker 注册 :在 Zookeeper 上会有⼀个专⻔⽤来进⾏ Broker 服务器列表记录的节点。每个 Broker 在启动时,都会到 Zookeeper 上进⾏注册,即到/brokers/ids 下创建属于⾃⼰的节点。每个 Broker 就会将⾃⼰的 IP 地址和端⼝等信息记录到该节点中去
- Topic 注册 : 在 Kafka 中,同⼀个Topic 的消息会被分成多个分区并将其分布在多个Broker 上,这些分区信息及与 Broker 的对应关系也都是由 Zookeeper 在维护。⽐如我创建了⼀个名字为 my-topic 的主题并且它有两个分区,对应到 zookeeper 中会创建这些⽂件夹: /brokers/topics/my-topic/Partitions/0 、 /brokers/topics/my-topic/Partitions/1
- 负载均衡 :上⾯也说过了 Kafka 通过给特定 Topic 指定多个 Partition, ⽽各个 Partition 可以分布在不同的 Broker 上, 这样便能提供⽐好的并发能⼒。 对于同⼀个 Topic 的不同Partition,Kafka 会尽⼒将这些 Partition 分布到不同的 Broker 服务器上。当⽣产者产⽣消息后也会尽量投递到不同 Broker 的 Partition ⾥⾯。当 Consumer 消费的时候,Zookeeper 可以根据当前的 Partition 数量以及 Consumer 数量来实现动态负载均衡。
- ......
生产者 push & 消费者 pull
生产者 push
消费者 pull:
- 如果是push的话,不同消费速率的 consumer 不好处理。
- consumer 可以自主决定是否批量的从 broker 拉取数据。
- 缺点:如果 broker 没有可供消费的消息,将导致 consumer 不断在循环中轮询,直到新消息到 t 达。为了避免这点,Kafka 有个参数可以让 consumer 阻塞直到新消息到达。(当然也可以阻塞直到消息的数量达到某个特定的量这样就可以批量发。)
Kafka 判断一个节点是否还活着有那两个条件?
- 节点必须可以维护和 ZooKeeper 的连接,Zookeeper 通过心跳机制检查每个节点的连接
- 如果节点是个 follower,他必须能及时的同步 leader 的写操作,延时不能太久
topic, partition 和 offset
每个 Topic 可以划分多个分区(每个 Topic 至少有一个分区),同一 Topic 下的不同分区包含的消息是不同的。
每个消息在被添加到分区时,都会被分配一个 offset,它是消息在此分区中的唯一编号,Kafka 通过 offset 保证消息在分区内的顺序,offset 的顺序不跨分区,即 Kafka 只保证在同一个分区内的消息是有序的。
image.pngKafka 分区数可以增加或减少吗?为什么?
我们可以使用 bin/kafka-topics.sh 命令对 Kafka 增加 Kafka 的分区数据,但是 Kafka 不支持减少分区数。
Kafka 分区数据不支持减少是由很多原因的,比如减少的分区其数据放到哪里去?是删除,还是保留?删除的话,那么这些没消费的消息不就丢了。如果保留这些消息如何放到其他分区里面?追加到其他分区后面的话那么就破坏了 Kafka 单个分区的有序性。如果要保证删除分区数据插入到其他分区保证有序性,那么实现起来逻辑就会非常复杂。
参考链接:https://blog.csdn.net/weixin_39860755/article/details/112076339
producer
Kafka 中最基本的数据单元就是消息,而一条消息其实是由 Key + Value 组成的(Key 是可选项,可传空值,Value 也可以传空值),这也是与 ActiveMQ 不同的一个地方。
每个消息在被添加到分区时,都会被分配一个 offset,它是消息在此分区中的唯一编号,Kafka 通过 offset 保证消息在分区内的顺序,offset 的顺序不跨分区,即 Kafka 只保证在同一个分区内的消息是有序的。
producer 直接将数据发送到 partition 的 leader(主节点),然后 follower 才能从 leader 中拉取消息进⾏同步。
producer 消息分发:
- 可以指定 Key,不指定 partition
那么 Producer 会根据 Key 和 partition 机制来判断当前这条消息应该发送并存储到哪个 partition 中(这个就跟分片机制类似)。默认情况下,Kafka 采用的是 hash 取 % 的分区算法。
如果 Key 为 null,则会随机分配一个分区。这个随机是在这个参数“metadata.max.age.ms“的时间范围内随机选择一个。对于这个时间段内,如果 Key 为 null,则只会发送到唯一的分区。这个值默认情况下是 10 分钟更新一次(因为 partition 状态可能会发生变化)。 - 可以同时指定 Key 和 partition
- 可以根据需要进行扩展 Producer 的 partition 机制
消息在被追加到 Partition(分区)的时候都会分配⼀个特定的偏移量(offset)。Kafka 通过偏移量(offset)来保证消息在分区内的顺序性。
每次添加消息到 Partition(分区) 的时候都会采⽤尾加法,如上图所示。Kafka 只能为我们保证Partition(分区) 中的消息有序,⽽不能保证 Topic(主题) 中的 Partition(分区) 的有序。
consumer 消费消息
当消费者拉取到了分区的某个消息之后,消费者会⾃动提交 offset。
单个consumer
单个 consumer 消费消息:
- 可以指定topic
- 可以指定topic和partition
消费组 consumer group
image.png同一个topic / partition也可以由多个Consumer Group并发消费。
同一个Consumer Group可以并发地消费多个topic的消息。
同一个Consumer Group中,一个partition只能由一个consumer消费。
同一个Consumer Group中,不同consumer可以订阅不同topic。
对同一个 Group 来说,其中的 Consumer 可以消费指定分区也可以消费自动分配的分区。
- Consumer 数量和 partition 数量一致:均匀分配
- Consumer 数量大于 partition 数量:consumer浪费,应该避免这种情况
- Consumer 数量小于 partition 数量:Kafka 分区分配策略
Consumer Group 消费 partition:Kafka 分区分配策略
可以通过partition.assignment.strategy参数选择 range 或 roundrobin。
partition.assignment.strategy参数默认的值是range。
Kafka 对于分配策略这块,提供了可插拔的实现方式,也就是说,除了以下这3种之外,我们还可以创建自己的分配机制。
- Range strategy(范围分区)
- RoundRobin strategy(轮询分区)
- StickyAssignor(粘性分区)
1. Range strategy(范围分区)
Range 策略是对每个主题而言的,首先对同一个主题里面的分区按照序号进行排序,并对消费者按照字母顺序进行排序。假设我们有10个分区,3个消费者,排完序的分区将会是0,1,2,3,4,5,6,7,8,9;消费者线程排完序将会是C1-0, C2-0, C3-0。然后将 partitions 的个数除于消费者线程的总数来决定每个消费者线程消费几个分区。如果除不尽,那么前面几个消费者线程将会多消费一个分区。
假如在 Topic1 中有 10 个分区,3 个消费者线程,10/3 = 3,而且除不尽,那么消费者线程 C1-0 将会多消费一个分区,所以最后分区分配的结果是这样的:
C1-0 将消费 0,1,2,3 分区
C2-0 将消费 4,5,6 分区
C3-0 将消费 7,8,9 分区
假如在 Topic1 中有 11 个分区,那么最后分区分配的结果看起来是这样的:
C1-0 将消费 0,1,2,3 分区
C2-0 将消费 4, 5, 6, 7 分区
C3-0 将消费 8,9,10 分区
假如有两个 Topic:Topic1 和 Topic2,都有 10 个分区,那么最后分区分配的结果看起来是这样的:
C1-0 将消费 Topic1 的 0,1,2,3 分区和 Topic1 的 0,1,2,3 分区
C2-0 将消费 Topic1 的 4,5,6 分区和Topic2 的 4,5,6 分区
C3-0 将消费 Topic1 的 7,8,9 分区和Topic2 的 7,8,9 分区
其实这样就会有一个问题,C1-0 就会多消费两个分区,这就是一个很明显的弊端。
2. RoundRobin strategy(轮询分区)
轮询分区策略是把所有 partition 和所有 Consumer 线程都列出来,然后按照 hashcode 进行排序。最后通过轮询算法分配partition 给消费线程。如果所有 Consumer 实例的订阅是相同的,那么 partition 会均匀分布。
假如按照 hashCode 排序完的 Topic / partitions组依次为T1一5, T1一3, T1-0, T1-8, T1-2, T1-1, T1-4,T1-7,T1-6,T1-9,消费者线程排序为 C1-0,C1-1,C2-0,C2-1,最后的分区分配的结果为:
C1-0 将消费 T1-5, T1-2, T1-6分区
C1-1 将消费 T1-3, T1-1, T1-9分区
C2-0 将消费 T1-0, T1-4分区
C2-1 将消费 T1-8, T1-7分区
使用轮询分区策略必须满足两个条件
- 同一个消费者组里面的所有消费者的num.streams(消费者消费线程数)必须相等;
- 每个消费者订阅的主题必须相同。
3. StickyAssignor(粘性分区)
我们再来看一下StickyAssignor策略,“sticky”这个单词可以翻译为“粘性的”,Kafka从0.11.x版本开始引入这种分配策略,它主要有两个目的:
- 分区的分配要尽可能的均匀,分配给消费者的主题分区数最多相差一个;
- 分区的分配尽可能的与上次分配的保持相同。
当两者发生冲突时,第一个目标优先于第二个目标。
Rebalance 再均衡
Rebalance(再均衡:在同一个消费者组当中,分区的所有权从一个消费者转移到另外一个消费者)机制,Rebalance顾名思义就是重新均衡消费者消费。
当出现以下几种情况时,Kafka 会进行一次分区分配操作,也就是 Kafka Consumer 的 Rebalance:
(1) 一个 Consumer group 内 Consumer 的数量增加或减少
(3) Topic 新增了分区
消费者消费消息时,会记录消费者offset(注意不是分区的offset,不同的上下文环境一定要区分),这个消费者的offset,也是保存在一个特殊的topic,叫做__consumer_offsets,它就一个作用,那就是保存消费组里消费者的offset。默认创建时会生成50个分区(offsets.topic.num.partitions设置),一个副本,如果50个分区分布在50台服务器上,将大大缓解消费者提交offset的压力。可以在创建消费者的时候产生这个特殊消费组。
如果只启动了hadoop03一个broker,则所有的50个分区都会在这上面生成
[root@hadoop03 /home/software/kafka-2/bin]# sh kafka-console-consumer.sh --bootstrap-server hadoop03:9092 --topic football --from-beginning --new-consumer
那么问题来了,消费者的offset到底保存到哪个分区呢,kafka中是按照消费组group.id来确定的,使用Math.abs(groupId.hashCode())%50,来计算分区号,这样就可以确定一个消费组下的所有的消费者的offset,都会保存到哪个分区了.
那么问题又来了,既然一个消费组内的所有消费者都把offset提交到了__consumer_offsets下的同一个分区,如何区分不同消费者的offset呢?原来提交到这个分区下的消息,key是groupId+topic+分区号,value是消费者offset。这个key里有分区号,注意这个分区号是消费组里消费者消费topic的分区号。由于实际情况下一个topic下的一个分区,只能被一个消费组里的一个消费者消费,这就不担心offset混乱的问题了。
实际上,topic下多个分区均匀分布给一个消费组下的消费者消费,是由coordinator来完成的,它会监听消费者,如果有消费者宕机或添加新的消费者,就会rebalance,使用一定的策略让分区重新分配给消费者。如下图所示,消费组会通过offset保存的位置在哪个broker,就选举它作为这个消费组的coordinator,负责监听各个消费者心跳了解其健康状况,并且将topic对应的leader分区,尽可能平均的分给消费组里的消费者,根据消费者的变动,如新增一个消费者,会触发coordinator进行rebalance。
Rebalance的过程如下:
- 所有consumer向coordinator发送请求,请求入组。一旦所有成员都发送了请求,coordinator会从中选择一个consumer担任leader的角色,并把组成员信息以及订阅信息发给leader。
- leader开始分配消费方案,指明具体哪个consumer负责消费哪些topic的哪些partition。一旦完成分配,leader会将这个方案发给coordinator。
- coordinator接收到分配方案之后会把方案发给各个consumer,这样组内的所有成员就都知道自己应该消费哪些分区了。所以对于Rebalance来说,Coordinator起着至关重要的作用。
Kafka Consuemr 的 Rebalance 机制规定了一个 Consumer group 下的所有 Consumer 如何达成一致来分配订阅 Topic 的每个分区。而具体如何执行分区策略,就是前面提到过的分区策略。
参考资料:Kafka消费者组三种分区分配策略roundrobin,range,StickyAssignor
如何保证 Kafka 中消息消费的顺序?
- 1 个 Topic 只对应⼀个 Partition。
- (推荐)producer发送消息和consumer消费消息的时候指定 key/Partition
Kafka 的多分区(Partition)以及多副本(Replica)机制有什么好处呢?
- Kafka 通过给特定 Topic 指定多个 Partition, ⽽各个 Partition 可以分布在不同的 Broker 上,这样便能提供比较好的并发能⼒(负载均衡)。
- Partition 可以指定对应的 Replica 数, 这也极⼤地提⾼了消息存储的安全性, 提⾼了容灾能⼒,不过也相应的增加了所需要的存储空间。
Kafka集群partitions/replicas默认分配解析
副本分配算法如下:
- 将所有n个Broker和待分配的I个Partition排序.
- 将第i个Partition分配到第(i mod n)个Broker上.
- 将第i个Partition的第j个副本分配到第((i + j) mod n)个Broker上.
例:4个Broker,1个topic包含4个Partition,2 Replication:
image.png例:6个Broker,1个topic包含6个Partition,2 Replication:
image.png参考链接:https://blog.csdn.net/lizhitao/article/details/41778193
防止消息丢失
- 生产者丢失消息
- 消费者丢失消息
- kafka丢失消息
生产者丢失消息
⽣产者(Producer) 调⽤ send ⽅法发送消息之后,消息可能因为⽹络问题并没有发送过去。
所以,我们不能默认在调⽤ send ⽅法发送消息之后消息消息发送成功了。为了确定消息是发送成功,我们要判断消息发送的结果。但是要注意的是 Kafka ⽣产者(Producer) 使⽤ send ⽅法发送消息实际上是异步的操作。
- 如果消息发送失败的话,我们检查失败的原因之后重新发送
- 推荐为 Producer 的 retries (重试次数)设置⼀个⽐较合理的值,⼀般是 3 ,但是为了保证消息不丢失的话⼀般会设置⽐较⼤⼀点。设置完成之后,当出现⽹络问题之后能够⾃动重试消息发送,避免消息丢失。另外,建议还要设置重试间隔,因为间隔太⼩的话重试的效果就不明显了,⽹络波动⼀次你3次⼀下⼦就重试完了。
同步:
SendResult<String, Object> sendResult = kafkaTemplate.send(topic, o).get();
if (sendResult.getRecordMetadata() != null) {
logger.info("⽣产者成功发送消息到" + sendResult.getProducerRecord().topic() +
"-> " + sendRe
sult.getProducerRecord().value().toString());
}
异步:
ListenableFuture<SendResult<String, Object>> future =
kafkaTemplate.send(topic, o);
future.addCallback(result -> logger.info("⽣产者成功发送消息到topic:{}
partition:{}的消息", result.getRecordMetadata().topic(),
result.getRecordMetadata().partition()),
ex -> logger.error("⽣产者发送消失败,原因:{}",
ex.getMessage()));
消费者丢失消息
当消费者拉取到了分区的某个消息之后,消费者会⾃动提交了 offset。⾃动提交的话会有⼀个问题,试想⼀下,当消费者刚拿到这个消息准备进⾏真正消费的时候,突然挂掉了,消息实际上并没有被消费,但是 offset 却被⾃动提交了。
解决办法也⽐较粗暴,我们⼿动关闭闭⾃动提交 offset,每次在真正消费完消息之后之后再⾃⼰⼿动提交 offset 。
但是,细⼼的朋友⼀定会发现,这样会带来消息被重新消费的问题。⽐如你刚刚消费完消息之后,还没提交 offset,结果⾃⼰挂掉了,那么这个消息理论上就会被消费两次。
kafka丢失消息
- request.required.acks
- min.insync.replicas
- replication.factor
- replica.lag.time.max.ms
- unclean.leader.election.enable
request.required.acks
为保证producer发送的数据,能可靠的发送到指定的topic,topic的每个partition收到producer发送的数据后,都需要向producer发送ack(acknowledgement确认收到),如果producer收到ack,就会进行下一轮的发送,否则重新发送数据。所以引出ack机制。ack应答机制Kafka为用户提供了三种可靠性级别,用户根据对可靠性和延迟的要求进行权衡,选择以下的配置。
request.required.acks 有三个值 0 1 -1
0:生产者不会等待 broker 的 ack,这个延迟最低但是存储的保证最弱当 server 挂掉的时候就会丢数据
1(默认):服务端会等待 ack 值 leader 副本确认接收到消息后发送 ack 但是如果 leader 挂掉后他不确保是否复制完成新 leader 也会导致数据丢失
-1 或 all:同样在 1 的基础上 服务端会等所有的 follower 的副本受到数据后才会受到 leader 发出的 ack,这样数据不会丢失
replication.factor
设置 replication.factor >= 3
为了保证 leader 副本能有 follower 副本能同步消息,我们⼀般会为 topic 设置 replication.factor >= 3。这样就可以保证每个 分区(partition) ⾄少有 3 个副本。虽然造成了数据冗余,但是带来了数据的安全性。
min.insync.replicas
设置 min.insync.replicas > 1。
⼀般情况下我们还需要设置 min.insync.replicas> 1 ,这样配置代表消息⾄少要被写⼊到 2 个副本才算是被成功发送。min.insync.replicas 的默认值为 1 ,在实际⽣产中应尽量避免默认值1。
但是,为了保证整个 Kafka 服务的⾼可⽤性,你需要确保 replication.factor > min.insync.replicas 。为什么呢?设想⼀下加⼊两者相等的话,只要是有⼀个副本挂掉,整个分区就⽆法正常⼯作了。这明显违反⾼可⽤性!⼀般推荐设置成 replication.factor = min.insync.replicas + 1。
ISR 和 replica.lag.time.max.ms
ack同步机制中,如果采用全部完成同步,才发送ack的副本的同步策略的话:提出问题:leader收到数据,所有follower都开始同步数据,但有一个follower,因为某种故障,迟迟不能与leader进行同步,那leader就要一直等下去,直到它完成同步,才能发送ack。为了解决这个问题,就有了ISR。
ISR(in-sync replicas):同步副本列表。每个分区的 Leader 维护了一个动态的in-sync replica set (ISR),意为和leader保持同步的follower集合,ISR 列表里面就是这些 follower 副本的 Borker 编号。当ISR中的follower完成数据的同步之后,leader就会给follower发送ack。如果follower长时间未向leader同步数据,则该follower将被踢出ISR,该时间阈值由replica.lag.time.max.ms参数设定。Leader发生故障之后,就会从ISR中选举新的leader。只有 ISR 里的成员才有被选为 leader 的可能。
replica.lag.time.max.ms :延迟时间,指定了副本在复制消息时可被允许的最大延迟时间。
unclean.leader.election.enable
所以当Leader挂掉了,而且 unclean.leader.election.enable=false 的情况下,Kafka 会从 ISR 列表中选择第一个follower作为新的Leader,因为这个分区拥有最新的已经committed的消息。通过这个可以保证已经committed的消息的数据可靠性。
设置不清洁选举 unclean.leader.election.enable = false
Kafka 0.11.0.0版本开始 unclean.leader.election.enable 参数的默认值由原来的true 改为false。
我们最开始也说了我们发送的消息会被发送到 leader 副本,然后 follower 副本才能从 leader 副本中拉取消息进⾏同步。多个 follower 副本之间的消息同步情况不⼀样,当我们配置了unclean.leader.election.enable = false 的话,当 leader 副本发⽣故障时就不会从 follower 副本中和 leader 同步程度达不到要求的副本中选择出 leader ,这样降低了消息丢失的可能性。
参考链接:https://blog.csdn.net/weixin_39860755/article/details/112076339
Kafka 新建的分区会在哪个目录下
创建在启动 Kafka 集群之前,我们需要配置好 broker配置中的log.dirs 参数,其值是 Kafka的存放目录,这个参数可以配置多个目录,目录之间使用逗号分隔,通常这些目录是分布在不同的磁盘上用于提高读写性能。
当然我们也可以配置 log.dir 参数,含义一样。只需要设置其中一个即可。如果 log.dirs 参数只配置了一个目录,那么分配到各个 Broker 上的分区肯定只能在这个目录下创建文件夹用于存放数据。
假设 Kafka 消息文件存储目录
log.dirs=/tmp/kafka-logs
假设 partition 数量为 4
/bin/kafka-topics.sh –create –zookeeper localhost:2181 –partitions 4 –topic mytopic –replication-factor 4
然后就能在 /tmp/kafka-logs 目录中看到 4 个目录。
但是如果 log.dirs 参数配置了多个目录,那么创建新的分区目录时,Kafka 会在哪个目录中创建分区目录呢?
答案是:Kafka 会在含有 partition 文件夹(分区目录)数量最少的 Broker 文件夹中创建新的分区目录,分区目录名为 Topic名+分区 ID。注意,是 partition 文件夹(分区目录)数量最少的目录,而不是磁盘使用量最少的目录!也就是说,如果你给 log.dirs 参数新增了一个新的磁盘,新的分区目录肯定是先在这个新的磁盘上创建直到这个新的磁盘目录拥有的partition 文件夹(分区目录)不是最少为止。
(即:某个broker,log.dirs配了多个目录,在该broker新增一个partition,该partition的文件夹存放在哪个目录中?)
Kafka 文件存储
image.png- topic: 可以理解为一个消息队列的名字
- partition:为了实现扩展性,一个非常大的topic可以分布到多个 broker(即服务器)上,一个topic可以分为多个partition,每个partition是一个有序的队列
- segment:partition物理上由多个segment组成
- message:每个segment文件中实际存储的一条条数据就是message
- offset:每个partition都由一系列有序的、不可变的消息组成,这些消息被连续的追加到partition中,partition中的每个消息都有一个连续的序列号叫做offset,用于partition唯一标识一条消息
topic-partition
topic和partition创建成功后存储在log.dirs指定的目录下,默认的存储位置在:/tmp/kafka-logs下。
目录名称:topic的名称+有序序号,这个序号从0开始依次增加,如:
test-topic-0
test-topic-1
test-topic-2
segment
当生产者往partition中存储数据时,内存中存不下了,就会往segment file里面存储。
在每个partition文件夹中有可以分为多个segment file。每个segment file对应3个文件,分别是.log数据文件、.index偏移量索引文件、 .timeindex时间戳索引文件。在服务器上,每个partition是一个文件夹,每个segment是一个文件。
kafka默认每个segment file的大小是500M,在存储数据时,会先生成一个segment file,当这个segment file到500M之后,再生成第二个segment file 以此类推。
segment file也有自己的命名规则,每个名字有20个字符,不够用0填充。每个名字从0开始命名,下一个segment file文件的名字就是,上一个segment file中最后一条消息的offset。
test-topic-0
├── 00000000000000000001.index
├── 00000000000000000001.log
├── 00000000000000000001.timeindex
├── 00000000000000001018.index
├── 00000000000000001018.log
├── 00000000000000001018.timeindex
├── 00000000000000002042.index
├── 00000000000000002042.log
├── 00000000000000002042.timeindex
.index文件
.index偏移量索引文件中保存了消息的offset,和position(表示具体消息存储在log中的物理地址。)
用来建立消息偏移量(offset)到物理地址之间的映射关系,方便快速定位消息所在的物理文件位置。
偏移量是单调递增的,查询指定偏移量时,使用二分查找法来快速定位偏移量的位置。
.timeindex文件
.timeindex 时间戳索引文件,它一条数据的结构是时间戳(8byte)+相对offset(4byte),如果要使用这个索引文件,首先需要通过时间范围,找到对应的相对offset,然后再去对应的index文件找到position信息,然后才能遍历log文件,它也是需要使用上面说的index文件的。
它的作用是可以让用户查询某个时间段内的消息,根据指定的时间戳(timestamp)来查找对应的偏移量信息。
时间戳也保持严格的单调递增,查询指定时间戳时,也根据二分查找法来查找。
稀疏存储
log日志默认每写入4K(log.index.interval.bytes设定的),会写入一条索引信息到index文件中。
.index文件和.timeindex文件中并没有为数据文件中的每条消息都建立索引,而是采用了稀疏存储的方式,每隔一定字节的数据建立一条索引。这样避免了索引文件占用过多的空间,从而可以将索引文件保留在内存中。但缺点是没有建立索引的Message也不能一次定位到其在数据文件的位置,从而需要做一次顺序扫描,但是这次顺序扫描的范围就很小了。
kafka查找一条offset对应的实际的消息时,可以通过index二分查找,获取到最近的低位offset,然后从低位offset对应的position开始,从实际的log文件中开始往后查找对应的消息。如要查找offset=5的消息,先去索引文件中找到低位的3 4597这条数据,然后通过4597这个字节偏移量,从log文件中从4597个字节开始读取,直到读取到offset=5的这条数据,这比直接从log文件开始读取要节省时间。二分查找的时间复杂度为O(lgN),如果从头遍历时间复杂度是O(N)。
image.png.log文件
.log文件中并不是直接存储数据,而是通过许多的message组成。
message包含了实际的消息数据,由一个固定长度的头部和可变长度的字节数组组成:
偏移(offset)
消息长度
CRC32校验码
版本号
...
具体的消息: n bytes
参考链接:
- https://segmentfault.com/a/1190000021824942
- https://www.jianshu.com/p/b4654c39f6bc
- https://tech.meituan.com/2015/01/13/kafka-fs-design-theory.html
- https://blog.csdn.net/qq_32727095/article/details/108012552
Kafka 高效文件存储设计特点
- Kafka 把 topic 中一个 parition 大文件分成多个小文件段,通过多个小文件段,就容易定期清除或删除已经消费完文件,减少磁盘占用。
- 通过索引信息可以快速定位 message 和确定 response 的最大大小。
- 通过 index 元数据全部映射到 memory,可以避免 segment file 的 IO 磁盘操作。
- 通过索引文件稀疏存储,可以大幅降低 index 文件元数据占用空间大小。
Kafka 与传统消息系统之间有三个关键区别
- Kafka 持久化日志,这些日志可以被重复读取和无限期保留
- Kafka 是一个分布式系统:它以集群的方式运行,可以灵活伸缩,在内部通过复制数据提升容错能力和高可用性
- Kafka 支持实时的流式处理
幂等性
Idempotence (UK: /ˌɪdɛmˈpoʊtəns/, US: /ˌaɪdəm-/) 幂等:重复执行,获得结果相同。
kafka可能出现非幂等性的情况
image在Consumer端offset没有提交的时候,Consumer重启了,这时候就会出现重复消费的情况
解决方案:
- 唯一ID+指纹码
整体实现相对简单,需要进行数据库写入,利用数据库主键去重,使用ID进行分库分表算法路由,从单库的幂等性到多库的幂等性
- 这里唯一ID一般就是业务表的主键,比如商品ID
- 指纹码:每次操作都要生成指纹码,可以用时间戳+业务编号+...组成,目的是保证每次操作都是正常的
整体流程:
- 需要一个统一ID生成服务,为了保证可靠性,上游服务也要有个本地ID生成服务,然后发送消息给Broker
- 需要ID规则路由组件去监听消息,先入库,如果入库成功,证明没有重复,然后发给下游,如果发现库里面有了这条消息,就不发给下游
参考链接:https://www.jianshu.com/p/c48075dc4395
kafka 消息送达语义
消息送达语义是消息系统中一个常见的问题,主要包含三种语义:
- At most once:消息发送或消费至多一次
- At least once:消息发送或消费至少一次
- Exactly once:消息恰好只发送一次或只消费一次
下面我们分别从发送者和消费者的角度来阐述这三种消息送达语义。
Producer
从Producer的角度来看:
- At least once
意味着Producer发送完一条消息后,会确认消息是否发送成功。如果Producer没有收到Broker的ack确认消息,那么会不断重试发送消息。这样就意味着消息可能被发送不止一次,也就存在这消息重复的可能性。
acks=-1/all。
- At most once
意味着Producer发送完一条消息后,不会确认消息是否成功送达。这样从Producer的角度来看,消息仅仅被发送一次,也就存在者丢失的可能性。
我们可以通过配置Producer的以下配置项来实现At most once语义:
acks=0。acks配置项表示Producer期望的Broker的确认数。默认值为1。可选项:[0,1,all]。如果设置为0,表示Producer发送完消息后不会等待任何Broker的确认;设置为1表示Producer会等待Broker集群中的leader的确认写入消息;设置为all表示Producer需要等待Broker集群中leader和其所有follower的确认写入消息。
retries=0。retires配置项表示当消息发送失败时,Producer重发消息的次数。默认值为2147483647。当配置了acks=0时,retries配置项就失去了作用,因此这儿可以不用配置。
当配置了retires的值后,如果没有将max.in.flight.requests.per.connection配置的值设置为1,有可能造成消息乱序的结果。max.in.flight.requests.per.connection配置代表着一个Producer同时可以发送的未收到确认的消息数量。如果max.in.flight.requests.per.connection数量大于1,那么可能发送了message1后,在没有收到确认前就发送了message2,此时message1发送失败后触发重试,而message2直接发送成功,就造成了Broker上消息的乱序。max.in.flight.requests.per.connection的默认值为5。
- Exactly once = At Least Once + 幂等性
意味着Producer消息的发送是幂等的。这意味着不论消息重发多少遍,最终Broker上记录的只有一条不重复的数据。
在0.11版本以前的Kafka,对此是无能为力的,只能保证数据不丢失,再在下游消费者对数据做全局去重。
Exactly once是Kafka从版本0.11之后提供的高级特性。开启幂等性:enable.idempotence=true。所谓的幂等性就是指Producer不论向Server发送多少次重复数据,Server端都只会持久化一条。幂等性结合At Least Once语义,就构成了Kafka的Exactly Once语义。即:At Least Once + 幂等性 = Exactly Once。Kafka的幂等性实现其实就是将原来下游需要做的去重放在了数据上游。
我们可以通过配置Producer的以下配置项来实现Exactly once语义:
acks=-1/all。
enable.idempotence=true。
Idempotence (UK: /ˌɪdɛmˈpoʊtəns/, US: /ˌaɪdəm-/)
enable.idempotence配置项表示是否使用幂等性。当enable.idempotence配置为true时,acks必须配置为all。并且建议max.in.flight.requests.per.connection的值小于5。
为了实现消息发送的幂等性,Kafka引入了两个新的概念:
- PID。每个新的Producer在初始化的时候会被分配一个唯一的PID,这个PID对用户是不可见的。
- Sequence Numbler。对于每个PID,该Producer发送数据的每个<Topic, Partition>都对应一个从0开始单调递增的Sequence Number。Broker端在缓存中保存了这Sequence Numbler,对于接收的每条消息,如果其序号比Broker缓存中序号大于1则接受它,否则将其丢弃。这样就可以实现了消息重复提交了。但是,只能保证单个Producer对于同一个<Topic, Partition>的Exactly Once语义。不能保证同一个Producer一个topic不同的partion幂等。
开启幂等性的Producer在初始化的时候会被分配一个PID,发往同一Partition的消息会附带Sequence Number。而Broker端会对< PID, Partition, SeqNumber>做缓存,当具有相同主键的消息提交时,Broker只会持久化一条。但是PID重启就会变化,同时不同的Partition也具有不同主键,所以幂等性无法保证跨分区跨会话的Exactly Once。
Consumer
从Consumer的角度来看,
- At least once
Consumer对一条消息可能消费多次。考虑下面的情况:Consumer首先读取消息,然后处理这条消息,最后提交offset。在处理消息时成功后,Consumer宕机了,此时offset还未提交,下一次读取消息时依旧是这条消息,那么处理消息的逻辑又将被执行一遍,这就是At least once消费。
enable.auto.commit=false。禁止后台自动提交offset。
手动调用consumer.commitSync()来提交offset。手动调用保证了offset即时更新。
通过手动提交offset,就可以实现Consumer At least once语义。
- At most once
着Consumer对一条消息最多消费一次,因此有可能存在消息消费失败依旧提交offset的情况。考虑下面的情况:Consumer首先读取消息,然后提交offset,最后处理这条消息。在处理消息时,Consumer宕机了,此时offset已经提交,下一次读取消息时读到的是下一条消息了,这就是At most once消费。
enable.auto.commit=true。后台定时提交offset。
auto.commit.interval.ms配置为一个很小的数值。auto.commit.interval.ms表示后台提交offset的时间间隔。
通过自动提交offset,并且将定时提交时间间隔设置的很小,就可以实现Consumer At most once语义。
- Exactly once
isolation.level=read_committed
意味着消息的消费处理逻辑和offset的提交是原子性的,即消息消费成功后offset改变,消息消费失败offset也能回滚。
isolation.level表示何种类型的message对Consumer可见。
一个常见的Exactly once的的使用场景是:当我们订阅了一个topic,然后往另一个topic里写入数据时,我们希望这两个操作是原子性的,即如果写入消息失败,那么我们希望读取消息的offset可以回滚。
此时可以通过Kafka的Transaction特性来实现。Kafka是在版本0.11之后开始提供事务特性的。我们可以将Consumer读取数据和Producer写入数据放进一个同一个事务中,在事务没有成功结束前,所有的这个事务中包含的消息都被标记为uncommitted。只有事务执行成功后,所有的消息才会被标记为committed。
我们知道,offset信息是以消息的方式存储在Broker的__consumer_offsets topic中的。因此在事务开始后,Consumer读取消息后,所有的offset消息都是uncommitted状态。所有的Producer写入的消息也都是uncommitted状态。
而Consumer可以通过配置isolation.level来决定uncommitted状态的message是否对Consumer可见。isolation.level拥有两个可选值:read_committed和read_uncommitted。默认值为read_uncommitted。
当我们将isolation.level配置为read_committed后,那么所有事务未提交的数据就都对Consumer不可见了,也就实现了Kafka的事务语义。
kafka 事务
Kafka事务的使用
- 生产者发送多条消息可以封装在一个事务中,形成一个原子操作。多条消息要么都发送成功,要么都发送失败。
- read-process-write模式:将消息消费和生产封装在一个事务中,形成一个原子操作。在一个流式处理的应用中,常常一个服务需要从上游接收消息,然后经过处理后送达到下游,这就对应着消息的消费和生成。
Kafka事务配置
- 对于Producer,需要设置transactional.id属性,这个属性的作用下文会提到。设置了transactional.id属性后,enable.idempotence属性会自动设置为true。
- 对于Consumer,需要设置isolation.level = read_committed,这样Consumer只会读取已经提交了事务的消息。另外,需要设置enable.auto.commit = false来关闭自动提交Offset功能。
Kafka事务特性
Kafka的事务特性本质上代表了三个功能:原子写操作,拒绝僵尸实例(Zombie fencing)和读事务消息。
-
原子写
Kafka的事务特性本质上是支持了Kafka跨分区和Topic的原子写操作。在同一个事务中的消息要么同时写入成功,要么同时写入失败。我们知道,Kafka中的Offset信息存储在一个名为_consumed_offsets的Topic中,因此read-process-write模式,除了向目标Topic写入消息,还会向_consumed_offsets 这个Topic中写入已经消费的Offsets数据。因此read-process-write本质上就是跨分区和Topic的原子写操作。Kafka的事务特性就是要确保跨分区的多个写操作的原子性。 -
拒绝僵尸实例(Zombie fencing)
在分布式系统中,一个instance的宕机或失联,集群往往会自动启动一个新的实例来代替它的工作。此时若原实例恢复了,那么集群中就产生了两个具有相同职责的实例,此时前一个instance就被称为“僵尸实例(Zombie Instance)”。在Kafka中,两个相同的producer同时处理消息并生产出重复的消息(read-process-write模式),这样就严重违反了Exactly Once Processing的语义。这就是僵尸实例问题。
Kafka事务特性通过epoch属性来解决僵尸实例问题。所有具有相同transaction-id的Producer都会被分配相同的pid,同时每一个Producer还会被分配一个递增的epoch。Kafka收到事务提交请求时,如果检查当前事务提交者的epoch不是最新的,那么就会拒绝该Producer的请求。从而达成拒绝僵尸实例的目标。
- 读事务消息
为了保证事务特性,Consumer如果设置了isolation.level = read_committed,那么它只会读取已经提交了的消息。在Producer成功提交事务后,Kafka会将所有该事务中的消息的Transaction Marker从uncommitted标记为committed状态,从而所有的Consumer都能够消费。
Kafka事务实现
Kafka为了支持事务特性,引入一个新的组件:Transaction Coordinator。主要负责分配pid,记录事务状态等操作。下面时Kafka开启一个事务到提交一个事务的流程图:
image主要分为以下步骤:
1. 查找Tranaction Corordinator
Producer向任意一个brokers发送 FindCoordinatorRequest请求来获取Transaction Coordinator的地址。
2. 初始化事务 initTransaction
Producer发送InitpidRequest给Transaction Coordinator,获取pid。Transaction Coordinator在Transaciton Log中记录这<TransactionId,pid>的映射关系。另外,它还会做两件事:
- 恢复(Commit或Abort)之前的Producer未完成的事务
- 对PID对应的epoch进行递增,这样可以保证同一个app的不同实例对应的PID是一样,而epoch是不同的。
只要开启了幂等特性即必须执行InitpidRequest,而无须考虑该Producer是否开启了事务特性。
3. 开始事务beginTransaction
执行Producer的beginTransacion(),它的作用是Producer在本地记录下这个transaction的状态为开始状态。这个操作并没有通知Transaction Coordinator,因为Transaction Coordinator只有在Producer发送第一条消息后才认为事务已经开启。
4. read-process-write流程
一旦Producer开始发送消息,Transaction Coordinator会将该<Transaction, Topic, Partition>存于Transaction Log内,并将其状态置为BEGIN。另外,如果该<Topic, Partition>为该事务中第一个<Topic, Partition>,Transaction Coordinator还会启动对该事务的计时(每个事务都有自己的超时时间)。
在注册<Transaction, Topic, Partition>到Transaction Log后,生产者发送数据,虽然没有还没有执行commit或者abort,但是此时消息已经保存到Broker上了。即使后面执行abort,消息也不会删除,只是更改状态字段标识消息为abort状态。
5. 事务提交或终结 commitTransaction/abortTransaction
在Producer执行commitTransaction/abortTransaction时,Transaction Coordinator会执行一个两阶段提交:
- 第一阶段,将Transaction Log内的该事务状态设置为
PREPARE_COMMIT
或PREPARE_ABORT
-
第二阶段,将
Transaction Marker
写入该事务涉及到的所有消息(即将消息标记为committed
或aborted
)。这一步骤Transaction Coordinator会发送给当前事务涉及到的每个<Topic, Partition>的Leader,Broker收到该请求后,会将对应的Transaction Marker
控制信息写入日志。
一旦Transaction Marker
写入完成,Transaction Coordinator会将最终的COMPLETE_COMMIT
或COMPLETE_ABORT
状态写入Transaction Log中以标明该事务结束。
网友评论