消息分发原理
kafka消息分发策略
消息是kafka中最基本的数据单元,在kafka中,一条消息由key、value两部分构成,在发送一条消息时,我们可以指定这个key,那么producer会根据key和partition机制来判断当前这条消息应该发送并存储到哪个partition中。我们可以根据需要进行扩展producer的partition机制。
消息默认的分发机制
默认情况下,kafka采用的是hash取模的分区算法。如果key为null,则会随机分配一个分区。这个随机是在这个参数metadata.max.age.ms
的时间范围内随机选择一个。对于这个时间段内,如果key为 null,则只会发送到唯一的分区。这个值默认情况下是10分钟更新一次。
关于Metadata,简单理解就是topic/partition和broker的映射关系,每一个topic的每一个partition,需要知道对应的broker列表是什么,leader是谁、follower是谁。这些信息都是存储 在Metadata这个类里面。
代码实验
-
当只有1个producer将消息发送至3个分区的时候
kafka-35.png
我们关注后面的partition0,1,2;可以清晰的看到,消息会被发送到不同的分区中,重点是在同一个分区中的消息是能保证顺序性的
- 当3个producer往3个分区发送消息的时候
-
第一个控制台
kafka-36.png
-
第二个控制台
kafka-37.jpg
-
第一个控制台
-
第二个控制台
kafka-38.jpg
将3个控制台结合起来看,消息会被发送到不同的分区中,关键点在于在同一个分区中的消息是能保证顺序性的.
消费端如何消费指定的分区
- 通过下面的代码,就可以消费指定该topic下的0号分区。其他分区的数据就无法接收
//消费指定分区的时候,不需要再订阅 //kafkaConsumer.subscribe(Collections.singletonList(topic));
//消费指定的分区
TopicPartition topicPartition=new TopicPartition(topic,0); kafkaConsumer.assign(Arrays.asList(topicPartition));
消息的消费原理
kafka消息消费原理演示
在实际生产过程中,每个topic都会有多个partitions,多个partitions的好处在于,一方面能够对broker上的数据进行分片有效减少了消息的容量从而提升io性能。另外一方面,为了提高消费端的消费能力,一般会通过多个consumer去消费同一个topic ,也就是消费端的负载均衡机制,还有kafka存在consumer group的概念,也就是group.id一样的 consumer,这些consumer属于一个consumer group,组内的所有消费者协调在一起来消费订阅主题的所有分区。当然每一个分区只能由同一个消费组内的consumer来消费.![](https://img.haomeiwen.com/i20350661/2bf11d17f37d5d1d.png)
对于上面这个图来说,这3个消费者会分别消费test这个topic 的3个分区,也就是每个consumer消费一 个partition。
效果:consumer1会消费partition0分区、consumer2会消费partition1分区、consumer3会消费 partition2分区
假如其他都不变,只有consumer1和consumer2;2个消费者的情况下,
效果:consumer1会消费partition0/partition1分区、consumer2会消费partition2分区
假如其他都不变,有consumer1,consumer2,consumer3,consumer4;4个消费者的情况下,
效果:consumer1会消费partition0分区、consumer2会消费partition1分区、consumer3会消费 partition2分区, consumer4则是一个候补成员
实验论证:
![](https://img.haomeiwen.com/i20350661/12c75c019b98163d.png)
![](https://img.haomeiwen.com/i20350661/399b478e4a15da4a.png)
![](https://img.haomeiwen.com/i20350661/f2a62cba5a8c68f5.png)
consumer和partition的数量建议
- 如果consumer比partition多,是浪费,因为kafka的设计是在一个partition上是不允许并发的, 所以consumer数不要大于partition数
- 如果consumer比partition少,一个consumer会对应于多个partitions,这里主要合理分配 consumer数和partition数,否则会导致partition里面的数据被取的不均匀。最好partiton数目是 consumer数目的整数倍,所以partition数目很重要,比如取24,就很容易设定consumer数目
- 增减consumer,broker,partition会导致rebalance,所以rebalance后consumer对应的 partition会发生变化
- 如果consumer从多个partition读到数据,不保证数据间的顺序性,kafka只保证在每一个partition上数据是有序的,但多个partition,根据你读的顺序会有不同
kafka的分区分配策略(Partition Assignment Strategy)
分区分配策略
- Range:范围 (默认)
- RoundRobin:轮询
- Sticky:粘性
触发分区策略时机
- 同一个consumer group内新增了消费者
- 消费者离开当前所属的consumer group,比如主动停机或者宕机
- topic新增了分区(也就是分区数量发生了变化)
总结:broker,customer,partition发生了变化,就会触发rebalance
在消费端中的ConsumerConfig中,通过这个属性来指定分区分配策略
public static final String PARTITION_ASSIGNMENT_STRATEGY_CONFIG = "partition.assignment.strategy";
分区策略解析
RangeAssignor(范围分区)
Range策略是对每个主题而言的,首先对同一个主题里面的分区按照序号进行排序,并对消费者按照字母顺序进行排序。
规则:
假设 n = 分区数/消费者数量
m = 分区数%消费者数量
那么前m个消费者每个分配n+l个分区,后面的(消费者数量-m)个消费者每个分配n个分区
假设我们有10个分区,3个消费者,排完序的分区将会是0, 1, 2, 3, 4, 5, 6, 7, 8, 9;消费者线程排完序将会是C1-0, C2-0, C3-0。然后将partitions的个数除于消费者线程的总数来决定每个消费者线程消费几个 分区。如果除不尽,那么前面几个消费者线程将会多消费一个分区。在我们的例子里面,我们有10个分 区,3个消费者线程, 10 / 3 = 3,而且除不尽,那么消费者线程 C1-0 将会多消费一个分区.
结果看起来是这样的:
C1-0 将消费 0, 1, 2, 3 分区
C2-0 将消费 4, 5, 6 分区
C3-0 将消费 7, 8, 9 分区
假如我们有11个分区,那么最后分区分配的结果看起来是这样的:
C1-0 将消费 0, 1, 2, 3 分区
C2-0 将消费 4, 5, 6, 7 分区
C3-0 将消费 8, 9, 10 分区
假如我们有2个主题(T1和T2),分别有10个分区,那么最后分区分配的结果看起来是这样的:
C1-0 将消费 T1主题的 0, 1, 2, 3 分区以及 T2主题的 0, 1, 2, 3分区
C2-0 将消费 T1主题的 4, 5, 6 分区以及 T2主题的 4, 5, 6分区
C3-0 将消费 T1主题的 7, 8, 9 分区以及 T2主题的 7, 8, 9分区
可以看出,C1-0 消费者线程比其他消费者线程多消费了2个分区,这就是Range strategy的一个很明显的弊端
RoundRobinAssignor(轮询分区)
轮询分区策略是把所有partition和所有consumer线程都列出来,然后按照hashcode进行排序。最后通过轮询算法分配partition给消费线程。如果所有consumer实例的订阅是相同的,那么partition会均匀分布。
在我们的例子里面,假如按照 hashCode 排序完的topic-partitions组依次为T1-5, T1-3, T1-0, T1-4, T1-6, T1-2, T1-1我们的消费者线程排序为C1-0, C2-0, C1-1, C2-1,
![](https://img.haomeiwen.com/i20350661/42264f5eb61a2c3b.png)
最后分区分配的结果为:
C1-0 将消费 T1-5, T1-6 分区;
C2-0 将消费 T1-3, T1-2 分区;
C1-1 将消费 T1-0, T1-1 分区;
C2-1 将消费 T1-4, T1-7 分区;
使用轮询分区策略必须满足两个条件
- 每个主题的消费者实例具有相同数量的分区
- 每个消费者订阅的主题必须是相同的
那么根据以上的2个条件,我们可以得出:分区为9个的时候,就不建议使用此策略了,10个就可以
StrickyAssignor 分配策略
kafka在0.11.x版本支持了StrickyAssignor, 翻译过来叫粘滞策略,它主要有两个目的
- 分区的分配尽可能的均匀
- 分区的分配尽可能和上次分配保持相同
原则:当两者发生冲突时,第一个目标优于第二个目标
![](https://img.haomeiwen.com/i20350661/3ece2411b4060e63.png)
消费组有3个消费者:C0,C1,C2,它们分别订阅了4个Topic(t0,t1,t2,t3),并且每个主题有两个分 区(p0,p1),也就是说
整个消费组订阅了8个分区:tOpO 、 tOp1 、 t1pO 、 t1p1 、 t2p0 、 t2p1 、t3p0 、 t3p1那么最终的分配场景:
C0: tOpO、t1p1 、 t3p0
C1: tOp1、t2p0 、 t3p1
C2: t1pO、t2p1
这种分配方式有点类似于轮询策略,但实际上并不是,因为假设这个时候,C1这个消费者挂了,就势必会造成
重新分区(reblance),如果是轮询,那么结果应该是
C0: tOpO、t1pO、t2p0、t3p0
C2: tOp1、t1p1、t2p1、t3p1
然后,strickyAssignor它是一种粘滞策略,所以它会满足`分区的分配尽可能和上次分配保持相同`,所以分配结果应该是:
消费者CO: tOpO、t1p1 、 t3p0、t2p0
消费者C2: t1pO、t2p1、tOp1、t3p1
也就是说,C0和C2保留了上一次是的分配结果,并且把原来C1的分区分配给了C0和C2。 这种策略的好处是
使得分区发生变化时,由于分区的粘性,减少了不必要的分区移动
触发Rebalance以及consumer-group管理
kafka提供了一个角色: coordinator来执行对于consumer group的管理,当consumer group的第一个consumer启动的时 候,它会去和kafka server确定谁是它们组的coordinator。之后该group内的所有成员都会和该coordinator进行协调通信
如何确定coordinator
消费者向kafka集群中的任意一个broker发送一个 GroupCoordinatorRequest请求,服务端会返回一个负载最小的broker节点的id,并将该broker设置为coordinator
例如:kafka的集群有3个broker,分别是broker0,broker1,broker2,那么这个时候 broker0就是coordinator
Join Group阶段
在rebalance之前,需要保证coordinator是已经确定好了的,整个rebalance的过程分为两个步骤,Join和Sync
join: 表示加入到consumer group中,在这一步中,所有的成员都会向coordinator发送joinGroup的请求。一旦所有成员都发送了joinGroup请求,那么coordinator会选择一个consumer担任leader角色, 并把组成员信息和订阅信息发送加入的消费者
leader选举算法比较简单:
- 如果消费组内没有leader,那么第一个加入消费组的消费者就是消费者 leader,
- 如果这个时候leader消费者退出了消费组,那么重新选举一个leader,这个选举很随意,类似于随机算法
kafka-14.png
protocol_metadata: 序列化后的消费者的订阅信息
leader_id: 消费组中的消费者,coordinator会选择一个作为leader,对应的就是member_id member_metadata 对应消费者的订阅信息
members:consumer group中全部的消费者的订阅信息
generation_id: 年代信息,类似于之前讲解zookeeper的时候的epoch是一样的,对于每一轮 rebalance,generation_id都会递增。主要用来保护consumer group。隔离无效的offset提交。也就 是上一轮的consumer成员无法提交offset到新的consumer group中。
Joing Group流程
- 在JoingGroup阶段,每个consumer都会把自己支持的分区分配策略发送到coordinator
- coordinator收集到所有消费者的分配策略,组成一个候选集
- 每个消费者需要从候选集里找出一个自己支持的策略,并且为这个策略投票
- 最终计算候选集中各个策略的选票数,票数最多的就是当前消费组的分配策略
Synchronizing Group State阶段
完成分区分配之后,就进入了Synchronizing Group State阶段,主要逻辑是向GroupCoordinator发送SyncGroupRequest请求,并且处理SyncGroupResponse响应,简单来说,就是leader将消费者对应的partition分配方案同步给consumer group 中的所有consumer
![](https://img.haomeiwen.com/i20350661/976730c987fb59e5.png)
总结consumer group rebalance的过程
- 对于每个consumer group子集,都会在服务端对应一个GroupCoordinator进行管理, GroupCoordinator会在zookeeper上添加watcher,当消费者加入或者退出consumer group时,会修改zookeeper上保存的数据,从而触发GroupCoordinator开始Rebalance操作
- 当消费者准备加入某个Consumer group或者GroupCoordinator发生故障转移时,消费者并不知道 GroupCoordinator的在网络中的位置,这个时候就需要确定GroupCoordinator,消费者会向集群中的任意一个Broker节点发送ConsumerMetadataRequest请求,收到请求的broker会返回一个response 作为响应,其中包含管理当前ConsumerGroup的GroupCoordinator,
- 消费者会根据broker的返回信息,连接到groupCoordinator,并且发送HeartbeatRequest,发送心跳的目的是要判断GroupCoordinator这个消费者是正常在线的。当消费者在指定时间内没有发送心跳请求,则GroupCoordinator会触发Rebalance操作。
发起join group请求,两种情况
- 如果GroupCoordinator返回的心跳包数据包含异常,说明GroupCoordinator因为前面说的几种情况导致了Rebalance操作,那这个时候,consumer会发起join group请求
- 新加入到consumer group的consumer确定好了GroupCoordinator以后,消费者会向GroupCoordinator发起join group请求,GroupCoordinator会收集全部消费者信息之后,来确认可用的消费者,并从中选取一个消费者成为group_leader。并把相应的信息(分区分配策略、leader_id、...)封装成response返回给所有消费者,但是只有group leader会收到当前 consumer group中的所有消费者信息。当消费者确定自己是group leader以后,会根据消费者的信息以及选定分区分配策略进行分区分配
- 接着进入Synchronizing Group State阶段,每个消费者会发送SyncGroupRequest请求到 GroupCoordinator,但是只有Group Leader的请求会存在分区分配结果,GroupCoordinator会根据Group Leader的分区分配结果形成SyncGroupResponse返回给所有的Consumer。
- consumer根据分配结果,执行相应的操作
网友评论