1. 生产者分区选择配策略
生产者在将消息发送到某个Topic ,需要经过拦截器、序列化器和分区器(Partitioner
)的一系列作用之后才能发送到对应的Broker,在发往Broker之前是需要确定它所发往的分区。
- 如果消息
ProducerRecord
指定了partition字段,那么就不需要分区器。 - 如果消息
ProducerRecord
没有指定partition字段,那么就需要依赖分区器,根据key这个字段来计算partition的值。分区器的作用就是为消息分配分区。
public class ProducerRecord<K, V> {
// 该消息需要发往的主题
private final String topic;
// 该消息需要发往的主题中的某个分区,如果该字段有值,则分区器不起作用,直接发往指定的分区
// 如果该值为null,则利用分区器进行分区的选择
private final Integer partition;
private final Headers headers;
// 如果partition字段为null,则使用分区器进行分区选择时会用到该key字段,该值可为空
private final K key;
private final V value;
private final Long timestamp;
Kafka 中提供的默认分区器是 DefaultPartitioner
,它实现了Partitioner接口(用户可以实现这个接口来自定义分区器),其中的partition方法就是用来实现具体的分区分配逻辑:
- 如果在发消息的时候指定了分区,则消息投递到指定的分区。
- 如果没有指定分区,但是消息的key不为空,则使用称之为
murmur
的Hash算法(非加密型Hash函数,具备高运算性能及低碰撞率)来计算分区分配。 - 如果既没有指定分区,且消息的key也是空,则用轮询的方式选择一个分区。
public class DefaultPartitioner implements Partitioner {
private final ConcurrentMap<String, AtomicInteger> topicCounterMap = new ConcurrentHashMap<>();
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
// 首先通过cluster从元数据中获取topic所有的分区信息
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
// 拿到该topic的分区数
int numPartitions = partitions.size();
// 如果消息记录中没有指定key
if (keyBytes == null) {
// 则获取一个自增的值
int nextValue = nextValue(topic);
// 通过cluster拿到所有可用的分区(可用的分区这里指的是该分区存在首领副本)
List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
// 如果该topic存在可用的分区
if (availablePartitions.size() > 0) {
// 那么将nextValue转成正数之后对可用分区数进行取余
int part = Utils.toPositive(nextValue) % availablePartitions.size();
// 然后从可用分区中返回一个分区
return availablePartitions.get(part).partition();
} else { // 如果不存在可用的分区
// 那么就从所有不可用的分区中通过取余的方式返回一个不可用的分区
return Utils.toPositive(nextValue) % numPartitions;
}
} else { // 如果消息记录中指定了key
// 则使用该key进行hash操作,然后对所有的分区数进行取余操作,这里的hash算法采用的是murmur2算法,然后再转成正数
//toPositive方法很简单,直接将给定的参数与0X7FFFFFFF进行逻辑与操作。
return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}
}
// nextValue方法可以理解为是在消息记录中没有指定key的情况下,需要生成一个数用来代替key的hash值
// 方法就是最开始先生成一个随机数,之后在这个随机数的基础上每次请求时均进行+1的操作
private int nextValue(String topic) {
// 每个topic都对应着一个计数
AtomicInteger counter = topicCounterMap.get(topic);
if (null == counter) { // 如果是第一次,该topic还没有对应的计数
// 那么先生成一个随机数
counter = new AtomicInteger(ThreadLocalRandom.current().nextInt());
// 然后将该随机数与topic对应起来存入map中
AtomicInteger currentCounter = topicCounterMap.putIfAbsent(topic, counter);
if (currentCounter != null) {
// 之后把这个随机数返回
counter = currentCounter;
}
}
// 一旦存入了随机数之后,后续的请求均在该随机数的基础上+1之后进行返回
return counter.getAndIncrement();
}
2. 消费者分区分配策略
消费者以组的名义订阅主题,主题有多个分区,消费者组中有多个消费者实例,同一时刻,一条消息只能被组中的一个消费者实例消费。
- 如果分区数大于或者等于组中的消费者实例数,一个消费者会负责多个分区。
- 如果分区数小于组中的消费者实例数,有些消费者将处于空闲状态并且无法接收消息。
如果多个消费者负责同一个分区,那么就意味着两个消费者同时读取分区的消息,由于消费者自己可以控制读取消息的Offset,就有可能C1才读到2,而C1读到1,C1还没处理完,C2已经读到3了,这就相当于多线程读取同一个消息,会造成消息处理的重复,且不能保证消息的顺序。
2.1 Range策略
range
(默认分配策略)对应的实现类是org.apache.kafka.clients.consumer.RangeAssignor
。
- 首先,将分区按数字顺序排行序,消费者按名称的字典序排序。
- 然后,用分区总数除以消费者总数。如果能够除尽,平均分配;若除不尽,则位于排序前面的消费者将多负责一个分区。
-
假设,有1个主题、10个分区、3个消费者线程, 10 / 3 = 3,而且除不尽,那么消费者C1将会多消费一个分区,分配结果是:
- C1将消费T1主题的0、1、2、3分区。
- C2将消费T1主题的4、5、6分区。
- C3将消费T1主题的7、8、9分区
-
假设,有11个分区,分配结果是:
- C1将消费T1主题的0、1、2、3分区。
- C2将消费T1主题的4、5、 6、7分区。
- C2将消费T1主题的8、9、10分区。
-
假如,有2个主题(T0和T1),分别有3个分区,分配结果是:
- C1将消费T1主题的 0、1 分区,以及T1主题的 0、1 分区。
- C2将消费T1主题的 2、3 分区,以及T2主题的 2、3 分区。
![](https://img.haomeiwen.com/i2269232/76b62107a30c8e4c.jpg)
public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
Map<String, Subscription> subscriptions) {
// 主题与消费者的映射
Map<String, List<String>> consumersPerTopic = consumersPerTopic(subscriptions);
Map<String, List<TopicPartition>> assignment = new HashMap<>();
for (String memberId : subscriptions.keySet())
assignment.put(memberId, new ArrayList<TopicPartition>());
for (Map.Entry<String, List<String>> topicEntry : consumersPerTopic.entrySet()) {
String topic = topicEntry.getKey(); // 主题
List<String> consumersForTopic = topicEntry.getValue(); // 消费者列表
// partitionsPerTopic表示主题和分区数的映射
// 获取主题下有多少个分区
Integer numPartitionsForTopic = partitionsPerTopic.get(topic);
if (numPartitionsForTopic == null)
continue;
// 消费者按字典序排序
Collections.sort(consumersForTopic);
// 分区数量除以消费者数量
int numPartitionsPerConsumer = numPartitionsForTopic / consumersForTopic.size();
// 取模,余数就是额外的分区
int consumersWithExtraPartition = numPartitionsForTopic % consumersForTopic.size();
List<TopicPartition> partitions = AbstractPartitionAssignor.partitions(topic, numPartitionsForTopic);
for (int i = 0, n = consumersForTopic.size(); i < n; i++) {
int start = numPartitionsPerConsumer * i + Math.min(i, consumersWithExtraPartition);
int length = numPartitionsPerConsumer + (i + 1 > consumersWithExtraPartition ? 0 : 1);
// 分配分区
assignment.get(consumersForTopic.get(i)).addAll(partitions.subList(start, start + length));
}
}
return assignment;
}
2.2 RoundRobin策略
RoundRobin基于轮询算法,对应的实现类是 org.apache.kafka.clients.consumer.RoundRobinAssignor
- 首先,将所有主题的分区组成
TopicAndPartition
列表。 - 然后对TopicAndPartition列表按照hashCode进行排序某个 topic。
假设,有两个消费者C0和C1,两个主题T0和T1,每个主题有3个分区,分配结果是:
- C0将消费T0主题的0、2分区,以及T1主题的1分区。
- C1将消费T0主题的1分区,以及T1主题的0、2分区。
![](https://img.haomeiwen.com/i2269232/06213b2afcd0fc47.jpg)
网友评论