美文网首页
kafka的分区策略

kafka的分区策略

作者: CoderInsight | 来源:发表于2023-02-28 09:20 被阅读0次

j简要概括

  • 分区的概念
  • 分区的策略
    • 轮询策略
    • 随机策略
    • 按照消息key保存
    • 基于地理位置分区策略
    • 用户自定义分区
  • 分区的策略小结

详细描述

15.kafka的分区策略

(1), 分区的概念

Kafka 有主题(Topic)的概念,它是承载真实数据的逻辑容器,而在主题之下还分为若干个分区,也就是说 Kafka 的消息组织方式实际上是三级结构:主题 - 分区 - 消息。主题下的每条消息只会保存在某一个分区中,而不会在多个分区中被保存多份.


分区的概念.png
对数据进行分区的主要原因,就是为了实现系统的高伸缩性(Scalability)。不同的分区能够被放置到不同节点的机器上,而数据的读写操作也都是针对分区这个粒度而进行的,这样每个节点的机器都能独立地执行各自分区的读写请求处理。并且,我们还可以通过添加新的节点机器来增加整体系统的吞吐量。

比如在 Kafka 中叫分区,在 MongoDB 和 Elasticsearch 中就叫分片 Shard,而在 HBase 中则叫 Region,在 Cassandra 中又被称作 vnode。从表面看起来它们实现原理可能不尽相同,但对底层分区(Partitioning)的整体思想却从未改变。

除了提供负载均衡这种最核心的功能之外,利用分区也可以实现其他一些业务级别的需求,比如实现业务级别的消息顺序的问题。

(2), 分区的策略

所谓分区策略是决定生产者将消息发送到哪个分区的算法。Kafka 为我们提供了默认的分区策略,同时它也支持你自定义分区策略.

public interface Partitioner extends Configurable, Closeable {

    /**
     * Compute the partition for the given record.
     *
     * @param topic The topic name
     * @param key The key to partition on (or null if no key)
     * @param keyBytes The serialized key to partition on( or null if no key)
     * @param value The value to partition on or null
     * @param valueBytes The serialized value to partition on or null
     * @param cluster The current cluster metadata
     */
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster);

    /**
     * This is called when partitioner is closed.
     */
    public void close();

}

1), 轮训策略

也称 Round-robin 策略,即顺序分配。比如一个主题下有 3 个分区,那么第一条消息被发送到分区 0,第二条被发送到分区 1,第三条被发送到分区 2,以此类推。当生产第 4 条消息时又会重新开始,即将其分配到分区 0,就像下面这张图展示的那样。


轮训策略.png

这就是所谓的轮询策略。轮询策略是 Kafka Java 生产者 API 默认提供的分区策略。如果你未指定partitioner.class参数,那么你的生产者程序会按照轮询的方式在主题的所有分区间均匀地“码放”消息。

轮询策略有非常优秀的负载均衡表现,它总是能保证消息最大限度地被平均分配到所有分区上,故默认情况下它是最合理的分区策略,也是我们最常用的分区策略之一。

2) 随机策略

也称 Randomness 策略。所谓随机就是我们随意地将消息放置到任意一个分区上,如下面这张图所示。


随机策略.png
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
return ThreadLocalRandom.current().nextInt(partitions.size());

本质上看随机策略也是力求将数据均匀地打散到各个分区,但从实际表现来看,它要逊于轮询策略,所以如果追求数据的均匀分布,还是使用轮询策略比较好。事实上,随机策略是老版本生产者使用的分区策略,在新版本中已经改为轮询了。

3) 按照消息key保存

Kafka 允许为每条消息定义消息键,简称为 Key。这个 Key 的作用非常大,它可以是一个有着明确业务含义的字符串,比如客户代码、部门编号或是业务 ID 等;也可以用来表征消息元数据。特别是在 Kafka 不支持时间戳的年代,在一些场景中,工程师们都是直接将消息创建时间封装进 Key 里面的。一旦消息被定义了 Key,那么你就可以保证同一个 Key 的所有消息都进入到相同的分区里面,由于每个分区下的消息处理都是有顺序的,故这个策略被称为按消息键保序策略,如下图所示。

按消息key分区.png
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
return Math.abs(key.hashCode()) % partitions.size();

Kafka 默认分区策略实际上同时实现了两种策略:
如果指定了 Key,那么默认实现按消息键保序策略;如果没有指定 Key,则使用轮询策略。

4) 基于地理位置的分区策略

这种策略一般只针对那些大规模的 Kafka 集群,特别是跨城市、跨国家甚至是跨大洲的集群。

List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
return partitions.stream().filter(p -> isSouth(p.leader().host())).map(PartitionInfo::partition).findAny().get();

可以从所有分区中找出那些 Leader 副本在南方的所有分区,然后随机挑选一个进行消息发送。

5),用户自定义分区

  • 定义一个类实现接口Partitioner
package com.kaikeba.partitioner;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

import java.util.Map;

