step1: 先看是否指定了分区,如果有,则将数据全部写入到该分区;如果没有,则进入step2;
step2: 看是否在配置中是否指定自定义分区器,如果有,则调用自定义分区器,如果没有,则调用默认的分区器 DefaultPartitioner。进入 step3;
step3: 看是否有指定key,如果有,则先获取topic所有的分区对象,对key进行mur算法再对所有分区的个数进行取模运算,得到对应的分区;如果没有指定key,则进入step4.
step4: 执行 StickyPartition 调用黏性分区器,随机选择一个分区,将这一批次的所有数据写入这个分区。
相关源码如下:
(1)调用send、 dosend方法
send —> doSend —> int partition = this.partition(record, serializedKey, serializedValue, cluster);
(2)通过第1步,跳转到 partition方法实现
private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) {
Integer partition = record.partition();
return partition != null ? partition : this.partitioner.partition(record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);
}
(3)通过第2步,找到 this.partition 的实例化代码
this.partitioner = (Partitioner)config.getConfiguredInstance("partitioner.class", Partitioner.class);
(4)最后,查看子类的实现(通过父类查看子类在 mac 版本的 idea 快捷键是 option + command + B)
public class DefaultPartitioner implements Partitioner {
private final StickyPartitionCache stickyPartitionCache = new StickyPartitionCache();
public void configure(Map<String, ?> configs) {}
/**
* Compute the partition for the given record.
*
* @param topic The topic name
* @param key The key to partition on (or null if no key)
* @param keyBytes serialized key to partition on (or null if no key)
* @param value The value to partition on or null
* @param valueBytes serialized value to partition on or null
* @param cluster The current cluster metadata
*/
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
if (keyBytes == null) {
return stickyPartitionCache.partition(topic, cluster);
}
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
// hash the keyBytes to choose a partition
return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}
public void close() {}
/**
* If a batch completed for the current sticky partition, change the sticky partition.
* Alternately, if no sticky partition has been determined, set one.
*/
public void onNewBatch(String topic, Cluster cluster, int prevPartition) {
stickyPartitionCache.nextPartition(topic, cluster, prevPartition);
}
}
网友评论