美文网首页
Kafka 在写数据的时候是如何实现负载均衡的?

Kafka 在写数据的时候是如何实现负载均衡的?

作者: 笔头还没烂 | 来源:发表于2022-08-14 17:03 被阅读0次

step1: 先看是否指定了分区,如果有,则将数据全部写入到该分区;如果没有,则进入step2;
step2: 看是否在配置中是否指定自定义分区器,如果有,则调用自定义分区器,如果没有,则调用默认的分区器 DefaultPartitioner。进入 step3;
step3: 看是否有指定key,如果有,则先获取topic所有的分区对象,对key进行mur算法再对所有分区的个数进行取模运算,得到对应的分区;如果没有指定key,则进入step4.
step4: 执行 StickyPartition 调用黏性分区器,随机选择一个分区,将这一批次的所有数据写入这个分区。

相关源码如下:

(1)调用send、 dosend方法
send —> doSend —> int partition = this.partition(record, serializedKey, serializedValue, cluster);

(2)通过第1步,跳转到 partition方法实现
private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) {
        Integer partition = record.partition();
        return partition != null ? partition : this.partitioner.partition(record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);
    }

(3)通过第2步,找到 this.partition 的实例化代码
this.partitioner = (Partitioner)config.getConfiguredInstance("partitioner.class", Partitioner.class);

(4)最后,查看子类的实现(通过父类查看子类在 mac 版本的 idea 快捷键是 option + command + B)
public class DefaultPartitioner implements Partitioner {

    private final StickyPartitionCache stickyPartitionCache = new StickyPartitionCache();

    public void configure(Map<String, ?> configs) {}

    /**
     * Compute the partition for the given record.
     *
     * @param topic The topic name
     * @param key The key to partition on (or null if no key)
     * @param keyBytes serialized key to partition on (or null if no key)
     * @param value The value to partition on or null
     * @param valueBytes serialized value to partition on or null
     * @param cluster The current cluster metadata
     */
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        if (keyBytes == null) {
            return stickyPartitionCache.partition(topic, cluster);
        } 
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        // hash the keyBytes to choose a partition
        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }

    public void close() {}
    
    /**
     * If a batch completed for the current sticky partition, change the sticky partition. 
     * Alternately, if no sticky partition has been determined, set one.
     */
    public void onNewBatch(String topic, Cluster cluster, int prevPartition) {
        stickyPartitionCache.nextPartition(topic, cluster, prevPartition);
    }
}

相关文章

网友评论

      本文标题:Kafka 在写数据的时候是如何实现负载均衡的?

      本文链接:https://www.haomeiwen.com/subject/npllgrtx.html