KafkaProducer: RecordAccumulator

Author: _孙行者_ | Published 2020-09-14 17:23

    Writing to the cache

    The append() method

    org.apache.kafka.clients.producer.internals.RecordAccumulator#append()
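
    Before reading append(), it helps to keep the accumulator's core state in mind: one deque of batches per partition, a shared buffer pool, and a counter of in-progress appends. The sketch below is a simplified reconstruction, not the real class; the field names follow RecordAccumulator as far as I can tell, but Kafka actually backs batches with a copy-on-write map and BufferPool / IncompleteBatches are internal helper classes, so treat the details as an approximation.

        // Minimal sketch of the RecordAccumulator state that append() manipulates (not the real class).
        private final ConcurrentMap<TopicPartition, Deque<ProducerBatch>> batches = new ConcurrentHashMap<>();
        private final BufferPool free;               // bounded pool the batch ByteBuffers are allocated from
        private final IncompleteBatches incomplete;  // batches appended but not yet acknowledged
        private final AtomicInteger appendsInProgress = new AtomicInteger(0);

        // getOrCreateDeque(tp), as called at the top of append(): create the per-partition deque lazily.
        private Deque<ProducerBatch> getOrCreateDeque(TopicPartition tp) {
            Deque<ProducerBatch> d = this.batches.get(tp);
            if (d != null)
                return d;
            d = new ArrayDeque<>();
            Deque<ProducerBatch> previous = this.batches.putIfAbsent(tp, d);
            return previous == null ? d : previous;
        }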

    /**
         * Add a record to the accumulator and return the result.
         * The result carries the future metadata, plus flags telling whether the batch that was appended to is full
         * and whether a new batch was created.
         */
        public RecordAppendResult append(TopicPartition tp, // the topic/partition to append to
                                         long timestamp,//the record timestamp
                                         byte[] key,    //key
                                         byte[] value,  //value
                                         Header[] headers,  //headers
                                         Callback callback, //callback to invoke when the request completes
                                         long maxTimeToBlock,   //maximum time to block waiting for buffer memory
                                         boolean abortOnNewBatch,   //whether to abort instead of creating a new batch
                                         long nowMs) throws InterruptedException {
            // We keep track of the number of appending thread to make sure we do not miss batches in
            // abortIncompleteBatches().
            //count this thread among the in-progress appends
            appendsInProgress.incrementAndGet();
            //the buffer we may allocate below
            ByteBuffer buffer = null;
            if (headers == null) headers = Record.EMPTY_HEADERS; //fall back to empty headers
            try {
                // check if we have an in-progress batch
                // Get the deque for the current TopicPartition, creating it if it does not exist yet
                Deque<ProducerBatch> dq = getOrCreateDeque(tp);
                synchronized (dq) {
                    if (closed)
                        throw new KafkaException("Producer closed while send in progress");
                    //Try appending the record to the last batch already in the deque
                    RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq, nowMs);
                    if (appendResult != null)
                        // The append succeeded; we are done
                        return appendResult;
                }
                //The existing batch could not take the record, so a new batch has to be created
                // we don't have an in-progress record batch try to allocate a new batch
                if (abortOnNewBatch) {
                    // The caller does not want a new batch created here
                    // Return a result that will cause another call to append.
                    return new RecordAppendResult(null, false, false, true);
                }
    
                // Get the magic number, i.e. the message format version used for wire compatibility with the brokers
                byte maxUsableMagic = apiVersions.maxUsableProduceMagic();
                // Even if the record is smaller than one batch, allocate a full batch size so records that arrive later can be packed into the same buffer
                int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, value, headers));
                log.trace("Allocating a new {} byte message buffer for topic {} partition {}", size, tp.topic(), tp.partition());
                //Allocate memory from the free pool
                buffer = free.allocate(size, maxTimeToBlock);
    
                // Update the current time in case the buffer allocation blocked above.
                nowMs = time.milliseconds();
                synchronized (dq) {
                    // Need to check if producer is closed again after grabbing the dequeue lock.
                    // Re-check closed
                    if (closed)
                        throw new KafkaException("Producer closed while send in progress");
                    // Try again to append the record to an existing batch
                    RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq, nowMs);
                    if (appendResult != null) {
                        // The append succeeded this time; return
                        // Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen often...
                        return appendResult;
                    }
                    //Still no room; build a MemoryRecordsBuilder on top of the newly allocated buffer
                    MemoryRecordsBuilder recordsBuilder = recordsBuilder(buffer, maxUsableMagic);
                    // Build the batch
                    ProducerBatch batch = new ProducerBatch(tp, recordsBuilder, nowMs);
                    // Append the record into the new batch
                    FutureRecordMetadata future = Objects.requireNonNull(batch.tryAppend(timestamp, key, value, headers,
                            callback, nowMs));
                    // Add the batch to the deque
                    dq.addLast(batch);
                    // Add the batch to the incomplete set
                    incomplete.add(batch);
    
                    // Don't deallocate this buffer in the finally block as it's being used in the record batch
                    // Small trick: the buffer is now owned by the MemoryRecordsBuilder, so point the local reference at null
                    // to make sure the finally block below does not deallocate it.
                    buffer = null;
                    return new RecordAppendResult(future, dq.size() > 1 || batch.isFull(), true, false);
                }
            } finally {
                if (buffer != null)
                    // Memory was allocated but never used; release it
                    free.deallocate(buffer);
                // The append flow is over; decrement the in-progress counter
                appendsInProgress.decrementAndGet();
            }
        }
    
        private MemoryRecordsBuilder recordsBuilder(ByteBuffer buffer, byte maxUsableMagic) {
            if (transactionManager != null && maxUsableMagic < RecordBatch.MAGIC_VALUE_V2) {
                throw new UnsupportedVersionException("Attempting to use idempotence with a broker which does not " +
                    "support the required message format (v2). The broker must be version 0.11 or later.");
            }
            // Build the MemoryRecordsBuilder
            return MemoryRecords.builder(buffer, maxUsableMagic, compression, TimestampType.CREATE_TIME, 0L);
        }
    
    /**
         *  Try to append the record to an existing ProducerBatch.
         *
         *  If no batch in the deque has room left, return null.
         *  If the last batch turns out to be full, mark it closed so nothing more is written to it.
         */
        private RecordAppendResult tryAppend(long timestamp, byte[] key, byte[] value, Header[] headers,
                                             Callback callback, Deque<ProducerBatch> deque, long nowMs) {
            ProducerBatch last = deque.peekLast();
            if (last != null) {
                // There is a batch; try to append to the last one
                FutureRecordMetadata future = last.tryAppend(timestamp, key, value, headers, callback, nowMs);
                if (future == null)
                    // The last batch has no room left; close it for further appends
                    last.closeForRecordAppends();
                else
                    // The record was appended; return a result carrying the future
                    return new RecordAppendResult(future, deque.size() > 1 || last.isFull(), false, false);
            }
            // The deque has no batch yet; return null
            return null;
        }
    
    KafkaProducer-RecordAccumulate-append.png
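
    For context, this is roughly how the caller consumes the result. The sketch below is a condensed reconstruction of the relevant part of KafkaProducer#doSend (around the Kafka 2.4/2.5 client this source matches), with serialization, interceptors and error handling trimmed; variable names stay close to the real code, but treat it as an illustration rather than the exact implementation.

        // First attempt: abortOnNewBatch = true, so append() gives up instead of opening a new batch.
        // That gives a sticky partitioner the chance to switch partitions before the new batch is created.
        RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
                serializedValue, headers, callback, remainingWaitMs, true, nowMs);

        if (result.abortForNewBatch) {
            // Let the partitioner react to the new-batch event, recompute the partition,
            // then append again, this time allowing a new batch to be allocated.
            partitioner.onNewBatch(record.topic(), cluster, tp.partition());
            int newPartition = partition(record, serializedKey, serializedValue, cluster);
            TopicPartition newTp = new TopicPartition(record.topic(), newPartition);
            result = accumulator.append(newTp, timestamp, serializedKey, serializedValue, headers,
                    callback, remainingWaitMs, false, time.milliseconds());
        }

        // A full or freshly created batch means there may now be something worth sending.
        if (result.batchIsFull || result.newBatchCreated)
            this.sender.wakeup();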

    Sending (Sender-side) methods

    The ready() method: which nodes have data ready to send
    /**
         * This method selects nodes (Node); it does not return the buffered data itself.
         * Get a list of nodes whose partitions are ready to be sent, and the earliest time at which any non-sendable
         * partition will be ready; Also return the flag for whether there are any unknown leaders for the accumulated
         * partition batches.
         * In other words it returns: the nodes that can be sent to right now, the earliest time at which any
         * not-yet-sendable partition will become ready, and whether any accumulated batches belong to a partition
         * whose leader is unknown.
         * <p>
         * A destination node is ready to send data if:
         * <ol>
         * <li>There is at least one partition that is not backing off its send
         * ("Backing off" means the partition is waiting out the retry backoff after a failed send.)
         * <li><b>and</b> those partitions are not muted (to prevent reordering if
         *   {@value org.apache.kafka.clients.producer.ProducerConfig#MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION}
         *   is set to one)</li>
         *   That is, these partitions are not muted, so they are currently allowed to send.
         * <li><b>and <i>any</i></b> of the following are true</li>
         * Any one of the following being true is enough:
         * <ul>
         *     <li>The record set is full</li> // the batch is full
         *     <li>The record set has sat in the accumulator for at least lingerMs milliseconds</li>
         *     // the batch is not full, but it has already waited linger.ms
         *     <li>The accumulator is out of memory and threads are blocking waiting for data (in this case all partitions
         *     are immediately considered ready).</li> the buffer pool is exhausted and appending threads are blocking
         *
         *     <li>The accumulator has been closed</li>
         * </ul>
         * </ol>
         */
        public ReadyCheckResult ready(Cluster cluster, long nowMs) {
            Set<Node> readyNodes = new HashSet<>();
            long nextReadyCheckDelayMs = Long.MAX_VALUE;
            Set<String> unknownLeaderTopics = new HashSet<>();
            // Is the free buffer pool exhausted? queued() > 0 means threads are waiting for memory, i.e. the pool is used up
            boolean exhausted = this.free.queued() > 0;
            //Iterate over the per-TopicPartition deques of ProducerBatch
            for (Map.Entry<TopicPartition, Deque<ProducerBatch>> entry : this.batches.entrySet()) {
                Deque<ProducerBatch> deque = entry.getValue();
                //The deque is written to and read from concurrently, so it must be accessed under the lock
                synchronized (deque) {
                    // When producing to a large number of partitions, this path is hot and deques are often empty.
                    // We check whether a batch exists first to avoid the more expensive checks whenever possible.
                    // This is an optimization: the leader check and the deque check used to be done together, but in most cases
                    // it is the deque check that fails and the leader lookup is wasted, so the deque is now checked first.
                    // Only the first batch of the deque is peeked at as a representative: either the deque has data or it does not.
                    // This all runs while holding the lock, so walking the whole deque would be too heavy.
                    ProducerBatch batch = deque.peekFirst();
                    if (batch != null) {
                        //There is data to send; get the partition
                        TopicPartition part = entry.getKey();
                        //Look up the partition's leader in the cluster metadata
                        Node leader = cluster.leaderFor(part);
                        if (leader == null) {
                            // This is a partition for which leader is not known, but messages are available to send.
                            // Note that entries are currently not removed from batches when deque is empty.
                            // Abnormal case: the leader is unknown (e.g. the broker side is having problems), but there is data ready to send.
                            // Surface it in the return value and let the Sender decide what to do (typically request a metadata refresh).
                            unknownLeaderTopics.add(part.topic());
                        } else if (!readyNodes.contains(leader) && !isMuted(part)) {
                            //The leader is known, not already in the ready set, and the partition is not muted; now decide whether it is sendable
                            //How long the batch has waited since its creation or last retry attempt
                            long waitedTimeMs = batch.waitedTimeMs(nowMs);
                            //Is the batch in its retry backoff period?
                            boolean backingOff = batch.attempts() > 0 && waitedTimeMs < retryBackoffMs;
                            // How long this batch should wait before sending: the retry backoff if it is retrying, otherwise linger.ms
                            long timeToWaitMs = backingOff ? retryBackoffMs : lingerMs;
                            //Is the batch effectively full? deque.size() > 1 means this is not the last batch, so even if it is not literally full it will take no more records; batch.isFull() means it really is full
                            boolean full = deque.size() > 1 || batch.isFull();
                            // Has the batch waited long enough?
                            boolean expired = waitedTimeMs >= timeToWaitMs;
                            // Sendable if: the batch is full, it has waited long enough, the buffer pool is exhausted, the accumulator has been closed, or some thread is waiting on flush()
                            boolean sendable = full || expired || exhausted || closed || flushInProgress();
                            // Sendable and not backing off. That settles it: go, Pikachu!
                            if (sendable && !backingOff) {
                                readyNodes.add(leader);
                            } else {
                                //Not sendable yet, or still backing off
                                // Compute how much longer it has to wait
                                long timeLeftMs = Math.max(timeToWaitMs - waitedTimeMs, 0);
                                // Note that this results in a conservative estimate since an un-sendable partition may have
                                // a leader that will later be found to have sendable data. However, this is good enough
                                // since we'll just wake up and then sleep again for the remaining time.
                                // A partition that cannot be sent to right now may have sendable data shortly, so keep narrowing this down to the shortest remaining wait.
                                // The Sender sleeps for the shortest wait, wakes up to re-check, sleeps again, and so on in a loop.
                                nextReadyCheckDelayMs = Math.min(timeLeftMs, nextReadyCheckDelayMs);
                            }
                        }
                    }
                }
            }
            return new ReadyCheckResult(readyNodes, nextReadyCheckDelayMs, unknownLeaderTopics);
        }
    
    
    RecordAccumulator-ready.png

    ready() has quite a few conditions; the figure above lays them out.
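
    To see where ready() fits, here is a condensed sketch of how the Sender uses its result on each loop iteration. It is a simplified reconstruction of Sender#sendProducerData (same client version as above), with batch expiry, ordering guarantees and error handling left out; treat the details as an approximation.

        long now = time.milliseconds();
        // Ask the accumulator which nodes have sendable data.
        RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);

        // Partitions with data but no known leader: ask for a metadata refresh.
        if (!result.unknownLeaderTopics.isEmpty()) {
            for (String topic : result.unknownLeaderTopics)
                this.metadata.add(topic, now);
            this.metadata.requestUpdate();
        }

        // Drop nodes whose network connection is not ready yet.
        Iterator<Node> iter = result.readyNodes.iterator();
        while (iter.hasNext()) {
            Node node = iter.next();
            if (!this.client.ready(node, now))
                iter.remove();
        }

        // Pull at most max.request.size bytes of batches per ready node and send one produce request per node.
        Map<Integer, List<ProducerBatch>> batches =
                this.accumulator.drain(cluster, result.readyNodes, this.maxRequestSize, now);
        sendProduceRequests(batches, now);

        // result.nextReadyCheckDelayMs bounds how long the Sender may poll before it should call ready() again.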

    The drain() method: pulling data out of the cache
    
    /**
         * Drain (pull out, empty) the accumulated data for the given nodes (Node).
         * While a drained batch is still in flight, its partition can be muted so that nothing else is sent for it (see isMuted below).
         */
        public Map<Integer, List<ProducerBatch>> drain(Cluster cluster, Set<Node> nodes, int maxSize, long now) {
            if (nodes.isEmpty())
                return Collections.emptyMap();
    
    
            Map<Integer, List<ProducerBatch>> batches = new HashMap<>();
            for (Node node : nodes) {
                //For every node that is ready, pull out all its sendable batches
                List<ProducerBatch> ready = drainBatchesForOneNode(cluster, node, maxSize, now);
                //Put what was drained into the result map, keyed by node id
                batches.put(node.id(), ready);
            }
            return batches;
        }
    
        private List<ProducerBatch> drainBatchesForOneNode(Cluster cluster, Node node, int maxSize, long now) {
            int size = 0;
            //Get all the partitions hosted on this node from the cluster metadata
            List<PartitionInfo> parts = cluster.partitionsForNode(node.id());
            List<ProducerBatch> ready = new ArrayList<>();
            /* to make starvation less likely this loop doesn't start at 0 */
            // To make starvation less likely the loop does not always start at 0: drainIndex is remembered across calls.
            // If it always started at 0, partitions at the front of the list would be drained first every time and,
            // once the maxSize limit is hit, partitions at the end might never get sent.
            int start = drainIndex = drainIndex % parts.size();
            do {
                // Get the partition at the current drain index
                PartitionInfo part = parts.get(drainIndex);
                TopicPartition tp = new TopicPartition(part.topic(), part.partition());
                //Advance the index for the next iteration (wrapping around)
                this.drainIndex = (this.drainIndex + 1) % parts.size();
    
                // Only proceed if the partition has no in-flight batches.
                if (isMuted(tp))
                    //Muted, i.e. it already has an in-flight batch; skip it
                    continue;
                //取到deque
                Deque<ProducerBatch> deque = getDeque(tp);
                if (deque == null)
                    //No deque for this partition; skip it
                    continue;
                // Every operation on the deque must hold the lock
                synchronized (deque) {
                    // invariant: !isMuted(tp,now) && deque != null
                    // Peek at the first batch and examine it
                    ProducerBatch first = deque.peekFirst();
                    if (first == null)
                        continue;
    
                    // first != null
                    // Is this batch still in its retry backoff period?
                    boolean backoff = first.attempts() > 0 && first.waitedTimeMs(now) < retryBackoffMs;
                    // Only drain the batch if it is not during backoff period.
                    if (backoff)
                        // Still backing off; skip it
                        continue;
    
                    // Adding this batch would push the request past its maximum size
                    if (size + first.estimatedSizeInBytes() > maxSize && !ready.isEmpty()) {
                        // Then stop here; this request is as full as it gets
                        // there is a rare case that a single batch size is larger than the request size due to
                        // compression; in this case we will still eventually send this batch in a single request
                        break;
                    } else {
                        // If the transaction is not yet ready for this partition, or the producer state says we must not drain it now, stop
                        if (shouldStopDrainBatchesForPartition(first, tp))
                            break;
    
                        //Is a transaction currently in progress?
                        boolean isTransactional = transactionManager != null && transactionManager.isTransactional();
                        ProducerIdAndEpoch producerIdAndEpoch =
                            transactionManager != null ? transactionManager.producerIdAndEpoch() : null;
                        // Actually take the batch out of the deque (pollFirst).
                        // Only the first batch of each deque is taken per pass.
                        // Data is read from the head of the deque and appended at the tail, which is why a double-ended queue is needed.
                        ProducerBatch batch = deque.pollFirst();
                        if (producerIdAndEpoch != null && !batch.hasSequence()) {
                            // Idempotence/transactions: assign the producer id, epoch and sequence so batch ordering is preserved
                            // If the batch already has an assigned sequence, then we should not change the producer id and
                            // sequence number, since this may introduce duplicates. In particular, the previous attempt
                            // may actually have been accepted, and if we change the producer id and sequence here, this
                            // attempt will also be accepted, causing a duplicate.
                            //
                            // Additionally, we update the next sequence number bound for the partition, and also have
                            // the transaction manager track the batch so as to ensure that sequence ordering is maintained
                            // even if we receive out of order responses.
                            batch.setProducerState(producerIdAndEpoch, transactionManager.sequenceNumber(batch.topicPartition), isTransactional);
                            transactionManager.incrementSequenceNumber(batch.topicPartition, batch.recordCount);
                            log.debug("Assigned producerId {} and producerEpoch {} to batch with base sequence " +
                                    "{} being sent to partition {}", producerIdAndEpoch.producerId,
                                producerIdAndEpoch.epoch, batch.baseSequence(), tp);
    
                            transactionManager.addInFlightBatch(batch);
                        }
                        // Close the batch; no more records will be appended to it
                        batch.close();
                        // Add this batch's size to the running total for the request
                        size += batch.records().sizeInBytes();
                        // Add the batch to the list to be sent
                        ready.add(batch);
                        // Record the time at which the batch was drained
                        batch.drained(now);
                    }
                }
            } while (start != drainIndex);
            return ready;
        }
    
    RecordAccumulator-drain.png
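
    One detail worth connecting: the isMuted(tp) checks in ready() and drain() only matter when message ordering must be guaranteed. The sketch below approximates what the Sender does right after drain() when max.in.flight.requests.per.connection is 1 (so guaranteeMessageOrder is true); it is a simplification, not the exact Sender code. The partition is unmuted again once the produce response for that batch comes back, so at most one batch per partition is ever in flight.

        // After draining, mute every drained partition so ready()/drain() skip it until its
        // in-flight batch completes. (Simplified; the real logic lives in Sender#sendProducerData.)
        if (guaranteeMessageOrder) {
            for (List<ProducerBatch> batchList : batches.values()) {
                for (ProducerBatch batch : batchList)
                    this.accumulator.mutePartition(batch.topicPartition);
            }
        }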

    If this article helped you, please give it a like. Your feedback tells me the article is worth writing.
