美文网首页Kafka
kafka_05_Kafka的分区器

kafka_05_Kafka的分区器

作者: 平头哥2 | 来源:发表于2019-03-25 22:14 被阅读0次

Kafka 分区器

我们知道,Kafka中的每个Topic一般会分配N个Partition,那么生产者(Producer)在将消息记录(ProducerRecord)发送到某个Topic对应的Partition时采用何种策略呢?Kafka中采用了分区器(Partitioner)来为我们进行分区路由的操作。本文将详细讨论Kafka给我们提供的分区器实现DefaultPartitioner,当然我们也可以实现自定义的分区器,只需要实现Partitioner接口。

org.apache.kafka.clients.producer.Partitioner源代码如下:

package org.apache.kafka.clients.producer;

import org.apache.kafka.common.Configurable;
import org.apache.kafka.common.Cluster;
import java.io.Closeable;

/**
 * Partitioner Interface
 */
public interface Partitioner extends Configurable, Closeable {

    /**
     * 为给定的record计算分取
     *
     * @param topic 主题的名字
     * @param key The key to partition on (or null if no key)
     * @param keyBytes The serialized key to partition on( or null if no key)
     * @param value The value to partition on or null
     * @param valueBytes The serialized value to partition on or null
     * @param cluster 当前集群的元数据
     */
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster);

    /**
     * This is called when partitioner is closed.
     */
    public void close();

}

Partitioner实现了Configurable接口,Configurable源码如下:

/**
 * A Mix-in style interface for classes that are instantiated by reflection and need to take configuration parameters
 */
public interface Configurable {

    /**
     * Configure this class with the given key-value pairs
     */
    void configure(Map<String, ?> configs);
}

org.apache.kafka.clients.producer.internals.DefaultPartitioner源码如下:


import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.utils.Utils;

/**
 * The default partitioning strategy: 默认的分取策略
 * <ul>
 * <li>If a partition is specified in the record, use it
        如果record指定了分取器,那么就使用指定的分区器
 * <li>If no partition is specified but a key is present choose a partition based on a hash of the key
 如果没有指定分区器,但是key选择一个分区,该分区基于key的Hash值
 
 * <li>If no partition or key is present choose a partition in a round-robin fashion
 如果没有指定分区器,或者key是以轮询(round-robin)的方式选择分区
 */
public class DefaultPartitioner implements Partitioner {

    private final ConcurrentMap<String, AtomicInteger> topicCounterMap = new ConcurrentHashMap<>();

    public void configure(Map<String, ?> configs) {}

    /**
     * Compute the partition for the given record.
     *
     * @param topic The topic name
     * @param key The key to partition on (or null if no key)
     * @param keyBytes serialized key to partition on (or null if no key)
     * @param value The value to partition on or null
     * @param valueBytes serialized value to partition on or null
     * @param cluster The current cluster metadata
     */
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        //获取该主题对应的主题分区列表
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();//获取主题分区列表的大小
        if (keyBytes == null) {//如果key为null
            int nextValue = nextValue(topic);//获取到下一个值
            List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);//获取该主题的可用的分区列表
            if (availablePartitions.size() > 0) {//如果可用的分区列表大于0
                //采用轮询的方式获取分区号
                int part = Utils.toPositive(nextValue) % availablePartitions.size();
                return availablePartitions.get(part).partition();
            } else {
                // 没有可用分区,对所有的非可用分区轮询
                return Utils.toPositive(nextValue) % numPartitions;
            }
        } else {//如果key不为null
            // 采用Utils.murmur2(keyBytes)哈希算法,高性能,低碰撞 
            return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
        }
    }

    //获取下一个value
    private int nextValue(String topic) {
        AtomicInteger counter = topicCounterMap.get(topic);//从缓存中获取counter
        if (null == counter) {//如果counter不存在
            //创建counter
            counter = new AtomicInteger(ThreadLocalRandom.current().nextInt());
            //将counter存入缓存
            AtomicInteger currentCounter = topicCounterMap.putIfAbsent(topic, counter);
            //如果存入缓存已经存在了内容
            if (currentCounter != null) {
                counter = currentCounter;//将已经存在内容赋值为counter
            }
        }
        return counter.getAndIncrement();//获取并且增加1
    }

    public void close() {}

}

对Utils两个方法的源码解读:

