The Kafka Produce Flow

Author: WestC | Published 2018-07-09 17:34

    Kafka is a publish/subscribe messaging system: it receives messages, stores them sequentially on local disk, and lets downstream applications read them back from Kafka. This article analyzes the message send (produce) flow based on Kafka 0.10.0:

    Determining which partition a message is sent to:

    How the record's partition is determined:

    If the record's partition is non-null and valid (0 <= partition < the topic's partition count), that partition is used directly.

    If the record's partition is null, the partition is obtained from the partitioner's partition method. Partition selection has changed from earlier versions: instead of picking one partition, using it for 10 minutes and then picking another, the scheme below is used (users can of course supply their own partitioner via partitioner.class; see the configuration sketch after the code below):

    // the so-called "available partitions" are simply the partitions that currently have a leader
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        if (keyBytes == null) {
            int nextValue = counter.getAndIncrement();
            List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
            // when the key is null, take an AtomicInteger counter value modulo the number of available partitions
            if (availablePartitions.size() > 0) {
                int part = DefaultPartitioner.toPositive(nextValue) % availablePartitions.size();
                return availablePartitions.get(part).partition();
            } else {
                // when there are 0 available partitions, fall back to the counter value modulo topic.partitions.size
                // no partitions are available, give a non-available partition
                return DefaultPartitioner.toPositive(nextValue) % numPartitions;
            }
        } else {
        // when the key is non-null, use the hash of the key modulo topic.partitions.size
            // hash the keyBytes to choose a partition
            return DefaultPartitioner.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
        }
    }
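
    Plugging in a custom partitioner only requires implementing the Partitioner interface and registering it through partitioner.class. The sketch below is purely illustrative (the class name, package and routing rule are made up, not taken from the Kafka code base), but it shows the shape of the contract:

    import java.util.List;
    import java.util.Map;

    import org.apache.kafka.clients.producer.Partitioner;
    import org.apache.kafka.common.Cluster;
    import org.apache.kafka.common.PartitionInfo;
    import org.apache.kafka.common.utils.Utils;

    // Hypothetical example: send every un-keyed record to partition 0, hash keyed records as usual.
    public class FixedNullKeyPartitioner implements Partitioner {

        @Override
        public void configure(Map<String, ?> configs) { }

        @Override
        public int partition(String topic, Object key, byte[] keyBytes,
                             Object value, byte[] valueBytes, Cluster cluster) {
            List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
            if (keyBytes == null)
                return 0;                                                       // all un-keyed records go to partition 0
            return (Utils.murmur2(keyBytes) & 0x7fffffff) % partitions.size();  // same hashing idea as the default
        }

        @Override
        public void close() { }
    }

    // registered on the producer, e.g.:
    // props.put("partitioner.class", "com.example.FixedNullKeyPartitioner");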
    

    Placing the message into a RecordBatch

    The RecordAccumulator acts as a queue: it accumulates messages into MemoryRecords so they can be sent to the server in batches.

    The RecordAccumulator's append method puts the message into the RecordBatch of the target partition, where it waits to be sent. The core logic is as follows:

    public FutureRecordMetadata tryAppend(long timestamp, byte[] key, byte[] value, Callback callback, long now) {
        if (!this.records.hasRoomFor(key, value)) {
            return null;
        } else {
            // append the record into the underlying MemoryRecords
            long checksum = this.records.append(offsetCounter++, timestamp, key, value);
            this.maxRecordSize = Math.max(this.maxRecordSize, Record.recordSize(key, value));
            // lastAppendTime is updated here
            this.lastAppendTime = now;
            FutureRecordMetadata future = new FutureRecordMetadata(this.produceFuture, this.recordCount,
                                                                   timestamp, checksum,
                                                                   key == null ? -1 : key.length,
                                                                   value == null ? -1 : value.length);
            if (callback != null)
                thunks.add(new Thunk(callback, future));
            this.recordCount++;
            return future;
        }
    }
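
    On the caller side, the Future returned by KafkaProducer.send is backed by the FutureRecordMetadata created above, and the Callback is the one wrapped into a Thunk. A minimal usage sketch (the topic name and serializer choices are illustrative):

    import java.util.Properties;
    import java.util.concurrent.Future;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;

    public class ProduceExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

            Producer<String, String> producer = new KafkaProducer<>(props);
            // send() only appends the record to the accumulator and returns immediately;
            // the callback fires once the sender thread has handled the broker's response (or the batch expires)
            Future<RecordMetadata> future = producer.send(
                    new ProducerRecord<>("demo-topic", "key", "value"),
                    (metadata, exception) -> {
                        if (exception != null)
                            exception.printStackTrace();
                        else
                            System.out.printf("sent to %s-%d@%d%n",
                                    metadata.topic(), metadata.partition(), metadata.offset());
                    });
            producer.close();
        }
    }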
    

    At this point the record has merely been placed into MemoryRecords; so when does it actually get sent out?

    The standalone sender thread

    When a KafkaProducer is initialized it creates and starts a thread named kafka-producer-network-thread. That thread's run method repeatedly invokes the following run(long now) method:

    void run(long now) {
        Cluster cluster = metadata.fetch();
        // get the list of partitions with data ready to send
        RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);
    
        // if there are any partitions whose leaders are not known yet, force metadata update
        // if any partition has no known leader, flag that the metadata needs to be refreshed
        if (result.unknownLeadersExist)
            this.metadata.requestUpdate();
    
        // remove any nodes we aren't ready to send to
        Iterator<Node> iter = result.readyNodes.iterator();
        long notReadyTimeout = Long.MAX_VALUE;
        // based on the connection state NetworkClient maintains for each broker, drop ready nodes whose connection is not usable
        while (iter.hasNext()) {
            Node node = iter.next();
            if (!this.client.ready(node, now)) {
                iter.remove();
                notReadyTimeout = Math.min(notReadyTimeout, this.client.connectionDelay(node, now));
            }
        }
    
        // map each ready node to the list of batches that will be sent to it
        Map<Integer, List<RecordBatch>> batches = this.accumulator.drain(cluster,
                                                                         result.readyNodes,
                                                                         this.maxRequestSize,
                                                                         now);
        if (guaranteeMessageOrder) {
            // Mute all the partitions drained
            for (List<RecordBatch> batchList : batches.values()) {
                for (RecordBatch batch : batchList)
                    this.accumulator.mutePartition(batch.topicPartition);
            }
        }
    
        // mark batches that have sat unsent for too long as expired
        List<RecordBatch> expiredBatches = this.accumulator.abortExpiredBatches(this.requestTimeout, now);
        // update sensors
        for (RecordBatch expiredBatch : expiredBatches)
            this.sensors.recordErrors(expiredBatch.topicPartition.topic(), expiredBatch.recordCount);
    
        sensors.updateProduceRequestMetrics(batches);
        
        // create a produce request for each node
        List<ClientRequest> requests = createProduceRequests(batches, now);
        // If we have any nodes that are ready to send + have sendable data, poll with 0 timeout so this can immediately
        // loop and try sending more data. Otherwise, the timeout is determined by nodes that have partitions with data
        // that isn't yet sendable (e.g. lingering, backing off). Note that this specifically does not include nodes
        // with sendable data that aren't ready to send since they would cause busy looping.
        long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout);
        if (result.readyNodes.size() > 0) {
            log.trace("Nodes with data ready to send: {}", result.readyNodes);
            log.trace("Created {} produce requests: {}", requests.size(), requests);
            pollTimeout = 0;
        }
        // send the requests
        for (ClientRequest request : requests)
            client.send(request, now);
    
        // if some partitions are already ready to be sent, the select time would be 0;
        // otherwise if some partition already has some data accumulated but not ready yet,
        // the select time will be the time difference between now and its linger expiry time;
        // otherwise the select time will be the time difference between now and the metadata expiry time;
        // send a metadata request if needed, handle outgoing requests and incoming responses, and handle connection state changes
        this.client.poll(pollTimeout, now);
    }
    

    As can be seen, the run method consists mainly of the following steps:

    • Fetch the cluster information; the Cluster class holds topic, broker and related metadata and represents a Kafka cluster
    • Find the nodes that are ready to be sent to (readyNodes); if any partition has no known leader, flag a metadata update, and drop nodes whose connection is not usable
    • Work out which nodes to send to in this pass and which RecordBatches go to each of them
    • Abort RecordBatches that have been waiting "too long" without being sent
    • Create and send the produce requests
    • Send a metadata request if needed, handle outgoing requests and incoming responses, and handle connection state changes

    Cluster information

    The Cluster class holds the cluster's brokers, topics, partitions and related information; the client interacts with the cluster through it. Its main fields are:

    private final boolean isBootstrapConfigured;
    private final List<Node> nodes;
    private final Set<String> unauthorizedTopics;
    private final Map<TopicPartition, PartitionInfo> partitionsByTopicPartition;
    private final Map<String, List<PartitionInfo>> partitionsByTopic;
    private final Map<String, List<PartitionInfo>> availablePartitionsByTopic;
    private final Map<Integer, List<PartitionInfo>> partitionsByNode;
    private final Map<Integer, Node> nodesById;
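
    A few of the lookups the producer performs against this state, collected here as a fragment (these accessors all appear in the code elsewhere in this article; the topic name is only illustrative, and metadata is the producer's internal Metadata instance):

    Cluster cluster = metadata.fetch();
    // every partition of the topic
    List<PartitionInfo> all = cluster.partitionsForTopic("demo-topic");
    // only the partitions that currently have a leader
    List<PartitionInfo> available = cluster.availablePartitionsForTopic("demo-topic");
    // leader node of a single partition (null if unknown)
    Node leader = cluster.leaderFor(new TopicPartition("demo-topic", 0));
    // partitions whose leader is broker 0
    List<PartitionInfo> onBroker = cluster.partitionsForNode(0);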
    

    Finding the readyNodes

    // first, find the nodes that are ready to be sent to
    public ReadyCheckResult ready(Cluster cluster, long nowMs) {
        Set<Node> readyNodes = new HashSet<>();
        long nextReadyCheckDelayMs = Long.MAX_VALUE;
        boolean unknownLeadersExist = false;
        boolean exhausted = this.free.queued() > 0;
        for (Map.Entry<TopicPartition, Deque<RecordBatch>> entry : this.batches.entrySet()) {
            TopicPartition part = entry.getKey();
            Deque<RecordBatch> deque = entry.getValue();
            Node leader = cluster.leaderFor(part);
            if (leader == null) {
                unknownLeadersExist = true;
            } else if (!readyNodes.contains(leader) && !muted.contains(part)) {
                synchronized (deque) {
                    RecordBatch batch = deque.peekFirst();
                    if (batch != null) {
                    // decide whether the batch is backing off; lastAttemptMs is updated in handleResponse after a failed send
                        boolean backingOff = batch.attempts > 0 && batch.lastAttemptMs + retryBackoffMs > nowMs;
                        long waitedTimeMs = nowMs - batch.lastAttemptMs;
                        long timeToWaitMs = backingOff ? retryBackoffMs : lingerMs;
                        long timeLeftMs = Math.max(timeToWaitMs - waitedTimeMs, 0);
                        boolean full = deque.size() > 1 || batch.records.isFull();
                        boolean expired = waitedTimeMs >= timeToWaitMs;
                        // the following conditions decide whether this node has messages that can be sent
                        boolean sendable = full || expired || exhausted || closed || flushInProgress();
                        if (sendable && !backingOff) {
                            readyNodes.add(leader);
                        } else {
                            // Note that this results in a conservative estimate since an un-sendable partition may have
                            // a leader that will later be found to have sendable data. However, this is good enough
                            // since we'll just wake up and then sleep again for the remaining time.
                            nextReadyCheckDelayMs = Math.min(timeLeftMs, nextReadyCheckDelayMs);
                        }
                    }
                }
            }
        }
    
        return new ReadyCheckResult(readyNodes, nextReadyCheckDelayMs, unknownLeadersExist);
    }
    
    The key logic here is deciding whether a node has data that can be sent in this pass:
     ( full || expired || exhausted || closed || flushInProgress() ) && !backingOff
     full : the partition's deque holds more than one batch, or the current batch is full
     expired : the time already waited (now - lastAttemptMs) >= the time it has to wait (retryBackoffMs when backing off, otherwise lingerMs)
     exhausted : the buffer pool is exhausted and there are allocations waiting for memory to be released (free.queued() > 0)
     closed : this RecordAccumulator has been closed
     flushInProgress : the application has called flush()
    

    Mapping each node to the messages that will be sent to it

    Process: iterate over all readyNodes; for each node, look at the partitions it leads. If a partition has a queued RecordBatch that is not backing off, add it to the List<RecordBatch> to be returned; if adding the batch would push the total past the size limit and at least one batch has already been collected for this node, stop collecting for this pass.

    // iterate over the readyNodes
    for (Node node : nodes) {
            int size = 0;
            // find all partitions whose leader is this node
            List<PartitionInfo> parts = cluster.partitionsForNode(node.id());
            List<RecordBatch> ready = new ArrayList<>();
            /* to make starvation less likely this loop doesn't start at 0 */
            int start = drainIndex = drainIndex % parts.size();
            do {
                PartitionInfo part = parts.get(drainIndex);
                TopicPartition tp = new TopicPartition(part.topic(), part.partition());
                // Only proceed if the partition has no in-flight batches.
                if (!muted.contains(tp)) {
                    // get the deque for this partition
                    Deque<RecordBatch> deque = getDeque(new TopicPartition(part.topic(), part.partition()));
                    if (deque != null) {
                        synchronized (deque) {
                        // peek at the first RecordBatch
                            RecordBatch first = deque.peekFirst();
                            if (first != null) {
                                boolean backoff = first.attempts > 0 && first.lastAttemptMs + retryBackoffMs > now;
                                // Only drain the batch if it is not during backoff period.
                                if (!backoff) {
                                // if adding this batch would exceed maxSize and batches have already been collected for this node, skip it
                                    if (size + first.records.sizeInBytes() > maxSize && !ready.isEmpty()) {
                                        // there is a rare case that a single batch size is larger than the request size due
                                        // to compression; in this case we will still eventually send this batch in a single
                                        // request
                                        break;
                                    } else {
                                    // drain the batch and record drainedMs
                                        RecordBatch batch = deque.pollFirst();
                                        batch.records.close();
                                        size += batch.records.sizeInBytes();
                                        ready.add(batch);
                                        batch.drainedMs = now;
                                    }
                                }
                            }
                        }
                    }
                }
                this.drainIndex = (this.drainIndex + 1) % parts.size();
            } while (start != drainIndex);
            batches.put(node.id(), ready);
    

    Finding and cleaning up expired RecordBatches

    Process: iterate over batches; for each partition's RecordBatches, check whether the batch has expired, and if so, remove and deallocate it.

    for (Map.Entry<TopicPartition, Deque<RecordBatch>> entry : this.batches.entrySet()) {
        Deque<RecordBatch> dq = entry.getValue();
        TopicPartition tp = entry.getKey();
        if (!muted.contains(tp)) {
                synchronized (dq) {
                    // iterate over the batches and expire them if they have been in the accumulator for more than requestTimeOut
                    RecordBatch lastBatch = dq.peekLast();
                    Iterator<RecordBatch> batchIterator = dq.iterator();
                    while (batchIterator.hasNext()) {
                        RecordBatch batch = batchIterator.next();
                        boolean isFull = batch != lastBatch || batch.records.isFull();
                        // check if the batch is expired
                        if (batch.maybeExpire(requestTimeout, retryBackoffMs, now, this.lingerMs, isFull)) {
                            expiredBatches.add(batch);
                            count++;
                            batchIterator.remove();
                            deallocate(batch);
                        } else {
                            // Stop at the first batch that has not expired.
                            break;
                        }
                    }
                }
            }
        }
        
        The expiry check works as follows:
            /* lastAppendTime : set when the batch is created, refreshed on every append */
            /* createdMs : set when the batch is created */
            /* lastAttemptMs : set when the batch is created; reset when a failed batch is re-enqueued for retry */
        
            public boolean maybeExpire(int requestTimeoutMs, long retryBackoffMs, long now, long lingerMs, boolean isFull) {
            boolean expire = false;
            // never retried and the batch is already full: expire when now > lastAppendTime + requestTimeoutMs
            if (!this.inRetry() && isFull && requestTimeoutMs < (now - this.lastAppendTime))
                expire = true;
            // never retried: expire when now > createdMs + lingerMs + requestTimeoutMs
            else if (!this.inRetry() && requestTimeoutMs < (now - (this.createdMs + lingerMs)))
                expire = true;
            // in retry: expire when now > lastAttemptMs + retryBackoffMs + requestTimeoutMs
            else if (this.inRetry() && requestTimeoutMs < (now - (this.lastAttemptMs + retryBackoffMs)))
                expire = true;
            if (expire) {
                this.records.close();
                this.done(-1L, Record.NO_TIMESTAMP, new TimeoutException("Batch containing " + recordCount + " record(s) expired due to timeout while requesting metadata from brokers for " + topicPartition));
            }
            return expire;
        }
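
    As a worked example (the values are assumed for illustration, not taken from the article): with request.timeout.ms = 30000, linger.ms = 5 and retry.backoff.ms = 100, a batch created at t = 0 that never fills and is never retried hits the second branch and expires once now exceeds createdMs + lingerMs + requestTimeoutMs = 30005 ms, while a batch that is in retry expires 30100 ms after its last attempt.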
    

    Creating and sending produce requests

    // create one produce request per node
    private List<ClientRequest> createProduceRequests(Map<Integer, List<RecordBatch>> collated, long now) {
        List<ClientRequest> requests = new ArrayList<ClientRequest>(collated.size());
        for (Map.Entry<Integer, List<RecordBatch>> entry : collated.entrySet())
            requests.add(produceRequest(now, entry.getKey(), acks, requestTimeout, entry.getValue()));
        return requests;
    }
    // iterate over the requests and send them
    for (ClientRequest request : requests)
            client.send(request, now);
    

    Updating the metadata (Cluster) and handling responses and connection issues

    /* lastSuccessfulRefreshMs : starts at 0, refreshed after every successful metadata update */
    /* lastRefreshMs : starts at 0, refreshed after every update attempt, successful or not */
    /* refreshBackoffMs : set when the Metadata object is created, controlled by retry.backoff.ms; metadataExpireMs is controlled by metadata.max.age.ms */
    public synchronized long timeToNextUpdate(long nowMs) {
        long timeToExpire = needUpdate ? 0 : Math.max(this.lastSuccessfulRefreshMs + this.metadataExpireMs - nowMs, 0);
        long timeToAllowUpdate = this.lastRefreshMs + this.refreshBackoffMs - nowMs;
        return Math.max(timeToExpire, timeToAllowUpdate);
    }
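
    As a worked example (assuming the defaults metadata.max.age.ms = 300000 and retry.backoff.ms = 100): if needUpdate is false and the last successful refresh was 60 s ago, then timeToExpire = 300000 - 60000 = 240000 ms, timeToAllowUpdate is negative, and timeToNextUpdate returns 240000 ms. As soon as needUpdate is set (for example because a partition had no known leader), timeToExpire drops to 0 and only the 100 ms backoff since the last refresh attempt can delay the update.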
    
    /* lastNoNodeAvailableMs : the last time a metadata update was attempted but no node was available */
    /* metadataFetchInProgress : false by default; set to true when a metadata request is sent and back to false once it completes */
    /* refreshBackoffMs : set when the Metadata object is created, controlled by retry.backoff.ms */
    public long maybeUpdate(long now) {
        // should we update our metadata?
        long timeToNextMetadataUpdate = metadata.timeToNextUpdate(now);
        long timeToNextReconnectAttempt = Math.max(this.lastNoNodeAvailableMs + metadata.refreshBackoff() - now, 0);
        long waitForMetadataFetch = this.metadataFetchInProgress ? Integer.MAX_VALUE : 0;
        // if there is no node available to connect, back off refreshing metadata
        long metadataTimeout = Math.max(Math.max(timeToNextMetadataUpdate, timeToNextReconnectAttempt),
                waitForMetadataFetch);
    
            if (metadataTimeout == 0) {
                // Beware that the behavior of this method and the computation of timeouts for poll() are
                // highly dependent on the behavior of leastLoadedNode.
                Node node = leastLoadedNode(now);
                maybeUpdate(now, node);
            }
    
            return metadataTimeout;
        }
    
        // Do actual reads and writes to sockets
        public List<ClientResponse> poll(long timeout, long now) {
            // decide whether the metadata needs to be updated
            long metadataTimeout = metadataUpdater.maybeUpdate(now);
            try {
                this.selector.poll(Utils.min(timeout, metadataTimeout, requestTimeoutMs));
            } catch (IOException e) {
                log.error("Unexpected error during I/O", e);
            }
    
        // process completed actions
        long updatedNow = this.time.milliseconds();
        List<ClientResponse> responses = new ArrayList<>();
        // handle completed sends, completed receives, disconnections, new connections and timed-out requests
        handleCompletedSends(responses, updatedNow);
        handleCompletedReceives(responses, updatedNow);
        handleDisconnections(responses, updatedNow);
        handleConnections();
        handleTimedOutRequests(responses, updatedNow);
    
        // invoke callbacks 
        for (ClientResponse response : responses) {
            if (response.request().hasCallback()) {
                try {
                    response.request().callback().onComplete(response);
                } catch (Exception e) {
                    log.error("Uncaught error in request completion:", e);
                }
            }
        }
    
        return responses;
    }
    

    Commonly used parameters:

    RecordAccumulator: the message "accumulator". The Kafka producer does not issue one request per message; messages are put into the accumulator and sent as RecordBatches. Its behavior can be tuned with the parameters below (a combined configuration sketch follows the three tables):

    Parameter           Purpose
    batch.size          minimum buffer size allocated when a new batch is created for a message
    buffer.memory       total memory available to the accumulator
    linger.ms           how long a not-yet-full batch may wait before it becomes sendable; also feeds into the batch-expiry check
    compression.type    compression codec used for the batches
    retry.backoff.ms    how long to back off before retrying a failed batch

    NetworkClient: a wrapper around the client's connections to the Kafka cluster. It manages the network I/O and hides the details of sending and receiving; its behavior can be tuned with the following parameters.

    Parameter                               Purpose
    connections.max.idle.ms                 how long a connection may stay idle before it is closed
    max.in.flight.requests.per.connection   maximum number of unacknowledged requests per connection; 1 guarantees ordering, larger values may reorder messages when retries occur
    reconnect.backoff.ms                    how long to wait before re-establishing a broken connection
    send.buffer.bytes                       size of the TCP send buffer (SO_SNDBUF)
    receive.buffer.bytes                    size of the TCP receive buffer (SO_RCVBUF)
    request.timeout.ms                      how long to wait for a response after a request is sent

    Sender: the Sender runs in its own thread and performs the actual sending. The following parameters apply:

    Parameter           Purpose
    max.request.size    maximum size of a single produce request
    acks                controls the acknowledgement ("reliability") mode for sends
    retries             maximum number of retries after a failed send
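
    Pulling the three tables together, a producer configuration touching these knobs might look like the sketch below (the values are purely illustrative, not recommendations):

    import java.util.Properties;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;

    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

    // RecordAccumulator
    props.put("batch.size", 16384);                         // per-batch buffer size
    props.put("buffer.memory", 33554432);                   // total accumulator memory
    props.put("linger.ms", 5);                              // wait up to 5 ms for a batch to fill
    props.put("compression.type", "snappy");
    props.put("retry.backoff.ms", 100);

    // NetworkClient
    props.put("max.in.flight.requests.per.connection", 1);  // 1 preserves ordering across retries
    props.put("request.timeout.ms", 30000);

    // Sender
    props.put("acks", "all");
    props.put("retries", 3);

    Producer<String, String> producer = new KafkaProducer<>(props);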

    The related code can be found in:

    org.apache.kafka.clients.producer.KafkaProducer
    org.apache.kafka.clients.producer.internals.RecordAccumulator
    org.apache.kafka.clients.producer.internals.RecordBatch
    org.apache.kafka.clients.producer.internals.Sender
    org.apache.kafka.clients.NetworkClient
    

    For detailed descriptions of these parameters, see:

    org.apache.kafka.clients.producer.ProducerConfig
    
