Kafka Producer Message Sending Process (Part 1)

Author: Roger1234 | Published 2019-06-30 14:35

    1. Introduction

    For a while, my main work was business security monitoring. The overall pipeline included data collection, cleansing, aggregation, storage, policy configuration, and alert delivery, and it relied on message middleware such as Kafka, which is indeed very handy. In my spare time I read a lot of technical blog posts and learned a great deal from them, but many of them contradicted one another, which left me confused for a long time. I gradually realized that many of the inconsistencies simply come from different Kafka versions. For example:

    (1) Since Kafka 0.8.2, the producer no longer distinguishes between synchronous (sync) and asynchronous (async) sending; all requests are sent asynchronously.

    (2) Posts written about versions before Kafka 0.8.* explain that producer -> broker cannot achieve exactly once, only at least once or at most once; in versions after 0.11.* the producer -> broker path achieves exactly once through idempotence.

    If you only need Kafka for day-to-day work, understanding the basic principles is enough. But if you want to dig deeper, reading other people's blog posts tends to leave you with a superficial picture; the implementation details of Kafka have to be learned by reading the source code yourself. To avoid unnecessary confusion, let me state up front that this article and the ones that follow are based on the Kafka-0.11.2 codebase. The Kafka source code can be roughly divided into three parts: the producer side, the server side, and the consumer side. The producer and consumer sides are implemented in Java, while the server side is implemented in Scala. The client side is what users touch most often, so I plan to start with the producer and analyze the following questions:

    • How the producer sends messages.
    • How the producer updates its metadata.
    • How the producer guarantees ordering within a single partition.
    • How the producer creates topics (topic creation happens on the server side).
    • How the producer guarantees idempotence.

    Today we look at the first question: how the producer sends a message.

    2. Producer Message Sending Process

    Kafka wraps the low-level details nicely and exposes a very simple, easy-to-use API. When sending messages with the producer, we usually only need to set a few configuration parameters and call KafkaProducer's send method. Below is a simple demo taken from the Kafka documentation.

    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("acks", "all");
    props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    
    Producer<String, String> producer = new KafkaProducer<>(props);
    
     for (int i = 0; i < 100; i++)
         producer.send(new ProducerRecord<String, String>("my-topic",Integer.toString(i), Integer.toString(i)));
    
    producer.close();
    

    2.1 Implementation of KafkaProducer's send method

    Let's walk through the implementation of the send method step by step:

    /**
     * Asynchronously send a record to Kafka from the producer.
     */
    public Future<RecordMetadata> send(ProducerRecord<K, V> record) {
        return send(record, null);
    }
    
    public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
            // intercept the record, which can be potentially modified; this method does not throw exceptions
            ProducerRecord<K, V> interceptedRecord = this.interceptors.onSend(record);
            return doSend(interceptedRecord, callback);
    }
    

    2.2 Implementation of KafkaProducer's doSend method

    private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
            TopicPartition tp = null;
            try {
                throwIfProducerClosed();
                // first make sure the metadata for the topic is available
                ClusterAndWaitTime clusterAndWaitTime;
                try {
                    clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), maxBlockTimeMs);
                } catch (KafkaException e) {
                    if (metadata.isClosed())
                        throw new KafkaException("Producer closed while send in progress", e);
                    throw e;
                }
                long remainingWaitMs = Math.max(0, maxBlockTimeMs - clusterAndWaitTime.waitedOnMetadataMs);
                Cluster cluster = clusterAndWaitTime.cluster;
                byte[] serializedKey;
                try {
                    serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());
                } catch (ClassCastException cce) {
                    throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() +
                            " to class " + producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() +
                            " specified in key.serializer", cce);
                }
                byte[] serializedValue;
                try {
                    serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value());
                } catch (ClassCastException cce) {
                    throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() +
                            " to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() +
                            " specified in value.serializer", cce);
                }
                int partition = partition(record, serializedKey, serializedValue, cluster);
                tp = new TopicPartition(record.topic(), partition);
    
                setReadOnly(record.headers());
                Header[] headers = record.headers().toArray();
    
                int serializedSize = AbstractRecords.estimateSizeInBytesUpperBound(apiVersions.maxUsableProduceMagic(),
                        compressionType, serializedKey, serializedValue, headers);
                ensureValidRecordSize(serializedSize);
                long timestamp = record.timestamp() == null ? time.milliseconds() : record.timestamp();
                log.trace("Sending record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition);
                // producer callback will make sure to call both 'callback' and interceptor callback
                Callback interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp);
    
                if (transactionManager != null && transactionManager.isTransactional())
                    transactionManager.maybeAddPartitionToTransaction(tp);
    
                RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
                        serializedValue, headers, interceptCallback, remainingWaitMs);
                if (result.batchIsFull || result.newBatchCreated) {
                    log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
                    this.sender.wakeup();
                }
                return result.future;
                // handling exceptions and record the errors;
                // for API exceptions return them in the future,
                // for other exceptions throw directly
            } catch (ApiException e) {
                log.debug("Exception occurred during message send:", e);
                if (callback != null)
                    callback.onCompletion(null, e);
                this.errors.record();
                this.interceptors.onSendError(record, tp, e);
                return new FutureFailure(e);
            } catch (InterruptedException e) {
                this.errors.record();
                this.interceptors.onSendError(record, tp, e);
                throw new InterruptException(e);
            } catch (BufferExhaustedException e) {
                this.errors.record();
                this.metrics.sensor("buffer-exhausted-records").record();
                this.interceptors.onSendError(record, tp, e);
                throw e;
            } catch (KafkaException e) {
                this.errors.record();
                this.interceptors.onSendError(record, tp, e);
                throw e;
            } catch (Exception e) {
                // we notify interceptor about all exceptions, since onSend is called before anything else in this method
                this.interceptors.onSendError(record, tp, e);
                throw e;
            }
        }
    

    The producer's send logic mainly lives in the doSend method. The main steps are listed below. To keep things simple, assume that Kafka transactions are not enabled; transactions will be covered in detail in a later article.
    (1) Check whether the producer instance has been closed; if it has, throw an exception.

    (2) Make sure the metadata for the target topic is available; if it is not, a request is sent to update the metadata.
    How the metadata is updated will be described in detail in a separate article.
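
    As a side note on configuration (not part of the source walkthrough itself): the maxBlockTimeMs used by waitOnMetadata above corresponds to the producer's max.block.ms setting, which bounds how long send() may block waiting for metadata or for free buffer space. A minimal sketch:

    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    // Upper bound on how long send() may block, e.g. while waiting for metadata;
    // a TimeoutException is thrown if the bound is exceeded.
    props.put("max.block.ms", "5000");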

    (3) Serialize the key and value of the record.
    Kafka ships with a number of serialization and deserialization implementations. The producer serializes the record's key and value, and the consumer deserializes them on its side. The concrete implementations live under the package org.apache.kafka.common.serialization.
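
    For illustration (this is not from the original walkthrough), a custom serializer only needs to implement the Serializer interface and be registered through the key.serializer / value.serializer configs. A minimal, hypothetical sketch that serializes a java.time.LocalDate as an ISO-8601 string:

    import java.nio.charset.StandardCharsets;
    import java.time.LocalDate;
    import java.util.Map;
    import org.apache.kafka.common.serialization.Serializer;

    // Hypothetical example class, for illustration only.
    public class LocalDateSerializer implements Serializer<LocalDate> {
        @Override
        public void configure(Map<String, ?> configs, boolean isKey) {
            // no configuration needed
        }

        @Override
        public byte[] serialize(String topic, LocalDate date) {
            // Encode the date as an ISO-8601 string, e.g. "2019-06-30".
            return date == null ? null : date.toString().getBytes(StandardCharsets.UTF_8);
        }

        @Override
        public void close() {
            // nothing to release
        }
    }

    It would then be registered with something like props.put("value.serializer", "com.example.LocalDateSerializer") (the package name is made up).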

    (4) Determine which partition of the topic the record should be sent to.

    private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) {
            Integer partition = record.partition();
            return partition != null ?
                    partition :
                    partitioner.partition(
                            record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);
        }
    
    // If no custom partitioner is configured, Kafka's DefaultPartitioner is used.
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
            List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
            int numPartitions = partitions.size();
            if (keyBytes == null) {
                int nextValue = nextValue(topic);
                List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
                if (availablePartitions.size() > 0) {
                    int part = Utils.toPositive(nextValue) % availablePartitions.size();
                    return availablePartitions.get(part).partition();
                } else {
                    // no partitions are available, give a non-available partition
                    return Utils.toPositive(nextValue) % numPartitions;
                }
            } else {
                // hash the keyBytes to choose a partition
                return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
            }
        }
    

    If the record explicitly specifies a partition, it is used directly. If neither a partition nor a custom Partitioner is configured, Kafka falls back to org.apache.kafka.clients.producer.internals.DefaultPartitioner; you can of course also define your own Partitioner. When a key is present, the partition is chosen by hashing the key and taking the result modulo the number of partitions; when there is no key, a round-robin algorithm is used. For a detailed walkthrough of the algorithm see: https://www.iteblog.com/archives/2209.html
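
    As an aside (not part of the original text), a custom partitioner implements the Partitioner interface and is registered through the partitioner.class config. A minimal, hypothetical sketch in which keys starting with "vip" are always routed to partition 0 and everything else is hashed:

    import java.util.Map;
    import org.apache.kafka.clients.producer.Partitioner;
    import org.apache.kafka.common.Cluster;
    import org.apache.kafka.common.utils.Utils;

    // Hypothetical example class, for illustration only.
    public class VipFirstPartitioner implements Partitioner {
        @Override
        public void configure(Map<String, ?> configs) { }

        @Override
        public int partition(String topic, Object key, byte[] keyBytes,
                             Object value, byte[] valueBytes, Cluster cluster) {
            int numPartitions = cluster.partitionsForTopic(topic).size();
            if (keyBytes == null)
                return 0; // no key: this toy example just uses partition 0
            if (key instanceof String && ((String) key).startsWith("vip"))
                return 0; // "vip" keys are pinned to partition 0
            // Everything else follows the default strategy: murmur2 hash modulo the partition count.
            return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
        }

        @Override
        public void close() { }
    }

    It would be enabled with props.put("partitioner.class", "com.example.VipFirstPartitioner") (again, the package name is made up).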

    (5) Append the record to the queue for the target topic-partition.
    The record is not sent to Kafka directly; it first goes into a buffer. RecordAccumulator acts as a queue: it collects the records to be sent to the server and then sends them in batches. The RecordAccumulator model is shown in the figure below. Its most important field is ConcurrentMap<TopicPartition, Deque<RecordBatch>> batches: each TopicPartition maps to its own Deque<RecordBatch>. When a record is appended, it goes into the most recently created RecordBatch at the tail of that topic-partition's queue; when data is sent, batches are drained starting from the oldest RecordBatch at the head of the queue.
    [Figure: RecordAccumulator model]
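
    A highly simplified sketch of that append-to-newest / drain-from-oldest structure (a toy model for intuition only, not the real implementation, which also manages buffer memory and batch sizes):

    import java.util.ArrayDeque;
    import java.util.Deque;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;

    // Toy model of the accumulator's core data structure, for illustration only.
    class ToyAccumulator {
        private final ConcurrentMap<String, Deque<StringBuilder>> batches = new ConcurrentHashMap<>();

        // Appends always target the newest "batch" at the tail of the per-partition deque.
        void append(String topicPartition, String record) {
            Deque<StringBuilder> dq = batches.computeIfAbsent(topicPartition, k -> new ArrayDeque<>());
            synchronized (dq) {
                if (dq.isEmpty())
                    dq.addLast(new StringBuilder());
                dq.peekLast().append(record);
            }
        }

        // Draining always takes the oldest batch from the head of the deque.
        String drainOldest(String topicPartition) {
            Deque<StringBuilder> dq = batches.get(topicPartition);
            if (dq == null)
                return null;
            synchronized (dq) {
                StringBuilder oldest = dq.pollFirst();
                return oldest == null ? null : oldest.toString();
            }
        }
    }

    The actual append implementation in RecordAccumulator is as follows: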

    public RecordAppendResult append(TopicPartition tp,
                                         long timestamp,
                                         byte[] key,
                                         byte[] value,
                                         Header[] headers,
                                         Callback callback,
                                         long maxTimeToBlock) throws InterruptedException {
            // We keep track of the number of appending thread to make sure we do not miss batches in
            // abortIncompleteBatches().
            appendsInProgress.incrementAndGet();
            ByteBuffer buffer = null;
            if (headers == null) headers = Record.EMPTY_HEADERS;
            try {
                // check if we have an in-progress batch
                Deque<ProducerBatch> dq = getOrCreateDeque(tp);
                synchronized (dq) {
                    if (closed)
                        throw new KafkaException("Producer closed while send in progress");
                    RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq);
                    if (appendResult != null)
                        return appendResult;
                }
    
                // we don't have an in-progress record batch try to allocate a new batch
                byte maxUsableMagic = apiVersions.maxUsableProduceMagic();
                int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, value, headers));
                log.trace("Allocating a new {} byte message buffer for topic {} partition {}", size, tp.topic(), tp.partition());
                buffer = free.allocate(size, maxTimeToBlock);
                synchronized (dq) {
                    // Need to check if producer is closed again after grabbing the dequeue lock.
                    if (closed)
                        throw new KafkaException("Producer closed while send in progress");
    
                    RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq);
                    if (appendResult != null) {
                        // Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen often...
                        return appendResult;
                    }
    
                    MemoryRecordsBuilder recordsBuilder = recordsBuilder(buffer, maxUsableMagic);
                    ProducerBatch batch = new ProducerBatch(tp, recordsBuilder, time.milliseconds());
                    FutureRecordMetadata future = Utils.notNull(batch.tryAppend(timestamp, key, value, headers, callback, time.milliseconds()));
    
                    dq.addLast(batch);
                    incomplete.add(batch);
    
                    // Don't deallocate this buffer in the finally block as it's being used in the record batch
                    buffer = null;
                    return new RecordAppendResult(future, dq.size() > 1 || batch.isFull(), true);
                }
            } finally {
                if (buffer != null)
                    free.deallocate(buffer);
                appendsInProgress.decrementAndGet();
            }
        }
    

    The main steps of appending a record to the send queue are as follows:

    (1) getOrCreateDeque fetches the deque for the topic-partition; if it does not exist yet, a new one is created.
    (2) tryAppend attempts to append the record to the last ProducerBatch in the deque. If there is no batch, it returns null; if there is one but it has no room left, it also returns null; otherwise the record is appended successfully.
    (3) The method contains two almost identical locked sections. My reading of the official comment: after the dq lock is released the first time, another thread may acquire it and create a ProducerBatch for the same deque in the meantime, so the append has to be retried after re-acquiring the lock (see the sketch after this list).
    (4) If neither locked attempt finds a usable ProducerBatch in the deque, a new ProducerBatch is created, the record is appended to it, and the batch is added to the tail of the topic-partition's deque.
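
    A generic sketch of that "try under the lock, allocate outside the lock, then try again" idiom (simplified for illustration; this is not the actual accumulator code):

    import java.util.ArrayDeque;
    import java.util.Deque;

    // Toy illustration of the double-checked append used by RecordAccumulator.append().
    class DoubleCheckedAppend {
        private static final int BATCH_CAPACITY = 16;
        private final Deque<StringBuilder> dq = new ArrayDeque<>();

        void append(String record) {
            synchronized (dq) {                       // first attempt under the lock
                if (tryAppend(record))
                    return;
            }
            // The expensive allocation (free.allocate in the real code) happens outside the
            // lock, so another thread may create a batch for this deque in the meantime.
            StringBuilder fresh = new StringBuilder(BATCH_CAPACITY);
            synchronized (dq) {                       // second attempt after re-acquiring the lock
                if (tryAppend(record))
                    return;                           // someone else created a batch for us
                fresh.append(record);
                dq.addLast(fresh);
            }
        }

        // Returns true if the record fit into the last existing batch.
        private boolean tryAppend(String record) {
            StringBuilder last = dq.peekLast();
            if (last == null || last.length() + record.length() > BATCH_CAPACITY)
                return false;
            last.append(record);
            return true;
        }
    }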

    2.3 Waking up the Sender thread to send the data

    If the newest ProducerBatch in a topic-partition's queue is full, or a new batch has just been created, the Sender thread is woken up to send the data. Understanding this part may require some familiarity with Java NIO.
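
    A side note on configuration (not from the source above): how quickly a batch fills up, and how long a non-full batch waits before it becomes sendable, are controlled mainly by the batch.size and linger.ms producer settings. A minimal sketch:

    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    // A batch is considered full once it reaches roughly this many bytes...
    props.put("batch.size", "16384");
    // ...and a non-full batch becomes sendable after waiting this long for more records.
    props.put("linger.ms", "5");

    The Sender's run method looks like this: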

    void run(long now) {
            if (transactionManager != null) {
                try {
                    if (transactionManager.shouldResetProducerStateAfterResolvingSequences())
                        // Check if the previous run expired batches which requires a reset of the producer state.
                        transactionManager.resetProducerId();
                    if (!transactionManager.isTransactional()) {
                        // this is an idempotent producer, so make sure we have a producer id
                        maybeWaitForProducerId();
                    } else if (transactionManager.hasUnresolvedSequences() && !transactionManager.hasFatalError()) {
                        transactionManager.transitionToFatalError(
                            new KafkaException("The client hasn't received acknowledgment for " +
                                "some previously sent messages and can no longer retry them. It isn't safe to continue."));
                    } else if (transactionManager.hasInFlightTransactionalRequest() || maybeSendTransactionalRequest(now)) {
                        // as long as there are outstanding transactional requests, we simply wait for them to return
                        client.poll(retryBackoffMs, now);
                        return;
                    }
    
                    // do not continue sending if the transaction manager is in a failed state or if there
                    // is no producer id (for the idempotent case).
                    if (transactionManager.hasFatalError() || !transactionManager.hasProducerId()) {
                        RuntimeException lastError = transactionManager.lastError();
                        if (lastError != null)
                            maybeAbortBatches(lastError);
                        client.poll(retryBackoffMs, now);
                        return;
                    } else if (transactionManager.hasAbortableError()) {
                        accumulator.abortUndrainedBatches(transactionManager.lastError());
                    }
                } catch (AuthenticationException e) {
                    // This is already logged as error, but propagated here to perform any clean ups.
                    log.trace("Authentication exception while processing transactional request: {}", e);
                    transactionManager.authenticationFailed(e);
                }
            }
    
            long pollTimeout = sendProducerData(now);
            client.poll(pollTimeout, now);
        }
    

    To keep things simple, assume again that Kafka transactions are not enabled, so the main logic is in the sendProducerData method:

    private long sendProducerData(long now) {
            Cluster cluster = metadata.fetch();
            // get the list of partitions with data ready to send
            RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);
    
            // if there are any partitions whose leaders are not known yet, force metadata update
            if (!result.unknownLeaderTopics.isEmpty()) {
                // The set of topics with unknown leader contains topics with leader election pending as well as
                // topics which may have expired. Add the topic again to metadata to ensure it is included
                // and request metadata update, since there are messages to send to the topic.
                for (String topic : result.unknownLeaderTopics)
                    this.metadata.add(topic);
    
                log.debug("Requesting metadata update due to unknown leader topics from the batched records: {}",
                    result.unknownLeaderTopics);
                this.metadata.requestUpdate();
            }
    
            // remove any nodes we aren't ready to send to
            Iterator<Node> iter = result.readyNodes.iterator();
            long notReadyTimeout = Long.MAX_VALUE;
            while (iter.hasNext()) {
                Node node = iter.next();
                if (!this.client.ready(node, now)) {
                    iter.remove();
                    notReadyTimeout = Math.min(notReadyTimeout, this.client.pollDelayMs(node, now));
                }
            }
    
            // create produce requests
            Map<Integer, List<ProducerBatch>> batches = this.accumulator.drain(cluster, result.readyNodes, this.maxRequestSize, now);
            addToInflightBatches(batches);
            if (guaranteeMessageOrder) {
                // Mute all the partitions drained
                for (List<ProducerBatch> batchList : batches.values()) {
                    for (ProducerBatch batch : batchList)
                        this.accumulator.mutePartition(batch.topicPartition);
                }
            }
    
            accumulator.resetNextBatchExpiryTime();
            List<ProducerBatch> expiredInflightBatches = getExpiredInflightBatches(now);
            List<ProducerBatch> expiredBatches = this.accumulator.expiredBatches(now);
            expiredBatches.addAll(expiredInflightBatches);
    
            // Reset the producer id if an expired batch has previously been sent to the broker. Also update the metrics
            // for expired batches. see the documentation of @TransactionState.resetProducerId to understand why
            // we need to reset the producer id here.
            if (!expiredBatches.isEmpty())
                log.trace("Expired {} batches in accumulator", expiredBatches.size());
            for (ProducerBatch expiredBatch : expiredBatches) {
                String errorMessage = "Expiring " + expiredBatch.recordCount + " record(s) for " + expiredBatch.topicPartition
                    + ":" + (now - expiredBatch.createdMs) + " ms has passed since batch creation";
                failBatch(expiredBatch, -1, NO_TIMESTAMP, new TimeoutException(errorMessage), false);
                if (transactionManager != null && expiredBatch.inRetry()) {
                    // This ensures that no new batches are drained until the current in flight batches are fully resolved.
                    transactionManager.markSequenceUnresolved(expiredBatch.topicPartition);
                }
            }
            sensors.updateProduceRequestMetrics(batches);
    
            // If we have any nodes that are ready to send + have sendable data, poll with 0 timeout so this can immediately
            // loop and try sending more data. Otherwise, the timeout will be the smaller value between next batch expiry
            // time, and the delay time for checking data availability. Note that the nodes may have data that isn't yet
            // sendable due to lingering, backing off, etc. This specifically does not include nodes with sendable data
            // that aren't ready to send since they would cause busy looping.
            long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout);
            pollTimeout = Math.min(pollTimeout, this.accumulator.nextExpiryTimeMs() - now);
            pollTimeout = Math.max(pollTimeout, 0);
            if (!result.readyNodes.isEmpty()) {
                log.trace("Nodes with data ready to send: {}", result.readyNodes);
                // if some partitions are already ready to be sent, the select time would be 0;
                // otherwise if some partition already has some data accumulated but not ready yet,
                // the select time will be the time difference between now and its linger expiry time;
                // otherwise the select time will be the time difference between now and the metadata expiry time;
                pollTimeout = 0;
            }
            sendProduceRequests(batches, now);
            return pollTimeout;
        }
    
    private void sendProduceRequests(Map<Integer, List<ProducerBatch>> collated, long now) {
        for (Map.Entry<Integer, List<ProducerBatch>> entry : collated.entrySet())
            sendProduceRequest(now, entry.getKey(), acks, requestTimeoutMs, entry.getValue());
    }
    
    private void sendProduceRequest(long now, int destination, short acks, int timeout, List<ProducerBatch> batches) {
            if (batches.isEmpty())
                return;
    
            Map<TopicPartition, MemoryRecords> produceRecordsByPartition = new HashMap<>(batches.size());
            final Map<TopicPartition, ProducerBatch> recordsByPartition = new HashMap<>(batches.size());
    
            // find the minimum magic version used when creating the record sets
            byte minUsedMagic = apiVersions.maxUsableProduceMagic();
            for (ProducerBatch batch : batches) {
                if (batch.magic() < minUsedMagic)
                    minUsedMagic = batch.magic();
            }
    
            for (ProducerBatch batch : batches) {
                TopicPartition tp = batch.topicPartition;
                MemoryRecords records = batch.records();
    
                // down convert if necessary to the minimum magic used. In general, there can be a delay between the time
                // that the producer starts building the batch and the time that we send the request, and we may have
                // chosen the message format based on out-dated metadata. In the worst case, we optimistically chose to use
                // the new message format, but found that the broker didn't support it, so we need to down-convert on the
                // client before sending. This is intended to handle edge cases around cluster upgrades where brokers may
                // not all support the same message format version. For example, if a partition migrates from a broker
                // which is supporting the new magic version to one which doesn't, then we will need to convert.
                if (!records.hasMatchingMagic(minUsedMagic))
                    records = batch.records().downConvert(minUsedMagic, 0, time).records();
                produceRecordsByPartition.put(tp, records);
                recordsByPartition.put(tp, batch);
            }
    
            String transactionalId = null;
            if (transactionManager != null && transactionManager.isTransactional()) {
                transactionalId = transactionManager.transactionalId();
            }
            ProduceRequest.Builder requestBuilder = ProduceRequest.Builder.forMagic(minUsedMagic, acks, timeout,
                    produceRecordsByPartition, transactionalId);
            RequestCompletionHandler callback = new RequestCompletionHandler() {
                public void onComplete(ClientResponse response) {
                    handleProduceResponse(response, recordsByPartition, time.milliseconds());
                }
            };
    
            String nodeId = Integer.toString(destination);
            ClientRequest clientRequest = client.newClientRequest(nodeId, requestBuilder, now, acks != 0,
                    requestTimeoutMs, callback);
            client.send(clientRequest, now);
            log.trace("Sent produce request to {}: {}", nodeId, requestBuilder);
        }
    

    The logic of sendProducerData is roughly the following:
    (1) accumulator.ready: iterate over the deque of every topic-partition. If a partition's leader is unknown, add the topic to the unknownLeaderTopics set; if the partition's oldest batch is ready to be sent, add its leader node to the readyNodes set. A result holding both sets is returned.
    (2) If the unknownLeaderTopics set in that result is not empty, iterate over it and request a metadata update. The update strategy itself will be covered in a separate article.
    (3) For each ready node, check whether the client is actually ready to send to it, i.e. whether a connection to that leader node has been established; for nodes without a connection, one is initiated.
    (4) accumulator.drain: iterate over readyNodes and, for each topic-partition on a node, take the oldest batch from its queue and add it to a list. The result is a Map<Integer, List<ProducerBatch>>, where the Integer is the node id and the List<ProducerBatch> holds the oldest batches of all topic-partitions on that node.
    (5) If message ordering must be guaranteed, mute the drained topic-partitions.
    (6) Expire and fail timed-out batches.
    (7) sendProduceRequest wraps each node's batches into a ClientRequest and hands it to NetworkClient's send method.
    (8) Call KafkaClient's poll method. All socket I/O happens there; it delegates to the Selector, which in turn wraps the Java NIO interfaces. I plan to cover this in a separate article.
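
    One practical note to close the loop (usage, not from the source above): the produce response handled in handleProduceResponse is what eventually completes the Future returned by send and triggers the user's Callback, so a caller can observe where the record landed, or why it failed, like this:

    producer.send(new ProducerRecord<String, String>("my-topic", "key", "value"), new Callback() {
        @Override
        public void onCompletion(RecordMetadata metadata, Exception exception) {
            if (exception != null) {
                // The send failed, e.g. the batch expired or the broker returned an error.
                exception.printStackTrace();
            } else {
                // RecordMetadata tells us which partition and offset the record was written to.
                System.out.printf("sent to %s-%d at offset %d%n",
                        metadata.topic(), metadata.partition(), metadata.offset());
            }
        }
    });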

    3. Summary

    This article gave a brief overview of the whole process the producer goes through to send a message. Many details were left out; the follow-up articles planned in the introduction will cover them.
