How the Kafka Producer Guarantees Message Ordering Within a Partition
Author: Roger1234 | Published 2019-07-14 19:17

    1. Introduction

    We know that messages sent by the Producer to the same topic are routed, according to the partitioning algorithm, into that topic's different partitions. Kafka cannot guarantee ordering across partitions (i.e., at the topic level), but it can guarantee ordering within a single partition. This article explains how the Kafka Producer guarantees the ordering of messages within a single partition. To keep the analysis simple, we leave idempotence and transactions out of scope here; they will be covered in a later article.
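
    As background (not part of the original walkthrough): with the default partitioner, records that carry the same key are hashed (murmur2) to the same partition, so per-key ordering reduces to per-partition ordering. A minimal sketch, with a made-up topic, key, and a local broker address:

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class KeyedSendExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092"); // assumed local broker
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
                // Same key "user-42" => same partition of "orders" under the default
                // partitioner, so ordering between these two records is a per-partition concern.
                producer.send(new ProducerRecord<>("orders", "user-42", "order-created"));
                producer.send(new ProducerRecord<>("orders", "user-42", "order-paid"));
            }
        }
    }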

    2. Message Ordering

    From the previous two articles we know that the KafkaProducer's doSend() method mainly does three things: it updates the metadata, appends the message to the per-partition queue ConcurrentMap<TopicPartition, Deque<ProducerBatch>> batches, and wakes up the Sender thread. The actual sending happens in the Sender thread, and so does the logic that guarantees ordering. That logic relies primarily on the data structure Map<TopicPartition, Long> muted, whose key is the TopicPartition and whose value is the time until which that partition remains muted. It is spread across three methods: Sender.sendProducerData(), RecordAccumulator.ready(), and RecordAccumulator.drainBatchesForOneNode().
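
    Before walking through those methods, here is a simplified, self-contained sketch of the bookkeeping around muted (an illustration only, not the actual RecordAccumulator code): a partition is muted when one of its batches is drained into a request, and unmuted from the response path, possibly with a broker throttle deadline:

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.kafka.common.TopicPartition;

    // Illustrative sketch: the method names mirror RecordAccumulator
    // (mutePartition/unmutePartition/isMuted), but the bodies are simplified.
    class MutedPartitionsSketch {
        private final Map<TopicPartition, Long> muted = new HashMap<>();

        // Called when a batch of tp is drained into an in-flight produce request.
        void mutePartition(TopicPartition tp) {
            muted.put(tp, Long.MAX_VALUE);          // muted until a response arrives
        }

        // Called from the response path; tp may stay muted until the broker's throttle expires.
        void unmutePartition(TopicPartition tp, long throttleUntilTimeMs) {
            muted.put(tp, throttleUntilTimeMs);
        }

        // Muted only while the deadline lies in the future; expired entries are dropped.
        boolean isMuted(TopicPartition tp, long now) {
            Long until = muted.get(tp);
            boolean result = until != null && until > now;
            if (!result)
                muted.remove(tp);
            return result;
        }
    }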

    2.1 ready()

    public ReadyCheckResult ready(Cluster cluster, long nowMs) {
        Set<Node> readyNodes = new HashSet<>();
        long nextReadyCheckDelayMs = Long.MAX_VALUE;
        Set<String> unknownLeaderTopics = new HashSet<>();
    
        boolean exhausted = this.free.queued() > 0;
        for (Map.Entry<TopicPartition, Deque<ProducerBatch>> entry : this.batches.entrySet()) {
            TopicPartition part = entry.getKey();
            Deque<ProducerBatch> deque = entry.getValue();
    
            Node leader = cluster.leaderFor(part);
            synchronized (deque) {
                if (leader == null && !deque.isEmpty()) {
                    // This is a partition for which leader is not known, but messages are available to send.
                    // Note that entries are currently not removed from batches when deque is empty.
                    unknownLeaderTopics.add(part.topic());
                } else if (!readyNodes.contains(leader) && !isMuted(part, nowMs)) {
                    ProducerBatch batch = deque.peekFirst();
                    if (batch != null) {
                        long waitedTimeMs = batch.waitedTimeMs(nowMs);
                        boolean backingOff = batch.attempts() > 0 && waitedTimeMs < retryBackoffMs;
                        long timeToWaitMs = backingOff ? retryBackoffMs : lingerMs;
                        boolean full = deque.size() > 1 || batch.isFull();
                        boolean expired = waitedTimeMs >= timeToWaitMs;
                        boolean sendable = full || expired || exhausted || closed || flushInProgress();
                        if (sendable && !backingOff) {
                            readyNodes.add(leader);
                        } else {
                            long timeLeftMs = Math.max(timeToWaitMs - waitedTimeMs, 0);
                            // Note that this results in a conservative estimate since an un-sendable partition may have
                            // a leader that will later be found to have sendable data. However, this is good enough
                            // since we'll just wake up and then sleep again for the remaining time.
                            nextReadyCheckDelayMs = Math.min(timeLeftMs, nextReadyCheckDelayMs);
                        }
                    }
                }
            }
        }
        return new ReadyCheckResult(readyNodes, nextReadyCheckDelayMs, unknownLeaderTopics);
    }
    

    This method iterates over all TopicPartitions and returns the set of leader nodes that have data ready to send. During the iteration it checks whether a partition is currently muted: !readyNodes.contains(leader) && !isMuted(part, nowMs). If the partition is muted it is skipped, because it already has a drained batch waiting for its response. Following the field name, we say such a partition is in the muted state.

    2.2 drainBatchesForOneNode()

    private List<ProducerBatch> drainBatchesForOneNode(Cluster cluster, Node node, int maxSize, long now) {
            int size = 0;
            List<PartitionInfo> parts = cluster.partitionsForNode(node.id()); //leader partitions
            List<ProducerBatch> ready = new ArrayList<>();
            /* to make starvation less likely this loop doesn't start at 0 */
            int start = drainIndex = drainIndex % parts.size();
            do {
                PartitionInfo part = parts.get(drainIndex);
                TopicPartition tp = new TopicPartition(part.topic(), part.partition());
                this.drainIndex = (this.drainIndex + 1) % parts.size();
    
                // Only proceed if the partition has no in-flight batches.
                if (isMuted(tp, now))
                    continue;
    
                Deque<ProducerBatch> deque = getDeque(tp);
                if (deque == null)
                    continue;
    
                synchronized (deque) {
                    // invariant: !isMuted(tp,now) && deque != null
                    ProducerBatch first = deque.peekFirst();
                    if (first == null)
                        continue;
    
                    // first != null
                    boolean backoff = first.attempts() > 0 && first.waitedTimeMs(now) < retryBackoffMs;
                    // Only drain the batch if it is not during backoff period.
                    if (backoff)
                        continue;
    
                    if (size + first.estimatedSizeInBytes() > maxSize && !ready.isEmpty()) {
                        // there is a rare case that a single batch size is larger than the request size due to
                        // compression; in this case we will still eventually send this batch in a single request
                        break;
                    } else {
                        if (shouldStopDrainBatchesForPartition(first, tp))
                            break;
    
                        boolean isTransactional = transactionManager != null ? transactionManager.isTransactional() : false;
                        ProducerIdAndEpoch producerIdAndEpoch =
                            transactionManager != null ? transactionManager.producerIdAndEpoch() : null;
                        ProducerBatch batch = deque.pollFirst();
                        if (producerIdAndEpoch != null && !batch.hasSequence()) {
                            // If the batch already has an assigned sequence, then we should not change the producer id and
                            // sequence number, since this may introduce duplicates. In particular, the previous attempt
                            // may actually have been accepted, and if we change the producer id and sequence here, this
                            // attempt will also be accepted, causing a duplicate.
                            //
                            // Additionally, we update the next sequence number bound for the partition, and also have
                            // the transaction manager track the batch so as to ensure that sequence ordering is maintained
                            // even if we receive out of order responses.
                            batch.setProducerState(producerIdAndEpoch, transactionManager.sequenceNumber(batch.topicPartition), isTransactional);
                            transactionManager.incrementSequenceNumber(batch.topicPartition, batch.recordCount);
                            log.debug("Assigned producerId {} and producerEpoch {} to batch with base sequence " +
                                    "{} being sent to partition {}", producerIdAndEpoch.producerId,
                                producerIdAndEpoch.epoch, batch.baseSequence(), tp);
    
                            transactionManager.addInFlightBatch(batch);
                        }
                        batch.close();
                        size += batch.records().sizeInBytes();
                        ready.add(batch);
    
                        batch.drained(now);
                    }
                }
            } while (start != drainIndex);
            return ready;
        }
    

    This method iterates over all TopicPartitions whose leader is the given node, polls the oldest ProducerBatch from each partition's queue, and returns the collected batches. During this process it also checks whether the partition is currently muted via isMuted(tp, now):

    private boolean isMuted(TopicPartition tp, long now) {
            boolean result = muted.containsKey(tp) && muted.get(tp) > now;
            if (!result)
                muted.remove(tp);
            return result;
        }
    

    2.3 sendProducerData()

    private long sendProducerData(long now) {
        Cluster cluster = metadata.fetch();
        // get the list of partitions with data ready to send
        RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);
    
        // if there are any partitions whose leaders are not known yet, force metadata update
        if (!result.unknownLeaderTopics.isEmpty()) {
            // The set of topics with unknown leader contains topics with leader election pending as well as
            // topics which may have expired. Add the topic again to metadata to ensure it is included
            // and request metadata update, since there are messages to send to the topic.
            for (String topic : result.unknownLeaderTopics)
                this.metadata.add(topic);
    
            log.debug("Requesting metadata update due to unknown leader topics from the batched records: {}",
                result.unknownLeaderTopics);
            this.metadata.requestUpdate();
        }
    
        // remove any nodes we aren't ready to send to
        Iterator<Node> iter = result.readyNodes.iterator();
        long notReadyTimeout = Long.MAX_VALUE;
        while (iter.hasNext()) {
            Node node = iter.next();
            if (!this.client.ready(node, now)) {
                iter.remove();
                notReadyTimeout = Math.min(notReadyTimeout, this.client.pollDelayMs(node, now));
            }
        }
    
        // create produce requests
        Map<Integer, List<ProducerBatch>> batches = this.accumulator.drain(cluster, result.readyNodes, this.maxRequestSize, now);
        addToInflightBatches(batches);
        if (guaranteeMessageOrder) {
            // Mute all the partitions drained
            for (List<ProducerBatch> batchList : batches.values()) {
                for (ProducerBatch batch : batchList)
                    this.accumulator.mutePartition(batch.topicPartition);
            }
        }
    
        accumulator.resetNextBatchExpiryTime();
        List<ProducerBatch> expiredInflightBatches = getExpiredInflightBatches(now);
        List<ProducerBatch> expiredBatches = this.accumulator.expiredBatches(now);
        expiredBatches.addAll(expiredInflightBatches);
    
        // Reset the producer id if an expired batch has previously been sent to the broker. Also update the metrics
        // for expired batches. see the documentation of @TransactionState.resetProducerId to understand why
        // we need to reset the producer id here.
        if (!expiredBatches.isEmpty())
            log.trace("Expired {} batches in accumulator", expiredBatches.size());
        for (ProducerBatch expiredBatch : expiredBatches) {
            String errorMessage = "Expiring " + expiredBatch.recordCount + " record(s) for " + expiredBatch.topicPartition
                + ":" + (now - expiredBatch.createdMs) + " ms has passed since batch creation";
            failBatch(expiredBatch, -1, NO_TIMESTAMP, new TimeoutException(errorMessage), false);
            if (transactionManager != null && expiredBatch.inRetry()) {
                // This ensures that no new batches are drained until the current in flight batches are fully resolved.
                transactionManager.markSequenceUnresolved(expiredBatch.topicPartition);
            }
        }
        sensors.updateProduceRequestMetrics(batches);
    
        // If we have any nodes that are ready to send + have sendable data, poll with 0 timeout so this can immediately
        // loop and try sending more data. Otherwise, the timeout will be the smaller value between next batch expiry
        // time, and the delay time for checking data availability. Note that the nodes may have data that isn't yet
        // sendable due to lingering, backing off, etc. This specifically does not include nodes with sendable data
        // that aren't ready to send since they would cause busy looping.
        long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout);
        pollTimeout = Math.min(pollTimeout, this.accumulator.nextExpiryTimeMs() - now);
        pollTimeout = Math.max(pollTimeout, 0);
        if (!result.readyNodes.isEmpty()) {
            log.trace("Nodes with data ready to send: {}", result.readyNodes);
            // if some partitions are already ready to be sent, the select time would be 0;
            // otherwise if some partition already has some data accumulated but not ready yet,
            // the select time will be the time difference between now and its linger expiry time;
            // otherwise the select time will be the time difference between now and the metadata expiry time;
            pollTimeout = 0;
        }
        sendProduceRequests(batches, now);
        return pollTimeout;
    }
    

    Whether message ordering needs to be guaranteed depends on the use case. If ordering is required, then every TopicPartition whose ProducerBatch has been drained for sending must be muted, so that its subsequent batches cannot be drained into another request until the in-flight one is resolved. Kafka exposes a configuration option that controls whether this ordering guarantee is enabled.

    /** <code>max.in.flight.requests.per.connection</code> */
    public static final String MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION = "max.in.flight.requests.per.connection";
    private static final String MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_DOC = "The maximum number of unacknowledged requests the client will send on a single connection before blocking." + " Note that if this setting is set to be greater than 1 and there are failed sends, there is a risk of" + " message re-ordering due to retries (i.e., if retries are enabled).";
    

    Setting max.in.flight.requests.per.connection=1 enables the ordering guarantee; any larger value does not guarantee ordering, because a failed request may be retried after later batches have already been sent, causing re-ordering.
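
    For illustration, a producer configured for strict in-partition ordering might be set up like the sketch below (class and method names are made up; the broker address and serializers are placeholders; acks=all and a high retry count are common companions of this setting, and limiting in-flight requests to 1 trades throughput for ordering):

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;

    public class OrderedProducerExample {
        public static KafkaProducer<String, String> newOrderedProducer() {
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.common.serialization.StringSerializer");
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.common.serialization.StringSerializer");
            // At most one unacknowledged request per connection: the Sender mutes each
            // drained partition, so a retried batch cannot be overtaken by a later batch
            // of the same partition.
            props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);
            props.put(ProducerConfig.ACKS_CONFIG, "all");
            props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
            return new KafkaProducer<>(props);
        }
    }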

    2.4 completeBatch()

    This method handles the produce response for a batch and, when ordering is enabled, unmutes the corresponding partition (possibly keeping it muted until the broker-imposed throttle deadline throttleUntilTimeMs has passed):

    .....
    if (guaranteeMessageOrder)
        this.accumulator.unmutePartition(batch.topicPartition, throttleUntilTimeMs);
    

    3. Summary

    This article described how the Producer guarantees the ordering of messages within a single partition. This guarantee is not mandatory and should be weighed against the use case: with ordering disabled, the theoretical throughput is higher, so there is a trade-off to make. The next article will look at how Kafka implements idempotence.