//todo:需求:自定义kafka的分区函数
public class MyPartitioner implements Partitioner{
    /**
     * 通过这个方法来实现消息要去哪一个分区中
     * @param topic
     * @param key
     * @param bytes
     * @param value
     * @param bytes1
     * @param cluster
     * @return
     */
    public int partition(String topic, Object key, byte[] bytes, Object value, byte[] bytes1, Cluster cluster) {
        //获取topic分区数
        int partitions = cluster.partitionsForTopic(topic).size();
        
        //key.hashCode()可能会出现负数 -1 -2 0 1 2
        //Math.abs 取绝对值
        return Math.abs(key.hashCode()% partitions);

    }

    public void close() {
        
    }

    public void configure(Map<String, ?> map) {

    }
}

  • 配置自定义分区类
//在Properties对象中添加自定义分区类
props.put("partitioner.class","com.kaikeba.partitioner.MyPartitioner");

(3), 分区的策略小结

Kafka的分区策略决定了Producer生产者产生的一条消息最后会写入到Topic的哪一个分区中。

分区是实现负载均衡以及高吞吐量的关键,故在生产者这一端就要仔细盘算合适的分区策略,避免造成消息数据的“倾斜”,使得某些分区成为性能瓶颈,这样极易引发下游数据消费的性能下降。

/**
     * Creates a record with a specified timestamp to be sent to a specified topic and partition
     * 
     * @param topic The topic the record will be appended to
     * @param partition The partition to which the record should be sent
     * @param timestamp The timestamp of the record, in milliseconds since epoch. If null, the producer will assign
     *                  the timestamp using System.currentTimeMillis().
     * @param key The key that will be included in the record
     * @param value The record contents
     * @param headers the headers that will be included in the record
     */
    public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value, Iterable<Header> headers) {
        if (topic == null)
            throw new IllegalArgumentException("Topic cannot be null.");
        if (timestamp != null && timestamp < 0)
            throw new IllegalArgumentException(
                    String.format("Invalid timestamp: %d. Timestamp should always be non-negative or null.", timestamp));
        if (partition != null && partition < 0)
            throw new IllegalArgumentException(
                    String.format("Invalid partition: %d. Partition number should always be non-negative or null.", partition));
        this.topic = topic;
        this.partition = partition;
        this.key = key;
        this.value = value;
        this.timestamp = timestamp;
        this.headers = new RecordHeaders(headers);
    }
  • 1、指定具体的分区号: 这个时候会数据就会写入到指定的分区中
producer.send(new ProducerRecord<String, String>("test", 0,Integer.toString(i), "hello-kafka-"+i));
  • 2、不给定具体的分区号,给定key的值(key不断变化): 则使用key的hashcode进行分区,也就是使用以下公式:(key.hashcode & Integer.MAX_VALUE) %分区数 从而得到对应的 分区号,
producer.send(new ProducerRecord<String, String>("test", Integer.toString(i), "hello-kafka-"+i));
  • 3、不给定具体的分区号,也不给对应的key:这个它会进行轮训(round-robin)的方式把数据写入到不同分区中
producer.send(new ProducerRecord<String, String>("test", "hello-kafka-"+i));
  • 4、也可以自定义分区
//4, 用户可以根据业务需求自定义分区,如类似MR的排序,例如保证某个用户的数据是有序的
(topic.hashCode() & Integer.MAX_VALUE) % numPartitions

相关文章

  • Kafka分区策略

    分区的原因 (1)方便在集群中扩展,每个Partition可以通过调整以适应它所在的机器,而一个topic又可以有...

  • Kafka分区策略

    1. 生产者分区选择配策略 生产者在将消息发送到某个Topic ,需要经过拦截器、序列化器和分区器(Partiti...

  • Kafka 分区策略

    我们在使用 Apache Kafka 生产和消费消息的时候,肯定是希望能够将数据均匀地分配到所有服务器上。比如很多...

  • Kafka分区分配策略(Partition Assignment

    Kafka分区分配策略(Partition Assignment Strategy)参考:https://www....

  • 提升内功-kafka producer 小结

    目录 kafka producerkafka 分区策略kafka 数据可靠性保证 - ackkafka 数据一致性...

  • kafka发送消息分区选择策略

    发送kafka:(版本0.10.11) kafka 生产者发送消息分区选择策略 通过跟踪send方法,发现Kafk...

  • 图解Kafka消费者分区分配策略

    1. 分配策略的作用 我们在分析生产者的时候有专门写过文章分析生产者的分区分配策略 Kafka生产者的3种分区策略...

  • A(12)Kafka分区分配策略

    在 Kafka内部存在两种默认的分区分配策略:Range和 RoundRobin。Range是默认策略。Range...

  • (5)消息分发策略

    在介绍kafka的分区策略之前,先看看几个简单的概念 1.topic 在 kafka 中,topic 是一个存储消...

  • 三 Kafka生产者分区策略,Java API

    Kafka生产者 1. 分区策略 1.1 分区的原因 (1) 方便在集群中扩展,提高集群的负载能力(2) 可以提高...

网友评论

      本文标题:kafka的分区策略

      本文链接:https://www.haomeiwen.com/subject/klkqkdtx.html