Kafka 分区器
我们知道,Kafka中的每个Topic一般会分配N个Partition,那么生产者(Producer)在将消息记录(ProducerRecord)发送到某个Topic对应的Partition时采用何种策略呢?Kafka中采用了分区器(Partitioner)来为我们进行分区路由的操作。本文将详细讨论Kafka给我们提供的分区器实现DefaultPartitioner;当然我们也可以实现自定义的分区器,只需要实现Partitioner接口即可。
org.apache.kafka.clients.producer.Partitioner源代码如下:
package org.apache.kafka.clients.producer;
import org.apache.kafka.common.Configurable;
import org.apache.kafka.common.Cluster;
import java.io.Closeable;
/**
 * Decides which partition of a topic a record is sent to.
 *
 * <p>Implementations receive configuration through {@link Configurable#configure} and release
 * any resources in {@link #close()}.
 */
public interface Partitioner extends Configurable, Closeable {

    /**
     * Computes the partition for the given record.
     *
     * @param topic the topic name
     * @param key the key to partition on, or {@code null} if there is no key
     * @param keyBytes the serialized key to partition on, or {@code null} if there is no key
     * @param value the value to partition on, or {@code null}
     * @param valueBytes the serialized value to partition on, or {@code null}
     * @param cluster the current cluster metadata
     * @return the partition number chosen for the record
     */
    int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster);

    /**
     * Called when the partitioner is closed.
     *
     * <p>Redeclares {@link Closeable#close()} without {@code throws IOException}, so callers need
     * not handle a checked exception.
     */
    void close();
}
Partitioner接口继承了Configurable和Closeable两个接口,其中Configurable的源码如下:
/**
 * Mix-in interface for classes that are created via reflection and then need to receive
 * configuration parameters.
 */
public interface Configurable {

    /**
     * Configures this instance from the given key-value pairs.
     *
     * @param configs the configuration entries, keyed by name
     */
    void configure(Map<String, ?> configs);
}
org.apache.kafka.clients.producer.internals.DefaultPartitioner源码如下:
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.utils.Utils;
/**
 * The default partitioning strategy.
 * <ul>
 * <li>If a partition is specified in the record, use it.</li>
 * <li>If no partition is specified but a key is present, choose a partition based on a hash of
 * the key.</li>
 * <li>If neither a partition nor a key is present, choose a partition in a round-robin
 * fashion.</li>
 * </ul>
 *
 * <p>{@link #partition} receives no explicit partition argument, so the first rule is presumably
 * applied by the caller before this partitioner is consulted. This class implements the second
 * and third rules.
 */
public class DefaultPartitioner implements Partitioner {

    /**
     * One round-robin counter per topic, created lazily the first time an un-keyed record for that
     * topic is partitioned. Backed by a ConcurrentHashMap of AtomicIntegers, so concurrent calls
     * can create and advance counters without external locking.
     */
    private final ConcurrentMap<String, AtomicInteger> topicCounterMap = new ConcurrentHashMap<>();

    /** This partitioner reads no configuration; intentionally a no-op. */
    public void configure(Map<String, ?> configs) {}

    /**
     * Compute the partition for the given record.
     *
     * <p>Keyed records are hashed with murmur2 over all partitions of the topic, so a given key maps
     * to the same partition as long as the topic's partition count does not change. Un-keyed
     * records are spread round-robin over the currently available partitions, or over all
     * partitions when none is available.
     *
     * <p>Assumes the topic has at least one partition in {@code cluster}: with zero partitions the
     * {@code % numPartitions} below throws {@link ArithmeticException}.
     *
     * @param topic The topic name
     * @param key The key to partition on (or null if no key)
     * @param keyBytes serialized key to partition on (or null if no key)
     * @param value The value to partition on or null
     * @param valueBytes serialized value to partition on or null
     * @param cluster The current cluster metadata
     * @return the chosen partition number
     */
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();

        if (keyBytes != null) {
            // Keyed record: murmur2 is deterministic, so equal keys always land on the same partition.
            // toPositive clears the sign bit so the modulo result is never negative.
            return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
        }

