美文网首页
kafka顺序消息踩坑记录

kafka顺序消息踩坑记录

作者: 你与我相似 | 来源:发表于2021-02-20 17:29 被阅读0次

    Kafka顺序消息消息

    1.消息发送的api

        public ListenableFuture<SendResult<K, V>> send(String topic, V data) {
            ProducerRecord<K, V> producerRecord = new ProducerRecord(topic, data);
            return this.doSend(producerRecord);
        }
    
        public ListenableFuture<SendResult<K, V>> send(String topic, K key, V data) {
            ProducerRecord<K, V> producerRecord = new ProducerRecord(topic, key, data);
            return this.doSend(producerRecord);
        }
    
        public ListenableFuture<SendResult<K, V>> send(String topic, int partition, V data) {
            ProducerRecord<K, V> producerRecord = new ProducerRecord(topic, partition, (Object)null, data);
            return this.doSend(producerRecord);
        }
    
        public ListenableFuture<SendResult<K, V>> send(String topic, int partition, K key, V data) {
            ProducerRecord<K, V> producerRecord = new ProducerRecord(topic, partition, key, data);
            return this.doSend(producerRecord);
        }
    

    2.KafkaProducer.class 封装获取partition方法,优先使用传入的partition

        private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) {
            Integer partition = record.partition();
            //优先返回传入的partition
            return partition != null ? partition : this.partitioner.partition(record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);
        }
    

    3.DefaultPartitioner.class 封装获取partition方法,优先使用key的murmur2算法的hash值对partitionCount取模,其次使用本地原子类计数器自增值对partitionCount取模

        public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
            List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
            //根据topic获取partitionCount
            int numPartitions = partitions.size();
            if (keyBytes == null) {
                int nextValue = this.nextValue(topic);
                List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
                if (availablePartitions.size() > 0) {
                    int part = Utils.toPositive(nextValue) % availablePartitions.size();
                    return ((PartitionInfo)availablePartitions.get(part)).partition();
                } else {
                    return Utils.toPositive(nextValue) % numPartitions;
                }
            } else {
                //指定了key,则根据key的hash来取模选取partition
                return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
            }
        }
    
        private int nextValue(String topic) {
            AtomicInteger counter = (AtomicInteger)this.topicCounterMap.get(topic);
            if (null == counter) {
                counter = new AtomicInteger((new Random()).nextInt());
                AtomicInteger currentCounter = (AtomicInteger)this.topicCounterMap.putIfAbsent(topic, counter);
                if (currentCounter != null) {
                    counter = currentCounter;
                }
            }
    
            return counter.getAndIncrement();
        }
    
    • Utils.class

    |

    <pre> public static int toPositive(int number) {
    return number & 2147483647;
    }

    public static int murmur2(byte[] data) {
        int length = data.length;
        int seed = -1756908916;
        int m = 1540483477;
        int r = true;
        int h = seed ^ length;
        int length4 = length / 4;
    
        for(int i = 0; i < length4; ++i) {
            int i4 = i * 4;
            int k = (data[i4 + 0] & 255) + ((data[i4 + 1] & 255) << 8) + ((data[i4 + 2] & 255) << 16) + ((data[i4 + 3] & 255) << 24);
            k *= 1540483477;
            k ^= k >>> 24;
            k *= 1540483477;
            h *= 1540483477;
            h ^= k;
        }
    
        switch(length % 4) {
        case 3:
            h ^= (data[(length & -4) + 2] & 255) << 16;
        case 2:
            h ^= (data[(length & -4) + 1] & 255) << 8;
        case 1:
            h ^= data[length & -4] & 255;
            h *= 1540483477;
        default:
            h ^= h >>> 13;
            h *= 1540483477;
            h ^= h >>> 15;
            return h;
        }
    }</pre>
    

    |

    踩坑记录

    public ListenableFuture<SendResult<K, V>> send(String topic, int partition, V data) 
    

    使用上述方法时,partition传参不能为Integer类型,生成serializedKey时出会出现类型转换异常,导致消息发送失败

     private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
            TopicPartition tp = null;
            try {
                // first make sure the metadata for the topic is available
                ClusterAndWaitTime clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), maxBlockTimeMs);
                long remainingWaitMs = Math.max(0, maxBlockTimeMs - clusterAndWaitTime.waitedOnMetadataMs);
                Cluster cluster = clusterAndWaitTime.cluster;
                byte[] serializedKey;
                //此处keySerializer会做类型转换,传入Integer会报错
                try {
                    serializedKey = keySerializer.serialize(record.topic(), record.key());
                } catch (ClassCastException cce) {
                    throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() +
                            " to class " + producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() +
                            " specified in key.serializer");
                }
                byte[] serializedValue;
                try {
                    serializedValue = valueSerializer.serialize(record.topic(), record.value());
                } catch (ClassCastException cce) {
                    throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() +
                            " to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() +
                            " specified in value.serializer");
                }
    
                int partition = partition(record, serializedKey, serializedValue, cluster);
                int serializedSize = Records.LOG_OVERHEAD + Record.recordSize(serializedKey, serializedValue);
                ensureValidRecordSize(serializedSize);
                tp = new TopicPartition(record.topic(), partition);
                long timestamp = record.timestamp() == null ? time.milliseconds() : record.timestamp();
                log.trace("Sending record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition);
                // producer callback will make sure to call both 'callback' and interceptor callback
                Callback interceptCallback = this.interceptors == null ? callback : new InterceptorCallback<>(callback, this.interceptors, tp);
                RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey, serializedValue, interceptCallback, remainingWaitMs);
                if (result.batchIsFull || result.newBatchCreated) {
                    log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
                    this.sender.wakeup();
                }
                return result.future;
                // handling exceptions and record the errors;
                // for API exceptions return them in the future,
                // for other exceptions throw directly
            }
    

    相关文章

      网友评论

          本文标题:kafka顺序消息踩坑记录

          本文链接:https://www.haomeiwen.com/subject/fglxfltx.html