美文网首页
Kafka 自定义 Partition

Kafka 自定义 Partition

作者: cefa6a30d1c3 | 来源:发表于2019-08-02 23:35 被阅读0次
/**
 * Partitioner that picks a partition from the record key when one is present
 * and otherwise rotates through the topic's partitions.
 * <ul>
 * <li>Key present: the serialized key is hashed with {@code Utils.murmur2} and
 * the result is reduced modulo the topic's total partition count.</li>
 * <li>No key: a per-topic counter is advanced on every call and reduced modulo
 * the number of currently available partitions (round-robin). When no
 * partition is available, the total partition count is used instead.</li>
 * </ul>
 * An explicitly chosen partition is not handled here: {@link #partition}
 * receives no such value, so that case has to be resolved by the caller.
 */
public class DefaultPartitioner implements Partitioner {

    /** One round-robin counter per topic, created lazily on first use. */
    private final ConcurrentMap<String, AtomicInteger> topicCounterMap =
            new ConcurrentHashMap<>();

    /**
     * No configuration is read by this partitioner.
     *
     * @param configs ignored
     */
    public void configure(Map<String, ?> configs) {
    }

    /**
     * Choose the partition number for a record.
     *
     * @param topic      name of the topic the record is sent to
     * @param key        record key, or null when the record has no key (unused here)
     * @param keyBytes   serialized key, or null when the record has no key
     * @param value      record value, may be null (unused here)
     * @param valueBytes serialized value, may be null (unused here)
     * @param cluster    cluster metadata used to look up the topic's partitions
     * @return the selected partition number
     */
    public int partition(String topic, Object key, byte[] keyBytes, Object
            value, byte[] valueBytes, Cluster cluster) {
        final int partitionCount = cluster.partitionsForTopic(topic).size();

        // Keyed record: the partition depends only on the key bytes and the partition count.
        if (keyBytes != null) {
            final int keyHash = Utils.murmur2(keyBytes);
            return Utils.toPositive(keyHash) % partitionCount;
        }

        // Unkeyed record: take the next ticket from this topic's counter.
        final int ticket = Utils.toPositive(nextValue(topic));
        final List<PartitionInfo> candidates = cluster.availablePartitionsForTopic(topic);
        if (candidates.isEmpty()) {
            // Nothing is available, so hand back a partition that is not available.
            return ticket % partitionCount;
        }
        final PartitionInfo chosen = candidates.get(ticket % candidates.size());
        return chosen.partition();
    }

    /**
     * Return the current counter value for {@code topic} and advance it by one.
     * A missing counter is created with a random starting value.
     *
     * @param topic topic whose counter is read
     * @return the counter value before the increment (may be negative)
     */
    private int nextValue(String topic) {
        final AtomicInteger existing = topicCounterMap.get(topic);
        if (existing != null) {
            return existing.getAndIncrement();
        }

        final AtomicInteger created =
                new AtomicInteger(ThreadLocalRandom.current().nextInt());
        // putIfAbsent returns the counter another caller registered first, if any;
        // in that case ours is discarded and the registered one is used.
        final AtomicInteger winner = topicCounterMap.putIfAbsent(topic, created);
        return (winner == null ? created : winner).getAndIncrement();
    }

    /** Nothing to release. */
    public void close() {
    }
}
* 如果指定了 partition,就使用指定的 partition。
* 如果没有指定 partition 但指定了 key,则对 key 进行 hash 后按分区数取模。
* 如果既没有指定 partition 也没有指定 key,则使用轮询(round-robin)的方式选择分区。

相关文章

网友评论

      本文标题:Kafka 自定义 Partition

      本文链接:https://www.haomeiwen.com/subject/hghddctx.html