美文网首页程序员
Kafka-生产者分区

Kafka-生产者分区

作者: 我可能是个假开发 | 来源:发表于2023-01-20 22:16 被阅读0次

    一、分区的好处

    • 便于合理使用存储资源,每个Partition在一个Broker上存储,可以把海量的数据按照分区切割成一块一块数据存储在多台Broker上。合理控制分区的任务,可以实现负载均衡的效果。
    • 提高并行度,生产者可以以分区为单位发送数据;消费者可以以分区为单位进行消费数据。

    分区的作用就是提供负载均衡的能力,或者说对数据进行分区的主要原因,就是为了实现系统的高伸缩性(Scalability)。不同的分区能够被放置到不同节点的机器上,而数据的读写操作也都是针对分区这个粒度而进行的,这样每个节点的机器都能独立地执行各自分区的读写请求处理。并且,我们还可以通过添加新的节点机器来增加整体系统的吞吐量。

    生产者分区.png

    kafka默认的分区器DefaultPartitioner

    package org.apache.kafka.clients.producer.internals;
    
    import java.util.Map;
    import org.apache.kafka.clients.producer.Partitioner;
    import org.apache.kafka.common.Cluster;
    import org.apache.kafka.common.utils.Utils;
    /**
     * The default partitioning strategy:
     * <ul>
     * <li>If a partition is specified in the record, use it
     * <li>If no partition is specified but a key is present choose a partition based on a hash of the key
     * <li>If no partition or key is present choose the sticky partition that changes when the batch is full.
     *
     * See KIP-480 for details about sticky partitioning.
     */
    public class DefaultPartitioner implements Partitioner {
    

    二.分区策略

    1.随机策略

    指明partition的情况下,直接将指明的值作为partition值;

    public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value, Iterable<Header> headers) {}
    public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value) {}
    public ProducerRecord(String topic, Integer partition, K key, V value, Iterable<Header> headers) {}
    public ProducerRecord(String topic, Integer partition, K key, V value) {}
    

    例如partition=1,所有数据写入分区1:

    // 指定发送到1号分区
    kafkaProducer.send(new ProducerRecord<>("first", 1, "", "record" + i),
            (recordMetadata, exception) -> {
                if (exception == null) {
                    System.out.println("主题:" + recordMetadata.topic() + ";分区:" + recordMetadata.partition());
                }
            });
    

    实现随机策略版的 partition:

    public class MyPartitioner implements Partitioner {
        @Override
        public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
            List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
            return ThreadLocalRandom.current().nextInt(partitions.size());
        }
          @Override
        public void close() {
        }
    
        @Override
        public void configure(Map<String, ?> map) {
        }
    }
    

    2.按消息键保序策略

    没有指明partition值但有key的情况下,将 key的hash值与topic的partition数进行取余得到partition值;

    public ProducerRecord(String topic, K key, V value) {}
    

    例如:key1的 hash值=5,key2的 hash值=6 ,topic的 partition数 =2,那么key1 对应的value1写入1号分区,key2对应的value2写入0号分区。

    // 指定发送到1号分区
    kafkaProducer.send(new ProducerRecord<>("first", "a", "record" + i),
            (recordMetadata, exception) -> {
                if (exception == null) {
                    System.out.println("主题:" + recordMetadata.topic() + ";分区:" + recordMetadata.partition());
                }
            });
    

    3.轮询策略

    既没有partition值又没有key值的情况下,Kafka采用Sticky Partition(黏性分区器),会随机选择一个分区,并尽可能一直使用该分区,待该分区的batch已满或者已完成,Kafka再随机一个分区进行使用(和上一次的分区不同)。

    public ProducerRecord(String topic, V value) {}
    

    例如:第一次随机选择0号分区,等0号分区当前批次满了(默认16k)或者linger.ms设置的时间到, Kafka再随机一个分区进行使用(如果还是0会继续随机)。

    kafkaProducer.send(new ProducerRecord<>("first","record" + i),
            (recordMetadata, exception) -> {
                if (exception == null) {
                    System.out.println("主题:" + recordMetadata.topic() + ";分区:" + recordMetadata.partition());
                }
            });
    

    轮询策略有非常优秀的负载均衡表现,它总是能保证消息最大限度地被平均分配到所有分区上,故默认情况下它是最合理的分区策略,也是我们最常用的分区策略之一。

    三、自定义分区器

    1.需求

    实现一个分区器实现,发送过来的数据中如果包含 tracy,就发往0号分区,不包含,就发往1号分区。

    2.实现

    • 定义类实现 Partitioner 接口。
    • 重写 partition()方法。
    • 在生产者的配置中添加分区器参数。

    MyPartitioner:

    package kafka;
    
    import org.apache.kafka.clients.producer.Partitioner;
    import org.apache.kafka.common.Cluster;
    
    import java.util.Map;
    
    /**
     * @Title: MyPartitioner.java
     * @Package kafka
     * @Description: 自定义分区器
     * @Author: hongcaixia
     * @Date: 2023/1/21 21:24
     * @Version V1.0
     */
    public class MyPartitioner implements Partitioner {
        @Override
        public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
            // 获取消息
            String msgValue = value.toString();
            // 创建 partition
            int partition;
            // 判断消息是否包含 tracy
            if (msgValue.contains("tracy")) {
                partition = 0;
            } else {
                partition = 1;
            }
            // 返回分区号
            return partition;
        }
    
        @Override
        public void close() {
    
        }
    
        @Override
        public void configure(Map<String, ?> map) {
    
        }
    }
    
    

    MyProducerPartition

    package kafka;
    
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    
    import java.util.Properties;
    
    /**
     * @Title: MyProducer.java
     * @Package kafka
     * @Description: 生产者使用自定义分区器
     * @Author: hongcaixia
     * @Date: 2023/1/20 21:24
     * @Version V1.0
     */
    public class MyProducerPartition {
    
        public static void main(String[] args) {
    
            // 1. 创建kafka生产者的配置对象
            Properties properties = new Properties();
    
            // 2. 给kafka配置对象添加配置信息:bootstrap.servers
            // 连接集群
            properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.0.1:9092,192.168.0.2:9092");
            // 指定序列化类型 key,value 序列化(必须):key.serializer,value.serializer
            properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
            properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
    
            //设置自定义分区器
            properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "kafka.MyPartitioner");
    
            // 3. 创建kafka生产者对象
            KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
    
            // 4. 调用send方法,发送消息
            for (int i = 0; i < 5; i++) {
                kafkaProducer.send(new ProducerRecord<>("first", "record" + i));
            }
    
            // 5. 关闭资源
            kafkaProducer.close();
        }
    }
    

    实现基于地理位置的分区策略

    这种策略一般只针对那些大规模的 Kafka 集群,特别是跨城市、跨国家甚至是跨大洲的集群。
    假设集群中有一部分机器在北京,另外一部分机器在广州。
    某机构计划为每个新注册用户提供一份注册礼品,比如南方的用户注册可以免费得到一碗“甜豆腐脑”,而北方的新注册用户可以得到一碗“咸豆腐脑”。如果用 Kafka 来实现则很简单,只需要创建一个双分区的主题,然后再创建两个消费者程序分别处理南北方注册用户逻辑即可。
    但是需要把南北方注册用户的注册消息正确地发送到位于南北方的不同机房中,因为处理这些消息的消费者程序只可能在某一个机房中启动着。换句话说,送甜豆腐脑的消费者程序只在广州机房启动着,而送咸豆腐脑的程序只在北京的机房中,如果你向广州机房中的 Broker 发送北方注册用户的消息,那么这个用户将无法得到礼品!
    可以根据 Broker 所在的 IP 地址实现定制化的分区策略:

    List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
    return partitions.stream().filter(p -> isSouth(p.leader().host())).map(PartitionInfo::partition).findAny().get();
    

    从所有分区中找出那些 Leader 副本在南方的所有分区,然后随机挑选一个进行消息发送。

    极客时间《Kafka 核心技术与实战》学习笔记Day6 - http://gk.link/a/11UOV

    相关文章

      网友评论

        本文标题:Kafka-生产者分区

        本文链接:https://www.haomeiwen.com/subject/alpqhdtx.html