美文网首页
Kafka 在写数据的时候是如何实现负载均衡的?

Kafka 在写数据的时候是如何实现负载均衡的?

作者: 笔头还没烂 | 来源:发表于2022-08-14 17:03 被阅读0次

    step1: 先看是否指定了分区,如果有,则将数据全部写入到该分区;如果没有,则进入step2;
    step2: 看是否在配置中是否指定自定义分区器,如果有,则调用自定义分区器,如果没有,则调用默认的分区器 DefaultPartitioner。进入 step3;
    step3: 看是否有指定key,如果有,则先获取topic所有的分区对象,对key进行mur算法再对所有分区的个数进行取模运算,得到对应的分区;如果没有指定key,则进入step4.
    step4: 执行 StickyPartition 调用黏性分区器,随机选择一个分区,将这一批次的所有数据写入这个分区。

    相关源码如下:

    (1)调用send、 dosend方法
    send —> doSend —> int partition = this.partition(record, serializedKey, serializedValue, cluster);
    
    (2)通过第1步,跳转到 partition方法实现
    private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) {
            Integer partition = record.partition();
            return partition != null ? partition : this.partitioner.partition(record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);
        }
    
    (3)通过第2步,找到 this.partition 的实例化代码
    this.partitioner = (Partitioner)config.getConfiguredInstance("partitioner.class", Partitioner.class);
    
    (4)最后,查看子类的实现(通过父类查看子类在 mac 版本的 idea 快捷键是 option + command + B)
    public class DefaultPartitioner implements Partitioner {
    
        private final StickyPartitionCache stickyPartitionCache = new StickyPartitionCache();
    
        public void configure(Map<String, ?> configs) {}
    
        /**
         * Compute the partition for the given record.
         *
         * @param topic The topic name
         * @param key The key to partition on (or null if no key)
         * @param keyBytes serialized key to partition on (or null if no key)
         * @param value The value to partition on or null
         * @param valueBytes serialized value to partition on or null
         * @param cluster The current cluster metadata
         */
        public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
            if (keyBytes == null) {
                return stickyPartitionCache.partition(topic, cluster);
            } 
            List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
            int numPartitions = partitions.size();
            // hash the keyBytes to choose a partition
            return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
        }
    
        public void close() {}
        
        /**
         * If a batch completed for the current sticky partition, change the sticky partition. 
         * Alternately, if no sticky partition has been determined, set one.
         */
        public void onNewBatch(String topic, Cluster cluster, int prevPartition) {
            stickyPartitionCache.nextPartition(topic, cluster, prevPartition);
        }
    }
    

    相关文章

      网友评论

          本文标题:Kafka 在写数据的时候是如何实现负载均衡的?

          本文链接:https://www.haomeiwen.com/subject/npllgrtx.html