什么是分区分配策略
通过前面的案例演示,我们应该能猜到,同一个group中的消费者对于一个topic中的多个partition,存在一定的分区分配策略,每个消费者都可以设置自己的分区分配策略,对于消费组而言,会从各个消费者上报过来的分区分配策略中选举一个彼此都赞同的策略来实现整体的分区分配,这个"赞同"的规则请继续往下看。
在kafka中,存在三种分区分配策略,一种是Range(默认)、 另一种是RoundRobin(轮询)、StickyAssignor(粘性)。 在消费端中的ConsumerConfig中,通过这个属性来指定分区分配策略
public static final String PARTITION_ASSIGNMENT_STRATEGY_CONFIG = "partition.assignment.strategy";
RangeAssignor(范围分区)
Range策略是对每个主题而言的,首先对同一个主题里面的分区按照序号进行排序,并对消费者按照字母顺序进行排序。
假设n = 分区数/消费者数量
m= 分区数%消费者数量
那么前m个消费者每个分配n+1个分区,后面的(消费者数量-m)个消费者每个分配n个分区
假设我们有10个分区,3个消费者,排完序的分区将会是0, 1, 2, 3, 4, 5, 6, 7, 8, 9;消费者线程排完序将会是C1-0, C2-0, C3-0。然后将partitions的个数除于消费者线程的总数来决定每个消费者线程消费几个分区。如果除不尽,那么前面几个消费者线程将会多消费一个分区。在我们的例子里面,我们有10个分区,3个消费者线程, 10 / 3 = 3,而且除不尽,那么消费者线程 C1-0 将会多消费一个分区.
结果看起来是这样的:
C1-0 将消费 0, 1, 2, 3 分区
C2-0 将消费 4, 5, 6 分区
C3-0 将消费 7, 8, 9 分区
假如我们有11个分区,那么最后分区分配的结果看起来是这样的:
C1-0 将消费 0, 1, 2, 3 分区
C2-0 将消费 4, 5, 6, 7 分区
C3-0 将消费 8, 9, 10 分区
假如我们有2个主题(T1和T2),分别有10个分区,那么最后分区分配的结果看起来是这样的:
C1-0 将消费 T1主题的 0, 1, 2, 3 分区以及 T2主题的 0, 1, 2, 3分区
C2-0 将消费 T1主题的 4, 5, 6 分区以及 T2主题的 4, 5, 6分区
C3-0 将消费 T1主题的 7, 8, 9 分区以及 T2主题的 7, 8, 9分区
可以看出,C1-0 消费者线程比其他消费者线程多消费了2个分区,这种分配方式明显的一个问题是随着消费者订阅的Topic的数量的增加,不均衡的问题会越来越严重,也就代表着C1-0这个消费者的消费能力会低于C2-0和C3-0消费者,导致的问题直接点说就是消费者的消费能力不平衡
,所以最好的情况就是partiton数目是consumer数目的整数倍,可以有效避免这个弊端。
RoundRobinAssignor(轮询分区)
轮询分区策略是把所有partition和所有consumer线程都列出来,然后按照hashcode进行排序,注意上一种range分区是针对每一个topic而言的,而轮训分区是相对于所有的partition和consumer而言的,最后通过轮询算法分配partition给消费线程。如果消费组内,所有消费者订阅的Topic列表是相同的(每个消费者都订阅了相同的Topic),那么分配结果是尽量均衡的(消费者之间分配到的分区数的差值不会超过1)。如果订阅的Topic列表是不同的,那么分配结果是不保证“尽量均衡”的,因为某些消费者不参与一些Topic的分配。
在我们的例子里面,假如按照 hashCode 排序完的topic-partitions组依次为T1-5, T1-3, T1-0, T1-8, T1-2, T1-1, T1-4, T1-7, T1-6, T1-9,我们的消费者线程排序为C1-0, C1-1, C2-0, C2-1(c1和c2 consumer group都订阅了t1),最后分区分配的结果为:
C1-0 将消费 T1-5, T1-2, T1-6 分区;
C1-1 将消费 T1-3, T1-1, T1-9 分区;
C2-0 将消费 T1-0, T1-4 分区;
C2-1 将消费 T1-8, T1-7 分区;
相对于RangeAssignor,在订阅多个Topic的情况下,RoundRobinAssignor的方式能消费者之间尽量均衡的分配到分区(分配到的分区数的差值不会超过1——RangeAssignor的分配策略可能随着订阅的Topic越来越多,差值越来越大)
对于订阅组内消费者订阅Topic不一致的情况:假设有三个消费者分别为C1-0、C2-0、C3-0,有3个Topic T1、T2、T3,分别拥有1、2、3个分区,并且C1-0订阅T1,C2-0订阅T1和T2,C3-0订阅T1、T2、T3,那么RoundRobinAssignor的分配结果如下:
![](https://img.haomeiwen.com/i15579250/b2e8ae83a9c44df2.png)
看上去分配已经尽量的保证均衡了,不过可以发现C3-0承担了4个分区的消费而C2-0和C1-0都是承担一个分区,如果T2-1分配给c2-0,均衡性是不是更好呢?带个这个问题,继续下面的这次策略。
StrickyAssignor 分配策略
背景
尽管RoundRobinAssignor已经在RangeAssignor上做了一些优化来更均衡的分配分区,但是在一些情况下依旧会产生严重的分配偏差,比如消费组中订阅的Topic列表不相同的情况下。更核心的问题是无论是RangeAssignor,还是RoundRobinAssignor,当前的分区分配算法都没有考虑上一次的分配结果
。显然,在执行一次新的分配之前,如果能考虑到上一次分配的结果,尽量少的调整分区分配的变动,显然是能节省很多开销的。
kafka在0.11.x版本支持了StrickyAssignor, 翻译过来叫粘性策略
,可以理解为分配结果是带“粘性的”——每一次分配变更相对上一次分配做最少的变动(上一次的结果是有粘性的),它主要有两个目的:
- 分区的分配尽可能的均匀
- 分区的分配尽可能和上次分配保持相同,也就是
rebalance
之后分区的分配尽量和之前的分区分配相同。
当两者发生冲突时, 第 一 个目标优先于第二个目标。 第一个目标是每个分配算法都尽量尝试去完成的,而第二个目标才真正体现出StickyAssignor特性的。
我们举俩个例子来体现StickyAssignor特性
第一个例子:所有consumer订阅的topic都相同的情况:
-
有3个Consumer:C0、C1、C2
-
有4个Topic:T0、T1、T2、T3,每个Topic有2个分区
-
所有Consumer都订阅了这4个分区
StickyAssignor的分配结果如下图所示(增加RoundRobinAssignor分配作为对比):
![](https://img.haomeiwen.com/i15579250/a5ede86c49e67eca.png)
上面的例子中,删除C1 consumerre然后balance,RoundRobin策略会将所有分区重新进行一遍分配,可以看到变动较大,而Sticky模式原来分配给C0、C2的分区都没有发生变动,且最终C0、C1达到的均衡的目的,这就体现了StickyAssignor策略的优越性
。
再举一个例子:所有consumer订阅的topic不相同的情况:
-
有3个Consumer:C0、C1、C2
-
3个Topic:T0、T1、T2,它们分别有1、2、3个分区
-
C0订阅T0;C1订阅T0、T1;C2订阅T0、T1、T2
分配结果如下图所示:
![](https://img.haomeiwen.com/i15579250/7e48aa41749d3bdf.png)
首先在所有consumer订阅的topic不相同的情况下,可以看出StickyAssignor策略相比于RoundRobin策略均衡性更好,体现了StickyAssignor策略的第一个特点:分区的分配尽可能的均匀
,看到这里也解决了我们上节留下的疑问。
其次是,在删除C0消费者进行rebalance之后,可以看出使用RoundRobin策略的分区会重新进行一遍RoundRobin,而使用StickyAssignor策略的分区分配尽可能的和上次保持了最小变动。
以上俩个例子,完美体现了StickyAssignor策略的优越性。
rebalance触发的场景
在上面的例子中可以看到rebalance触发的场景大致有如下三种情况:
(1)Consumer增加或删除会触发 Consumer Group的Rebalance
(2)Partition的增加或者减少都会触发 Consumer Rebalance
(3)consumer在超过max.poll.interval.ms时间后没有再次poll的操作,kafka会认为该consumer宕机,也就会将该consumer踢出group,触发rebalance
谁来执行Rebalance以及管理consumer的group呢?
Kafka提供了一个角色:coordinator来执行对于consumer group的管理,当consumer group的第一个consumer启动的时候,它会去和kafka server确定谁是它们组的coordinator。之后该group内的所有成员都会和该coordinator进行协调通信
如何确定coordinator
consumer group如何确定自己的coordinator是谁呢, 消费者向kafka集群中的任意一个broker发送一个GroupCoordinatorRequest请求,服务端会返回一个负载最小的broker节点的id,并将该broker设置为coordinator
JoinGroup的过程
在rebalance之前,需要保证coordinator是已经确定好了的,整个rebalance的过程分为两个步骤,Join
和Sync
join: 表示加入到consumer group中,在这一步中,所有的成员都会向coordinator发送joinGroup的请求。一旦所有成员都发送了joinGroup请求,那么coordinator会选择一个consumer担任leader角色,并把组成员信息和订阅信息发送消费者
leader选举算法比较简单,如果消费组内没有leader,那么第一个加入消费组的消费者就是消费者leader,如果这个时候leader消费者退出了消费组,那么重新选举一个leader,这个选举很随意,类似于随机算法
![](https://img.haomeiwen.com/i16701032/aea9566db9581832.png)
protocol_metadata: 序列化后的消费者的订阅信息
leader_id: 消费组中的消费者,coordinator会选择一个座位leader,对应的就是member_id
member_metadata 对应消费者的订阅信息
members:consumer group中全部的消费者的订阅信息
generation_id: 年代信息,类似于之前讲解zookeeper的时候的epoch是一样的,对于每一轮rebalance,generation_id都会递增。主要用来保护consumer group。隔离无效的offset提交。也就是上一轮的consumer成员无法提交offset到新的consumer group中。
每个消费者都可以设置自己的分区分配策略,对于消费组而言,会从各个消费者上报过来的分区分配策略中选举一个彼此都赞同的策略来实现整体的分区分配,这个"赞同"的规则是,消费组内的各个消费者会通过投票来决定
- 在joingroup阶段,每个consumer都会把自己支持的分区分配策略发送到coordinator
- coordinator手机到所有消费者的分配策略,组成一个候选集
- 每个消费者需要从候选集里找出一个自己支持的策略,并且为这个策略投票
- 最终计算候选集中各个策略的选票数,票数最多的就是当前消费组的分配策略
Synchronizing Group State阶段
完成分区分配之后,就进入了Synchronizing Group State阶段,主要逻辑是向GroupCoordinator发送SyncGroupRequest请求,并且处理SyncGroupResponse响应,简单来说,就是leader将消费者对应的partition分配方案同步给consumer group 中的所有consumer
![](https://img.haomeiwen.com/i16701032/883283dd927c8af7.png)
每个消费者都会向coordinator发送syncgroup请求,不过只有leader节点会发送分配方案,其他消费者只是打打酱油而已。当leader把方案发给coordinator以后,coordinator会把结果设置到SyncGroupResponse中。这样所有成员都知道自己应该消费哪个分区。
consumer group的分区分配方案是在客户端执行的!Kafka将这个权利下放给客户端主要是因为这样做可以有更好的灵活性
总结
我们再来总结一下consumer group rebalance的过程
Ø 对于每个consumer group子集,都会在服务端对应一个GroupCoordinator进行管理,GroupCoordinator会在zookeeper上添加watcher,当消费者加入或者退出consumer group时,会修改zookeeper上保存的数据,从而触发GroupCoordinator开始Rebalance操作
Ø 当消费者准备加入某个Consumer group或者GroupCoordinator发生故障转移时,消费者并不知道GroupCoordinator的在网络中的位置,这个时候就需要确定GroupCoordinator,消费者会向集群中的任意一个Broker节点发送ConsumerMetadataRequest请求,收到请求的broker会返回一个response作为响应,其中包含管理当前ConsumerGroup的GroupCoordinator,
Ø 消费者会根据broker的返回信息,连接到groupCoordinator,并且发送HeartbeatRequest,发送心跳的目的是要要奥噶苏GroupCoordinator这个消费者是正常在线的。当消费者在指定时间内没有发送心跳请求,则GroupCoordinator会触发Rebalance操作。
Ø 发起join group请求,两种情况
- 如果GroupCoordinator返回的心跳包数据包含异常,说明GroupCoordinator因为前面说的几种情况导致了Rebalance操作,那这个时候,consumer会发起join group请求
- 新加入到consumer group的consumer确定好了GroupCoordinator以后消费者会向GroupCoordinator发起join group请求,GroupCoordinator会收集全部消费者信息之后,来确认可用的消费者,并从中选取一个消费者成为group_leader。并把相应的信息(分区分配策略、leader_id、…)封装成response返回给所有消费者,但是只有group leader会收到当前consumer group中的所有消费者信息。当消费者确定自己是group leader以后,会根据消费者的信息以及选定分区分配策略进行分区分配
- 接着进入Synchronizing Group State阶段,每个消费者会发送SyncGroupRequest请求到GroupCoordinator,但是只有Group Leader的请求会存在分区分配结果,GroupCoordinator会根据Group Leader的分区分配结果形成SyncGroupResponse返回给所有的Consumer。
- consumer根据分配结果,执行相应的操作
到这里为止,我们已经知道了消息的发送分区策略,以及消费者的分区消费策略和rebalance。对于应用层面来说,还有一个最重要的东西没有讲解,就是offset,他类似一个游标,表示当前消费的消息的位置。
如何保存消费端的消费位置
什么是offset
前面在讲解partition的时候,提到过offset, 每个topic可以划分多个分区(每个Topic至少有一个分区),同一topic下的不同分区包含的消息是不同的。
每个消息在被添加到分区时,都会被分配一个offset
(称之为偏移量),它是消息在此分区中的唯一编号
,kafka通过offset保证消息在分区内的顺序,offset的顺序不跨分区,即kafka只保证在同一个分区内的消息是有序的;
对于应用层的消费来说,每次消费一个消息并且提交以后,会保存当前消费到的最近的一个offset。
![](https://img.haomeiwen.com/i16701032/e540053d753c3ab9.png)
分区的副本机制
我们已经知道Kafka的每个topic都可以分为多个Partition,并且同一topic的多个partition会均匀分布在集群的各个节点下
。虽然这种方式能够有效的对数据进行分片,但是对于每个partition来说,都是单点的,当其中一个partition不可用的时候,那么这部分消息就没办法消费。所以kafka为了提高partition的可靠性而提供了副本的概念(Replica),通过副本机制来实现冗余备份。
每个分区可以有多个副本,并且在副本集合中会存在一个leader的副本,所有的读写请求都是由leader副本来进行处理。剩余的其他副本都做为follower副本,follower副本会从leader副本同步消息日志,和redis cluster中的节点概念相同,leader副本为redis cluster中的主节点,follower副本为redis cluster中的备节点
。
一般情况下,同一个分区的多个副本会被均匀分配到集群中的不同broker上,当leader副本所在的broker出现故障后,可以重新选举新的leader副本继续对外提供服务。通过这样的副本机制来提高kafka集群的可用性。
创建一个带副本机制的topic
通过下面的命令去创建带2个副本的topic
sh kafka-topics.sh --create --zookeeper 192.168.11.156:2181 --replication-factor 2 --partitions 3 --topic secondTopic
然后我们可以在/tmp/kafka-log路径下看到对应topic的副本信息了。我们通过一个图形的方式来表达。
针对secondTopic这个topic的3个分区对应的2个副本
![](https://img.haomeiwen.com/i16701032/0db797b9775fb503.png)
通常follower副本和leader副本不会在同一个broker上,这种是为了保证当leader副本所在broker宕机后,follower副本可继续提供服务。
如何知道哪个各个分区中对应的leader是谁呢?
在zookeeper服务器上,通过如下命令去获取对应分区的信息, 比如下面这个是获取secondTopic第1个分区的状态信息。
get /brokers/topics/secondTopic/partitions/1/state
{"controller_epoch":12,"leader":0,"version":1,"leader_epoch":0,"isr":[0,1]}
或通过这个命令
sh kafka-topics.sh --zookeeper 192.168.13.106:2181 --describe --topic test_partition
leader表示当前分区的leader是那个broker-id。下图中。绿色线条的表示该分区中的leader节点。其他节点就为follower
![](https://img.haomeiwen.com/i16701032/6135d1745611b5d7.png)
需要注意的是,kafka集群中的一个broker中最多只能有一个副本,leader副本所在的broker节点的分区叫leader节点,follower副本所在的broker节点的分区叫follower节点
副本的leader选举机制
Kafka提供了数据复制算法保证,如果leader副本所在的broker节点宕机或者出现故障,或者分区的leader节点发生故障,这个时候怎么处理呢?
那么,kafka必须要保证从follower副本中选择一个新的leader副本。那么kafka是如何实现选举的呢?
要了解leader选举,我们需要了解几个概念
Kafka分区下有可能有很多个副本(replica)用于实现冗余,从而进一步实现高可用。副本根据角色的不同可分为3类:
- leader副本:响应clients端读写请求的副本
- follower副本:被动地备份leader副本中的数据,不能响应clients端读写请求。
- ISR副本:Zookeeper中为每一个partition动态的维护了一个ISR,这个ISR里的所有replica都跟上了leader,只有ISR里的成员才能有被选为leader的可能,ISR副本包含了leader副本和所有与leader副本
保持同步
的follower副本,注意是和保持同步
,不包含和leader副本没保持同步
的follower副本。
副本协同机制
刚刚提到了,消息的读写操作都只会由leader节点来接收和处理。follower副本只负责同步数据以及当leader副本所在的broker挂了以后,会从ISR副本中的follower副本中选取新的leader。
写请求首先由Leader副本处理,之后follower副本会从leader上拉取写入的消息,这个过程会有一定的延迟,导致follower副本中保存的消息略少于leader副本,但是只要没有超出阈值都可以容忍。但是如果一个follower副本出现异常,比如宕机、网络断开等原因长时间没有同步到消息,那这个时候,leader就会把它踢出去。kafka通过ISR集合来维护一个分区副本信息
![](https://img.haomeiwen.com/i16701032/96f6c64f746a3a47.png)
一个新leader被选举并被接受客户端的消息成功写入。Kafka确保从同步副本列表中选举一个副本为leader;leader负责维护和跟踪ISR(in-Sync replicas , 副本同步队列)中所有follower滞后的状态。当producer发送一条消息到broker后,leader写入消息并复制到所有follower。消息提交之后才被成功复制到所有的同步副本。
ISR
ISR表示目前可用且消息量与leader相差不多的副本集合,这是整个副本集合的一个子集
。怎么去理解可用和相差不多这两个词呢?具体来说,ISR集合中的副本必须满足两个条件:
- 副本所在节点必须维持着与zookeeper的连接
- 副本最后一条消息的offset与leader副本的最后一条消息的offset之间的差值不能超过指定的阈值。(replica.lag.time.max.ms) replica.lag.time.max.ms:如果该follower在此时间间隔内一直没有追上过leader的所有消息,则该follower就会被剔除isr列表
- ISR数据保存在Zookeeper的 /brokers/topics/<topic>/partitions/<partitionId>/state 节点中
follower副本把leader副本前的日志全部同步完成时,则认为follower副本已经追赶上了leader副本,这个时候会更新这个副本的lastCaughtUpTimeMs标识,kafka副本管理器会启动一个副本过期检查的定时任务,这个任务会定期检查当前时间与副本的lastCaughtUpTimeMs的差值是否大于参数replica.lag.time.max.ms 的值,如果大于,则会把这个副本踢出ISR集合
![](https://img.haomeiwen.com/i16701032/6909e13cc44cb87b.png)
如何处理所有的Replica不工作的情况,也可以理解为leader的选举
在ISR中至少有一个follower时,Kafka可以确保已经commit的数据不丢失,但如果某个Partition的所有Replica都宕机了,就无法保证数据不丢失了。这种情况下有两种可行的方案:
- 等待ISR中的任一个Replica“活”过来,并且选它作为Leader
- 选择第一个“活”过来的Replica(不一定是ISR中的)作为Leader,默认配置。
这就需要在可用性和一致性当中作出一个简单的折中。
如果一定要等待ISR中的Replica“活”过来,那不可用的时间就可能会相对较长。而且如果ISR中的所有Replica都无法“活”过来了,或者数据都丢失了,这个Partition将永远不可用。
选择第一个“活”过来的Replica作为Leader,而这个Replica不是ISR中的Replica,那即使它并不保证已经包含了所有已commit的消息,它也会成为Leader而作为consumer的数据源(所有读写都由Leader完成)。
默认情况下Kafka采用第二种策略,即unclean.leader.election.enable=true
,也可以将此参数设置为false
来启用第一种策略。
副本数据同步原理
了解了副本的协同过程以后,还有一个最重要的机制,就是数据的同步过程。
下图中,深红色部分表示test_replica分区的leader副本,另外两个节点上浅色部分表示follower副本
![](https://img.haomeiwen.com/i16701032/f4ea4b8d3e49bda7.png)
Producer在发布消息到某个Partition时,
- 先通过ZooKeeper找到该Partition的Leader get /brokers/topics/<topic>/partitions/2/state ,然后无论该Topic的Replication Factor为多少(也即该Partition有多少个Replica),Producer只将该消息发送到该Partition的Leader。
- Leader会将该消息写入其本地Log。每个Follower都从Leader pull数据。这种方式上,Follower存储的数据顺序与Leader保持一致。
- Follower在收到该消息并写入其Log后,向Leader发送ACK。
- 一旦Leader收到了ISR中的所有Replica的ACK,该消息就被认为已经commit了,Leader将增加HW(HighWatermark)并且向Producer发送ACK。
LEO:即日志末端位移(log end offset),记录了该副本底层日志(log)中下一条消息的位移值。注意是下一条消息!也就是说,如果LEO=10,那么表示该副本保存了10条消息,位移值范围是[0, 9]。另外,leader LEO和follower LEO的更新是有区别的,可以看出leader副本和follower副本都有LEO。
HW:即所有follower副本中相对于leader副本最小的LEO值。HW是相对leader副本而言的,其HW值不会大于LEO值。小于等于HW值的所有消息都被认为是“已备份”的(replicated)。同理,leader副本和follower副本的HW更新是有区别的
通过下面这幅图来表达LEO、HW的含义,随着follower副本不断和leader副本进行数据同步,follower副本的LEO主键会后移并且追赶到leader副本,这个追赶上的判断标准是当前副本的LEO是否大于或者等于leader副本的HW,如果follower在replica.lag.time.max.ms
时间范围内追赶上了leader副本,该follower副本则加入到ISR副本内,也可以使得之前被踢出的follower副本重新加入到ISR集合中;如果在replica.lag.time.max.ms
时间范围内follower副本没追赶上leader副本,该follower副本会被从ISR副本范围内踢出,可以看出ISR副本是一个由zookerper动态监控的变化的副本
。
另外, 假如说下图中的最右侧的follower副本被踢出ISR集合,也会导致这个分区的HW发生变化,变成了3
![](https://img.haomeiwen.com/i16701032/acdfc7af97dbe781.png)
数据实现可靠性
producer数据不丢失
当producer向leader发送数据时,可以通过request.required.acks
参数来设置数据可靠性的级别:
-
request.required.acks
=-1:producer写入的一 条消息需要等到分区的leader 副本完成同步,且需要等待ISR集合中的所有follower副本都同步完
之后才能返回producer确认的ack,该参数设置为-1
时,producer的写入方式通常使用异步写入。 -
设置
retries
参数:producer提供该参数表示写入消息时发生如果发生异常的情况,将会进行重试写入,retries
参数值为重试的次数。
broker数据不丢失
kafka提供了分区的副本机制,保证了消息的不丢失。
从broker的角度来看,就是对partion设置合理的副本数。举个极端的例子,比如每个partition只有一个副本,那么当该副本所在的broker宕机之后也就以为该partion内的所有消息将会丢失,所以可以设置以下参数:
-
min.insync.replicas
:该参数表示partition允许设置的最小副本数,通过为计数,最小设置为3即可。
consumer数据不丢失
enable.auto.commit
:该参数表示提交的方式,通过设置为false
,表示手动提交位移。手动提交位移的优点为更细粒度化,可以对消息进行重试下发或者消息的去重等操作。该参数的具体含义可以看上篇文章。
实现消息顺序消费
首先要知道kafka只能保证同一个partion内消息消费顺序,而不能保证topic内的所有消息消费
。
每个partition会维护一个从0开始递增的offset,所以offset是保证partition可以顺序消费的前提。
那么kafka是怎么保证在同一个分区内的消息是有序的呢`?因为kafka有个机制是:一个partition只能由consumer group里的同一个consumer消费,而不能被多个consumer同时消费,正是这个机制,保证了partition中消息的顺序性。
那么换句话说,
一个分区(partition)只能由消费组(consumer group)中的一个消费者来消费,而不能被多个消费者同时消费。这设计的原因主是什么呢?
主要有俩个原因:
顺序保证:Kafka 保证了每个分区内消息的顺序。每个分区的消息都有一个唯一的偏移量(offset),用于表示消息在分区中的顺序位置。当多个消费者同时读取同一个分区时,无法保证它们按照正确的顺序消费消息。这会导致消息顺序错乱,违背了 Kafka 提供的有序性保证。
负载均衡:Kafka 使用消费组来实现负载均衡和伸缩性。一个消费组可以包含多个消费者,每个消费者负责消费一个或多个分区。当分区被分配给消费者时,Kafka 会确保每个分区只由一个消费者消费,以实现负载均衡。这种设计可以根据消费者数量和消费能力自动分配分区,使得每个消费者能够独立地消费一部分数据,提高整体的吞吐量。
为什么一个partion被同一个消费组里的多个consumer同时消费就不能保证该partition的顺序消费了呢?举个例子,如下:
比如消费者A和消费者B同时消费某个partition,且消费者 A 处理速度更快。例如,消费者 A 先获取到分区中的第 0 条消息,而消费者 B 获取分区中的第 1 条消息,由于消费者A消费速度更快且消费完了第0条,接着获取到了第2条消息进行消费,而此时由于消费者B消费较慢,刚刚消费完第1条消息,此时消息的顺序变为0->2->1,这就出现了消息乱序的情况。
此外,在某些情况下,消费者在处理消息时可能会失败,例如发生网络错误、消息处理异常等。如果消费者在处理消息时失败,那么 Kafka 会将该消息重新发送给消费者,这就会导致消息顺序错乱。
想要实现顺序消费,消息如何生产
上篇文章已经介绍了,消息的下发策略为:当消息中存在key值时,使用hash(key)%partitionNum的方式来决定消息写入到哪个partition中;如果不存在key的话,消息则轮训写入到所有的partition中。
所以如果你的需求需要保证消息的顺序消费,则需要设置合理的key来下发消息:举个例子在一个wechat场景,用户之间聊天需要保证消息的顺序性,比如用户a发生用户b的消息顺序为1->2->3,那么用户b接受到的消息也就应该是1->2->3,而不能出现1->3->2的情况。
此时只需要将同一用户的消息写入到相同的partition内即可解决上面出现的问题
,所以消息中可以添加key,且key为用户id
是即可完美解决了。
consumer-group的具体作用是什么
同一个topic可以有多个consumer-group组时,每个consumer-group都会订阅topic的所有消息
,每个consumer-group内的所有消费者会协调消费topic内的所有分区。
这里引出一个问题,既然每个consumer-group都消费整个topic的消息,那么分group的作用是啥?
group顾名思义就是分组,可以对topic中的消息的某个字段进行分组消费。在Kafka中,Consumer Group是一个逻辑上的概念,用于将一组Consumer组织在一起,共同消费一个或多个Topic中的消息。具体的过滤过滤逻辑需要用户自己实现
如下:我们建了一个test-group的分组,该分组只对test-topic中消息字段cmd为first的消息进行消费,其他类型的消息都直接过滤。
public class ConsumerExample {
public static void main(String[] args) {
Properties props = new Properties();
//kafka的服务地址
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
//设置一个consumer-group为test-group
props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
//将消息的KEY和VALUE反序列化
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
//实例化一个消费者,并将该消费者加入到test-group组中
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
//该消费组订阅localhost:9092集群的test-topic
consumer.subscribe(Collections.singleton("test-topic"));
while (true) {
//拉去消息
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
records.forEach(record -> {
System.out.printf("topic = %s, partition = %s, offset = %d, key = %s, value = %s%n",
record.topic(), record.partition(), record.offset(), record.key(), record.value());
//根据具体的业务代码上线group过滤:比如该group只接该topic中消息体cmd字段为first的消息,其他消息则过滤
if(record.value().cmd == "first"){
//执行具体的业务代码。入库或者是调取api
}
});
}
}
}
如果想消费该topic种cmd为sencod的消息,则我们可以继续创建另一个消费者,可以发现,consumer-group的作用就是对topic中的消息进行分组,然后对分组后的数据自行实现具体的业务逻辑。
看了上面的代码,可能有个疑问,为什么没有消费者的地址?
因为 Kafka 消费者不需要显式地指定其 IP 地址或主机名。Kafka 消费者只需要知道 Kafka 集群的地址,即可与 Kafka 集群建立连接并消费消息。因此,Kafka 消费者可以在任何具有网络连接的机器上启动,并且可以连接到远程 Kafka 集群。
在上面的代码中,我们将 bootstrap.servers 属性设置为 "localhost:9092",这意味着 Kafka 消费者将连接到运行在本地主机上的 Kafka 集群。如果 Kafka 集群运行在远程主机上,则可以将 bootstrap.servers 属性设置为 Kafka 集群的公共 IP 地址或 DNS 名称。当 Kafka 消费者启动时,它将使用 bootstrap.servers 属性指定的地址和端口,尝试与 Kafka 集群建立连接。
consumer-group怎么控制下发qps
Kafka 消费者组并不提供直接控制下发消息速率的选项。Kafka 消费者消费的速率主要受到以下几个因素的影响:
- 分区数:Kafka 消费者从多个分区消费消息,分区数越多,消费的消息速率也就越快。
- 消费者数:如果 Kafka 消费者组中有多个消费者,那么消费者组将会同时从多个分区消费消息,从而提高消息消费的速率。
- 消费者消费消息的速度:Kafka 消费者可以通过 poll() 方法控制从 Kafka 服务端获取消息的速率,消费者可以通过调整 poll() 方法的超时时间,来控制下发消息的速率。但是这种方法只能粗略地控制下发消息的速率,无法实现精确的 QPS 控制。
如果需要对下发消息的速率进行精确控制,可以考虑以下方法:
- 控制生产者生产消息的速率:可以通过限制生产者发送消息的速率,从而限制下发消息的速率。
- 手动提交消费位移:Kafka 消费者可以手动提交消费位移,这样就可以在消费者消费速率过快时暂停消费,以便控制下发消息的速率。可以使用 Consumer.pause() 方法暂停分区的消费,等待处理完成后再使用 Consumer.resume() 方法继续消费。
- 使用 Kafka 的流处理 API:如果需要对下发消息的速率进行精确的控制,可以考虑使用 Kafka 的流处理 API。Kafka 流处理 API 提供了更细粒度的控制能力,可以根据具体需求对下发消息的速率进行定制化的控制。
使用手动位移控制qps的代码如下
public class ManualOffsetCommitConsumerExample {
private static final String TOPIC_NAME = "test-topic";
private static final String CONSUMER_GROUP_ID = "test-group";
private static final String BOOTSTRAP_SERVERS = "localhost:9092";
private static final int MAX_POLL_RECORDS = 100;
private static final int MAX_POLL_INTERVAL_MS = 5000; // 5 seconds
private static final int DESIRED_QPS = 20;
private static final int MAX_BATCH_SIZE = DESIRED_QPS * 2; // allow for burst rate
public static void main(String[] args) {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
props.put(ConsumerConfig.GROUP_ID_CONFIG, CONSUMER_GROUP_ID);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singleton(TOPIC_NAME));
final int batchSize = Math.min(MAX_BATCH_SIZE, MAX_POLL_RECORDS);
List<ConsumerRecord<String, String>> buffer = new ArrayList<>(batchSize);
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(MAX_POLL_INTERVAL_MS));
for (ConsumerRecord<String, String> record : records) {
buffer.add(record);
if (buffer.size() >= batchSize) {
processRecords(buffer);
buffer.clear();
}
}
// If we have remaining records in the buffer, process them as well.
if (!buffer.isEmpty()) {
processRecords(buffer);
buffer.clear();
}
consumer.commitSync();
}
} catch (WakeupException e) {
// Ignore the exception and close the consumer.
} finally {
consumer.close();
}
}
private static void processRecords(List<ConsumerRecord<String, String>> records) {
// Process the records in some way.
for (ConsumerRecord<String, String> record : records) {
System.out.printf("Consumed message: key=%s, value=%s, partition=%d, offset=%d%n",
record.key(), record.value(), record.partition(), record.offset());
}
// Wait to achieve the desired QPS.
try {
Thread.sleep(1000 / DESIRED_QPS);
} catch (InterruptedException e) {
// Ignore the exception and proceed.
}
}
}
topic怎么设置合理的分区数呢?
- 同一topic的多个partition会均匀分布在集群的各个节点下,每个节点相同的partion数可以尽可能的使机器资源均匀。也就是partion数最好是broker节点的倍数。
- 根据上面的分区分配策略,发现partition数最好为consumer数的整数倍,这样每个consumer负责消费的partition数,消费能力也就均匀相同。
所以,partition数的设置最好为为broker数和consumer数的公倍数,比如broker个数为6个,consumer个数为10个,那么可设置30个partition数。
消息的幂等性是什么意思?
在 Kafka 中,消息幂等性指的是相同的消息被重复发送到同一个主题分区时,不会导致数据的重复写入。也就是说,即使同一条消息被多次写入,也只会保留一份数据。
幂等性是 Kafka 提供的一种可选特性,通过使用生产者端的幂等性保证消息不会重复写入到分区中,消费者端就可以避免消费到重复的消息,从而保证数据的一致性。
可通过俩种方式实现生产者的消息幂等性
-
发送唯一标识符(如UUID)作为消息的key,保证相同key的消息总是被写入到同一个分区中。在写入消息之前,可以先查询该key对应的消息是否已经被写入到分区中,如果已经存在,则不再写入,从而实现幂等性。
-
启用Kafka的生产者端幂等性特性。通过配置enable.idempotence=true启用生产者端幂等性特性,生产者端会自动在内部缓存和重试未确认的消息,并使用序列号对每个消息进行标记。这样,即使相同的消息被重复发送,只有第一次发送的消息会被写入分区中,后续发送的消息会被过滤掉。
幂等性是 Kafka 提供的一种可选特性,因为启用该特性会带来额外的开销。启用幂等性会增加生产者端的延迟和网络带宽消耗,因为生产者需要在本地缓存未确认的消息,并在发送完成确认之前,阻塞等待来自服务器的响应。同时,启用幂等性还可能增加服务器端的开销,因为服务器需要维护每个生产者的幂等性状态信息。因此,是否启用幂等性需要根据具体的业务场景和性能需求来进行决策。
问:可能有人疑问第一种方式,为什么必须写入同一分区才能判断?
答:在Kafka中,同一个分区中的消息是有序的,因此如果我们将具有相同key的消息写入同一个分区,那么它们将被存储在相邻的消息偏移量上,这样在进行幂等性检查时会更方便。如果将具有相同key的消息写入不同的分区,那么它们就可能被存储在不同的消息偏移量上,这样就需要在多个分区中进行幂等性检查,增加了实现难度。
基于上面第一种方案,在写入消息之前,可以先查询该key对应的消息是否已经被写入到分区中,可以使用Kafka提供的API中的KafkaConsumer来实现查询功能。具体代码实现如下如下:
// 创建一个KafkaConsumer实例
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
// 订阅指定的topic
consumer.subscribe(Collections.singleton(topic));
// 构造查询的参数,key为需要查询的消息的key,partition为消息所在的分区
TopicPartition tp = new TopicPartition(topic, partition);
Map<TopicPartition, Long> query = new HashMap<>();
query.put(tp, 0L);
// 调用KafkaConsumer的seekToEnd方法获取指定分区的最后一个消息的offset
consumer.seekToEnd(Collections.singleton(tp));
long lastOffset = consumer.position(tp);
// 如果该分区的offset为0,则说明该分区没有消息,可以直接写入
if (lastOffset == 0) {
// 写入消息
} else {
// 如果该分区不为空,就需要查询该key对应的消息是否已经存在
consumer.seek(tp, 0L);
while (consumer.position(tp) < lastOffset) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String, String> record : records) {
// 判断消息是否匹配
if (record.key().equals(key) && record.value().equals(value)) {
// 如果消息已经存在,直接返回
return;
}
}
}
// 如果该分区不存在该消息,可以写入
// 写入消息
}
以上代码中,首先订阅指定的topic,然后构造查询的参数,查询该分区的最后一个消息的offset,如果该分区没有消息则可以直接写入。如果该分区不为空,就需要查询该key对应的消息是否已经存在,查询过程中通过调用poll方法获取分区中的消息,然后遍历消息列表进行比对,如果匹配到了则直接返回,否则可以写入新消息。
基于上面的第二种方案,代码如下
import org.apache.kafka.clients.producer.*;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
public class KafkaProducerDemo {
private static final String TOPIC_NAME = "test-topic";
private static final String BOOTSTRAP_SERVERS = "localhost:9092";
private static final String CLIENT_ID = "test-producer-client";
public static void main(String[] args) {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
props.put(ProducerConfig.CLIENT_ID_CONFIG, CLIENT_ID);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true"); // 开启幂等性
props.put(ProducerConfig.ACKS_CONFIG, "all"); // 设置确认方式为all
Producer<String, String> producer = new KafkaProducer<>(props);
for (int i = 0; i < 10; i++) {
String key = String.valueOf(i % 3); // 生成key,保证相同key的消息写入同一个分区
String value = "hello world " + i;
// 构造消息
ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, key, value);
try {
// 发送消息
RecordMetadata metadata = producer.send(record).get();
System.out.printf("Produced message: key=%s, value=%s, partition=%d, offset=%d\n",
key, value, metadata.partition(), metadata.offset());
} catch (ExecutionException | InterruptedException e) {
e.printStackTrace();
}
}
producer.close();
}
}
在上述代码中,我们通过将enable.idempotence配置设置为true来开启生产端的幂等性,同时将acks配置设置为all,以保证消息被所有副本成功写入才被确认。在构造消息时,我们将相同的key传递给ProducerRecord构造函数,保证具有相同key的消息被写入同一个分区。这样,在生产端发送消息时,就能够保证具有相同key的消息只会被写入一次。
网友评论