        // Un-keyed record: round-robin using this topic's counter.
        int nextValue = nextValue(topic);
        List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
        if (!availablePartitions.isEmpty()) {
            // Prefer partitions that are currently available.
            int part = Utils.toPositive(nextValue) % availablePartitions.size();
            return availablePartitions.get(part).partition();
        }
        // No partition is currently available: fall back to round-robin over all partitions.
        return Utils.toPositive(nextValue) % numPartitions;
    }

    /**
     * Returns the current value of {@code topic}'s round-robin counter and advances it by one.
     *
     * <p>A new counter starts at a random int, presumably so that separate producer instances do
     * not all begin at the same partition. The counter wraps to negative values on overflow, which
     * is why callers pass the result through {@code Utils.toPositive}.
     */
    private int nextValue(String topic) {
        AtomicInteger counter = topicCounterMap.computeIfAbsent(
                topic, t -> new AtomicInteger(ThreadLocalRandom.current().nextInt()));
        return counter.getAndIncrement();
    }

    /** This partitioner holds no resources, so there is nothing to release. */
    public void close() {}
}
下面对DefaultPartitioner中用到的Utils的两个方法(toPositive和murmur2)的源码进行解读:
/**
 * Deterministically maps an int to a non-negative int by clearing its sign bit.
 *
 * <p>Non-negative inputs are returned unchanged. A negative input is masked with
 * {@code 0x7fffffff} (that is, {@link Integer#MAX_VALUE}), which is NOT its absolute value:
 * for example {@code -1} maps to {@code 2147483647} and {@link Integer#MIN_VALUE} maps to
 * {@code 0}.
 *
 * <p>Note: changing this method in the future may make partition selection incompatible with
 * messages already placed on a partition, since it is used in the producer's
 * {@link org.apache.kafka.clients.producer.internals.DefaultPartitioner}.
 *
 * @param number a given number
 * @return {@code number} with its sign bit cleared, always in {@code [0, Integer.MAX_VALUE]}
 */
public static int toPositive(int number) {
    // Integer.MAX_VALUE == 0x7fffffff == 01111111 11111111 11111111 11111111: every bit except the sign bit.
    final int signBitCleared = number & Integer.MAX_VALUE;
    return signBitCleared;
}
/**
* Generates a 32-bit MurmurHash2 hash from a byte array.
*
* <p>The seed is the fixed constant 0x9747b28c, so equal inputs always produce equal hashes.
* The result is a signed int and may be negative.
*
* @param data byte array to hash; must not be null (its length is read unconditionally)
* @return 32-bit hash of the given array
*/
public static int murmur2(final byte[] data) {
int length = data.length;
int seed = 0x9747b28c;
// 'm' and 'r' are mixing constants generated offline.
// They're not really 'magic', they just happen to work well.
final int m = 0x5bd1e995;
final int r = 24;
// Initialize the hash from the fixed seed XOR the input length (deterministic, not random)
int h = seed ^ length;
// Number of complete 4-byte groups in the input
int length4 = length / 4;
// Mix in each complete 4-byte group
for (int i = 0; i < length4; i++) {
final int i4 = i * 4;
// Assemble the group as a little-endian int (data[i4] is the least significant byte);
// "& 0xff" prevents sign extension of negative bytes
int k = (data[i4 + 0] & 0xff) + ((data[i4 + 1] & 0xff) << 8) + ((data[i4 + 2] & 0xff) << 16) + ((data[i4 + 3] & 0xff) << 24);
k *= m;
k ^= k >>> r;
k *= m;
h *= m;
h ^= k;
}
// Handle the last 1-3 bytes of the input array; (length & ~3) is the index of the first leftover byte.
// The missing breaks are intentional: each case falls through so every leftover byte is mixed in,
// and the trailing multiply runs only when there is at least one leftover byte.
switch (length % 4) {
case 3:
h ^= (data[(length & ~3) + 2] & 0xff) << 16;
case 2:
h ^= (data[(length & ~3) + 1] & 0xff) << 8;
case 1:
h ^= data[length & ~3] & 0xff;
h *= m;
}
// Final mixing steps so that every input bit influences the whole hash
h ^= h >>> 13;
h *= m;
h ^= h >>> 15;
return h;
}
自定义分区器:
package com.ghq.kafka.server;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * Skeleton of a custom {@link Partitioner}.
 *
 * <p>As written it routes every record to partition 0 of its topic. Replace the body of
 * {@link #partition} with routing logic that fits the business requirements.
 */
public class DemoPartitioner implements Partitioner {

    /**
     * Chooses the partition for a record.
     *
     * <p>Placeholder implementation: always returns 0, so every record of a topic goes to
     * partition 0. Put the real, business-specific routing logic here.
     *
     * @param topic the topic name
     * @param key the record key, or {@code null}
     * @param keyBytes the serialized key, or {@code null}
     * @param value the record value, or {@code null}
     * @param valueBytes the serialized value, or {@code null}
     * @param cluster the current cluster metadata
     * @return always {@code 0}
     */
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        return 0;
    }

    /** Nothing to release: this partitioner holds no resources. */
    @Override
    public void close() {
    }

    /** No configuration is read by this partitioner. */
    @Override
    public void configure(Map<String, ?> configs) {
    }
}
如何使用自定义分区器?在构造KafkaProducer时所用的配置prop中添加如下代码:
// Register DemoPartitioner under ProducerConfig.PARTITIONER_CLASS_CONFIG by its fully-qualified class name;
// prop is the configuration passed when constructing the KafkaProducer.
prop.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,DemoPartitioner.class.getName());
网友评论