
Kafka Ordered Messages: Pitfall Notes

By 你与我相似 | Published 2021-02-20 17:29

Kafka Ordered Messages

1. The message-sending API (Spring KafkaTemplate's send overloads)

    public ListenableFuture<SendResult<K, V>> send(String topic, V data) {
        ProducerRecord<K, V> producerRecord = new ProducerRecord(topic, data);
        return this.doSend(producerRecord);
    }

    public ListenableFuture<SendResult<K, V>> send(String topic, K key, V data) {
        ProducerRecord<K, V> producerRecord = new ProducerRecord(topic, key, data);
        return this.doSend(producerRecord);
    }

    public ListenableFuture<SendResult<K, V>> send(String topic, int partition, V data) {
        ProducerRecord<K, V> producerRecord = new ProducerRecord(topic, partition, (Object)null, data);
        return this.doSend(producerRecord);
    }

    public ListenableFuture<SendResult<K, V>> send(String topic, int partition, K key, V data) {
        ProducerRecord<K, V> producerRecord = new ProducerRecord(topic, partition, key, data);
        return this.doSend(producerRecord);
    }
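To keep a set of related messages in order, the usual choice is the key-based overload: records with the same key are hashed to the same partition (see DefaultPartitioner below). A minimal usage sketch, assuming a configured KafkaTemplate<String, String>; the topic "order-events" and key "order-1001" are made-up names:

    import org.springframework.kafka.core.KafkaTemplate;

    public class OrderedSendExample {
        private final KafkaTemplate<String, String> kafkaTemplate;

        public OrderedSendExample(KafkaTemplate<String, String> kafkaTemplate) {
            this.kafkaTemplate = kafkaTemplate;
        }

        public void sendOrderEvents() {
            // all three records share one key, so they land on the same
            // partition and keep their relative order there
            kafkaTemplate.send("order-events", "order-1001", "CREATED");
            kafkaTemplate.send("order-events", "order-1001", "PAID");
            kafkaTemplate.send("order-events", "order-1001", "SHIPPED");
        }
    }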

2. KafkaProducer.class wraps partition selection in partition(); a partition set on the record takes priority

    private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) {
        Integer partition = record.partition();
        //prefer the partition supplied on the record, if present
        return partition != null ? partition : this.partitioner.partition(record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);
    }
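Because an explicitly supplied partition short-circuits the partitioner entirely, pinning every record to one partition is another way to preserve order. A sketch against the plain producer API, with a placeholder broker address and topic name:

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;

    public class ExplicitPartitionExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("key.serializer", StringSerializer.class.getName());
            props.put("value.serializer", StringSerializer.class.getName());

            try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
                // the partition (0) is given explicitly, so the partition() method
                // shown above returns it as-is and DefaultPartitioner never runs
                producer.send(new ProducerRecord<>("order-events", 0, null, "CREATED"));
                producer.send(new ProducerRecord<>("order-events", 0, null, "PAID"));
            }
        }
    }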

3. DefaultPartitioner.class picks the partition: if the record has a key, it takes the key's murmur2 hash modulo partitionCount; otherwise it takes a per-topic atomic counter's incrementing value modulo partitionCount (round-robin)

    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        //look up the topic's partition count
        int numPartitions = partitions.size();
        if (keyBytes == null) {
            int nextValue = this.nextValue(topic);
            List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
            if (availablePartitions.size() > 0) {
                int part = Utils.toPositive(nextValue) % availablePartitions.size();
                return ((PartitionInfo)availablePartitions.get(part)).partition();
            } else {
                return Utils.toPositive(nextValue) % numPartitions;
            }
        } else {
            //a key was supplied: hash it and take the modulus to pick the partition
            return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
        }
    }

    private int nextValue(String topic) {
        AtomicInteger counter = (AtomicInteger)this.topicCounterMap.get(topic);
        if (null == counter) {
            counter = new AtomicInteger((new Random()).nextInt());
            AtomicInteger currentCounter = (AtomicInteger)this.topicCounterMap.putIfAbsent(topic, counter);
            if (currentCounter != null) {
                counter = currentCounter;
            }
        }

        return counter.getAndIncrement();
    }
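The practical consequence: as long as the topic's partition count stays fixed, one key always maps to one partition. A sketch that reproduces the arithmetic with the same Utils helpers (the partition count 8 and the key are arbitrary examples):

    import java.nio.charset.StandardCharsets;
    import org.apache.kafka.common.utils.Utils;

    public class KeyToPartitionSketch {
        public static void main(String[] args) {
            int numPartitions = 8;
            byte[] keyBytes = "order-1001".getBytes(StandardCharsets.UTF_8);
            // identical key bytes always yield the identical partition index
            int partition = Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
            System.out.println("order-1001 -> partition " + partition);
        }
    }

The flip side is that adding partitions changes the modulus, so existing keys may be remapped and per-key ordering is not preserved across the resize.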
  • Utils.class: the toPositive() and murmur2() helpers used by DefaultPartitioner

    public static int toPositive(int number) {
        // mask off the sign bit; unlike Math.abs this can never return a negative value
        return number & 0x7fffffff;
    }

    public static int murmur2(byte[] data) {
        int length = data.length;
        int seed = 0x9747b28c;
        // 'm' and 'r' are mixing constants generated offline
        final int m = 0x5bd1e995;
        final int r = 24;
        // initialize the hash from the seed and the input length
        int h = seed ^ length;
        int length4 = length / 4;

        // mix the input four bytes at a time
        for (int i = 0; i < length4; i++) {
            int i4 = i * 4;
            int k = (data[i4 + 0] & 0xff) + ((data[i4 + 1] & 0xff) << 8)
                  + ((data[i4 + 2] & 0xff) << 16) + ((data[i4 + 3] & 0xff) << 24);
            k *= m;
            k ^= k >>> r;
            k *= m;
            h *= m;
            h ^= k;
        }

        // handle the trailing 1-3 bytes (cases intentionally fall through)
        switch (length % 4) {
            case 3:
                h ^= (data[(length & ~3) + 2] & 0xff) << 16;
            case 2:
                h ^= (data[(length & ~3) + 1] & 0xff) << 8;
            case 1:
                h ^= data[length & ~3] & 0xff;
                h *= m;
        }

        // final avalanche to mix the remaining bits
        h ^= h >>> 13;
        h *= m;
        h ^= h >>> 15;
        return h;
    }

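It is worth noting why toPositive() masks the sign bit instead of calling Math.abs(): murmur2 can return Integer.MIN_VALUE, and Math.abs(Integer.MIN_VALUE) overflows back to a negative value, which would produce a negative partition index. A two-line check:

    public class ToPositiveVsAbs {
        public static void main(String[] args) {
            int h = Integer.MIN_VALUE;            // a value murmur2 may legitimately return
            System.out.println(Math.abs(h));      // prints -2147483648: abs overflows
            System.out.println(h & 0x7fffffff);   // prints 0: the mask is never negative
        }
    }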

The pitfall

    public ListenableFuture<SendResult<K, V>> send(String topic, int partition, V data)

When calling this overload, the partition must not be passed as a boxed Integer. If the template's key type can accept an Integer (a raw or Object-typed KafkaTemplate, for example), Java's overload resolution prefers send(String, K, V) over unboxing the argument for send(String, int, V), so the boxed value is bound as the record key instead of the partition. Generating serializedKey then throws a ClassCastException (surfaced as a SerializationException) and the send fails. The relevant excerpt of KafkaProducer.doSend() appears after the sketch below.
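A sketch that reproduces the trap; it assumes the template was injected with Object generics (or as a raw type) while key.serializer is StringSerializer, and the topic and payload are made-up:

    import org.springframework.kafka.core.KafkaTemplate;

    public class PartitionOverloadTrap {
        private final KafkaTemplate<Object, Object> kafkaTemplate;

        public PartitionOverloadTrap(KafkaTemplate<Object, Object> kafkaTemplate) {
            this.kafkaTemplate = kafkaTemplate;
        }

        public void demo() {
            Integer partition = 0;

            // BROKEN: Integer -> Object needs no unboxing, so the compiler binds
            // send(String, K, V) and 'partition' becomes the record KEY;
            // StringSerializer then throws ClassCastException at serialize time
            kafkaTemplate.send("order-events", partition, "payload");

            // OK: a primitive int can only match send(String, int, V),
            // so 0 really is used as the partition number
            kafkaTemplate.send("order-events", 0, "payload");
        }
    }

If the partition number arrives as an Integer variable, unbox it explicitly (partition.intValue()) before the call to force the intended overload.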

    private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
        TopicPartition tp = null;
        try {
            // first make sure the metadata for the topic is available
            ClusterAndWaitTime clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), maxBlockTimeMs);
            long remainingWaitMs = Math.max(0, maxBlockTimeMs - clusterAndWaitTime.waitedOnMetadataMs);
            Cluster cluster = clusterAndWaitTime.cluster;
            byte[] serializedKey;
            //the keySerializer performs a cast here; an Integer key handed to a StringSerializer fails this cast
            try {
                serializedKey = keySerializer.serialize(record.topic(), record.key());
            } catch (ClassCastException cce) {
                throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() +
                        " to class " + producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() +
                        " specified in key.serializer");
            }
            byte[] serializedValue;
            try {
                serializedValue = valueSerializer.serialize(record.topic(), record.value());
            } catch (ClassCastException cce) {
                throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() +
                        " to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() +
                        " specified in value.serializer");
            }

            int partition = partition(record, serializedKey, serializedValue, cluster);
            int serializedSize = Records.LOG_OVERHEAD + Record.recordSize(serializedKey, serializedValue);
            ensureValidRecordSize(serializedSize);
            tp = new TopicPartition(record.topic(), partition);
            long timestamp = record.timestamp() == null ? time.milliseconds() : record.timestamp();
            log.trace("Sending record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition);
            // producer callback will make sure to call both 'callback' and interceptor callback
            Callback interceptCallback = this.interceptors == null ? callback : new InterceptorCallback<>(callback, this.interceptors, tp);
            RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey, serializedValue, interceptCallback, remainingWaitMs);
            if (result.batchIsFull || result.newBatchCreated) {
                log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
                this.sender.wakeup();
            }
            return result.future;
            // handling exceptions and record the errors;
            // for API exceptions return them in the future,
            // for other exceptions throw directly
        } // catch blocks elided from this excerpt
    }