/**
     * A cheap way to deterministically convert a number to a positive value. When the input is
     * positive, the original value is returned. When the input number is negative, the returned
     * positive value is the original value bit AND against 0x7fffffff which is not its absolutely
     * value.
     *
     * Note: changing this method in the future will possibly cause partition selection not to be
     * compatible with the existing messages already placed on a partition since it is used
     * in producer's {@link org.apache.kafka.clients.producer.internals.DefaultPartitioner}
     *
     * @param number a given number
     * @return a positive number.
     */
    //0x7fffffff: 01111111 11111111 11111111 11111111
    public static int toPositive(int number) {
        return number & 0x7fffffff;
    }
   /**
     * Generates 32 bit murmur2 hash from byte array
     * @param data byte array to hash
     * @return 32 bit hash of the given array
     */
    public static int murmur2(final byte[] data) {
        int length = data.length;
        int seed = 0x9747b28c;
        // 'm' and 'r' are mixing constants generated offline.
        // They're not really 'magic', they just happen to work well.
        final int m = 0x5bd1e995;
        final int r = 24;

        // Initialize the hash to a random value
        int h = seed ^ length;
        int length4 = length / 4;

        for (int i = 0; i < length4; i++) {
            final int i4 = i * 4;
            int k = (data[i4 + 0] & 0xff) + ((data[i4 + 1] & 0xff) << 8) + ((data[i4 + 2] & 0xff) << 16) + ((data[i4 + 3] & 0xff) << 24);
            k *= m;
            k ^= k >>> r;
            k *= m;
            h *= m;
            h ^= k;
        }

        // Handle the last few bytes of the input array
        switch (length % 4) {
            case 3:
                h ^= (data[(length & ~3) + 2] & 0xff) << 16;
            case 2:
                h ^= (data[(length & ~3) + 1] & 0xff) << 8;
            case 1:
                h ^= data[length & ~3] & 0xff;
                h *= m;
        }

        h ^= h >>> 13;
        h *= m;
        h ^= h >>> 15;

        return h;
    }

自定义分区器:

package com.ghq.kafka.server;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

public class DemoPartitioner implements Partitioner {

    private final AtomicInteger integer = new AtomicInteger(0);
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {

        //分区的逻辑,根据具体的业务逻辑来实现

        return 0;
    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> configs) {

    }
}

如何使用自定义分区器?在KafkaProdcer构造的配置prop中添加如下代码:

prop.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,DemoPartitioner.class.getName());

相关文章

  • kafka_05_Kafka的分区器

    Kafka 分区器 我们知道,Kafka中的每个Topic一般会分配N个Partition,那么生产者(Produ...

  • Spark-RDD分区器

    Spark中现在支持的分区器有Hash分区器和Range分区器,除此之外,用户也可以自定义分区方式。默认的分区方式...

  • partitionBy (通过分区器进行分区)

    作用:对pairRDD进行分区操作,通过指定的分区器决定数据计算的分区,spark默认使用的分区器是HashPar...

  • 键值对RDD数据分区

    前言 Spark目前支持Hash分区、Range分区和用户自定义分区。Hash分区为当前的默认分区。分区器直接决定...

  • Flink的八种分区策略源码解读

    Flink包含8中分区策略,这8中分区策略(分区器)分别如下面所示,本文将从源码的角度一一解读每个分区器的实现方式...

  • Spark中RangePartitioner的实现机制分析

    一.分区器的区别 HashPartitioner分区可能HashPartitioner导致每个分区中数据量的不均匀...

  • kafka05 开发自定义分区器

    开发自定义分区器 上一节我们看到,如果在发送消息的时候没有指定对应的分区,会使用默认分区器对消息进行分区,这一节我...

  • day 4 磁盘分区形式

    生产场景的磁盘分区 普通分区形式 存储服务器(含数据库)的分区方式 门户网站分区方式

  • linux磁盘分区fdisk命令详解

    1. 什么是分区? 分区是将一个硬盘驱动器分成若干个逻辑驱动器,分区是把硬盘连续的区块当做一个独立的磁硬使用。分区...

  • [LN_00] VMware虚拟机安装Linux系统:分区&am

    1. 分区过程 1)分区 Windows、Linux对比:Windows:分区-->格式化-->更改驱动器号和路径...

网友评论

    本文标题:kafka_05_Kafka的分区器

    本文链接:https://www.haomeiwen.com/subject/ryuivqtx.html