Kafka Custom Partitioning

Author: cefa6a30d1c3 | Published: 2019-08-02 23:35

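    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;
    import java.util.concurrent.ThreadLocalRandom;
    import java.util.concurrent.atomic.AtomicInteger;

    import org.apache.kafka.clients.producer.Partitioner;
    import org.apache.kafka.common.Cluster;
    import org.apache.kafka.common.PartitionInfo;
    import org.apache.kafka.common.utils.Utils;

    /**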
     * The default partitioning strategy:
     * <ul>
     * <li>If a partition is specified in the record, use it
     * <li>If no partition is specified but a key is present choose a partition based
     * on a hash of the key
     * <li>If no partition or key is present choose a partition in a round-robin
     * fashion
     * </ul>
     */
    public class DefaultPartitioner implements Partitioner {
        private final ConcurrentMap<String, AtomicInteger> topicCounterMap =
                new ConcurrentHashMap<>();
    
        public void configure(Map<String, ?> configs) {
        }
    
        /**
         * Compute the partition for the given record.
         *
         * @param topic      The topic name
         * @param key        The key to partition on (or null if no key)
         * @param keyBytes   serialized key to partition on (or null if no key)
         * @param value      The value to partition on or null
         * @param valueBytes serialized value to partition on or null
         * @param cluster    The current cluster metadata
         */
        public int partition(String topic, Object key, byte[] keyBytes,
                             Object value, byte[] valueBytes, Cluster cluster) {
            List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
            int numPartitions = partitions.size();
            if (keyBytes == null) {
                int nextValue = nextValue(topic);
                List<PartitionInfo> availablePartitions =
                        cluster.availablePartitionsForTopic(topic);
                if (availablePartitions.size() > 0) {
                    int part = Utils.toPositive(nextValue) %
                            availablePartitions.size();
                    return availablePartitions.get(part).partition();
                } else {
                    // no partitions are available, give a non-available partition
                    return Utils.toPositive(nextValue) % numPartitions;
                }
            } else {
                // hash the keyBytes to choose a partition
                return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
            }
        }
    
        private int nextValue(String topic) {
            AtomicInteger counter = topicCounterMap.get(topic);
            if (null == counter) {
                counter = new AtomicInteger(ThreadLocalRandom.current().nextInt());
                AtomicInteger currentCounter = topicCounterMap.putIfAbsent(topic, counter);
                if (currentCounter != null) {
                    counter = currentCounter;
                }
            }
            return counter.getAndIncrement();
        }
    
        public void close() {
        }
    }
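
Both branches of `partition` pass their value through `Utils.toPositive` before taking the modulo. The sketch below illustrates why that is likely preferred over `Math.abs`. It assumes `toPositive` simply masks off the sign bit and re-implements it locally so the example has no Kafka dependency; the class name `ToPositiveSketch` is hypothetical. The problem case is `Integer.MIN_VALUE`, which a hash or a wrapped-around counter can produce and for which `Math.abs` still returns a negative number.

```java
final class ToPositiveSketch {
    // Local stand-in for Kafka's Utils.toPositive (assumption: it clears the sign bit).
    static int toPositive(int number) {
        return number & 0x7fffffff;
    }

    public static void main(String[] args) {
        int numPartitions = 3;
        int hash = Integer.MIN_VALUE; // e.g. a counter that has just overflowed

        // Math.abs(Integer.MIN_VALUE) is still Integer.MIN_VALUE, so the
        // remainder is negative and cannot be used as a partition index.
        System.out.println(Math.abs(hash) % numPartitions);   // prints -2

        // Masking the sign bit always yields a non-negative value.
        System.out.println(toPositive(hash) % numPartitions); // prints 0
    }
}
```

Note that masking is not the same as taking the absolute value (for example, `toPositive(-1)` is `Integer.MAX_VALUE`, not 1); it only guarantees a non-negative index.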
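
To summarize the default strategy:

* If the record specifies a partition, that partition is used.
* If no partition is specified but a key is present, the key is hashed and the result is taken modulo the number of partitions.
* If neither a partition nor a key is present, a partition is chosen round-robin (among the available partitions when there are any).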
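
The three rules above can be sketched as a single decision function. This is a simplified illustration, not Kafka's implementation: the class `PartitionChoiceSketch` and its `choose` method are hypothetical, `Arrays.hashCode` is swapped in for `Utils.murmur2`, one counter is used instead of one per topic, and the counter starts at 0 rather than at a random value. The first rule is included here for completeness, although `DefaultPartitioner.partition` as listed never sees a record-level partition.

```java
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;

final class PartitionChoiceSketch {
    // Single round-robin counter (assumption: the real class keeps one per topic).
    private final AtomicInteger counter = new AtomicInteger(0);

    // Local stand-in for Utils.toPositive: clear the sign bit.
    static int toPositive(int number) {
        return number & 0x7fffffff;
    }

    /**
     * @param explicitPartition partition set on the record, or null
     * @param keyBytes          serialized key, or null if the record has no key
     * @param numPartitions     number of partitions of the topic, must be > 0
     */
    int choose(Integer explicitPartition, byte[] keyBytes, int numPartitions) {
        if (explicitPartition != null) {
            return explicitPartition; // rule 1: use the specified partition
        }
        if (keyBytes != null) {
            // rule 2: hash the key; Arrays.hashCode is NOT murmur2, only a placeholder
            return toPositive(Arrays.hashCode(keyBytes)) % numPartitions;
        }
        // rule 3: no key, so rotate through the partitions
        return toPositive(counter.getAndIncrement()) % numPartitions;
    }

    public static void main(String[] args) {
        PartitionChoiceSketch sketch = new PartitionChoiceSketch();
        byte[] key = "user-42".getBytes();

        System.out.println(sketch.choose(2, key, 3)); // explicit partition wins: prints 2
        // The same key always maps to the same partition.
        System.out.println(sketch.choose(null, key, 3) == sketch.choose(null, key, 3)); // prints true
        // Keyless records cycle through 0, 1, 2, 0.
        for (int i = 0; i < 4; i++) {
            System.out.println(sketch.choose(null, null, 3));
        }
    }
}
```

Because the key branch depends on `numPartitions`, the same key can land on a different partition after partitions are added to a topic.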
    
    
