Kafka Source Code Analysis - Producer (8) - Summary (1)


Author: 陈阳001 | Published 2018-10-02 22:14

    I. Overall Flow

    [Figure: Kafka producer overall architecture]

    Steps:
    1. ProducerInterceptors intercept the message.
    2. The Serializer serializes the message key and value.
    3. The Partitioner selects a suitable partition for the message.
    4. The RecordAccumulator collects messages so they can be sent in batches.
    5. The Sender fetches messages from the RecordAccumulator.
    6. A ClientRequest is constructed.
    7. The ClientRequest is handed to the NetworkClient, ready to be sent.
    8. The NetworkClient puts the request into the KafkaChannel's send buffer.
    9. Network I/O is performed and the request is sent.
    10. When the response arrives, the ClientRequest's callback is invoked.
    11. The RecordBatch's callback is invoked, which in turn invokes the callback registered on each message.
    Sending a message involves two cooperating threads. The main thread wraps the business data in a ProducerRecord object and calls send(), which places the message into the RecordAccumulator (the message collector, a buffer shared by the main thread and the Sender thread) for temporary storage. The Sender thread is the one that turns the accumulated messages into requests and performs the actual network I/O: it pulls messages out of the RecordAccumulator and sends them in batches.
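
    To make the eleven steps concrete, here is a minimal sketch of the user-facing side of this pipeline (topic name, bootstrap address, and callback body are placeholders, not from the code being analyzed):

    import java.util.Properties;
    import org.apache.kafka.clients.producer.*;
    import org.apache.kafka.common.serialization.StringSerializer;

    public class ProducerSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

            Producer<String, String> producer = new KafkaProducer<>(props);
            // send() returns as soon as the record reaches the RecordAccumulator (step 4);
            // steps 5-11 run on the Sender thread, which eventually fires this callback (step 11).
            producer.send(new ProducerRecord<>("demo-topic", "key", "value"), (metadata, exception) -> {
                if (exception != null)
                    exception.printStackTrace();
                else
                    System.out.printf("sent to %s-%d@%d%n", metadata.topic(), metadata.partition(), metadata.offset());
            });
            producer.close();
        }
    }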

    II. Main Thread

    1. Metadata update flow:

    [Figure: Metadata update flow]

    Phase 1

    This phase runs in the KafkaProducer constructor. Its main purpose is to initialize the Metadata object and pass it into the Sender.

    1. Initialize Metadata:
     this.metadata = new Metadata(retryBackoffMs, config.getLong(ProducerConfig.METADATA_MAX_AGE_CONFIG));
    

    retryBackoffMs: the backoff time to wait before retrying a failed metadata request.
    ProducerConfig.METADATA_MAX_AGE_CONFIG: even if no request explicitly asks for a Metadata update, the metadata is refreshed once this interval has elapsed; the default is 5 minutes.
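
    Both values are ordinary producer settings. A minimal sketch of overriding them (the values shown are just the defaults, spelled out for illustration):

    Properties props = new Properties();
    // retry.backoff.ms: wait 100 ms (the default) before retrying a failed metadata request
    props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, "100");
    // metadata.max.age.ms: force a refresh every 5 minutes (the default) even without failures
    props.put(ProducerConfig.METADATA_MAX_AGE_CONFIG, "300000");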

    2. The first call to this.metadata.update():

    this.metadata.update(Cluster.bootstrap(addresses), time.milliseconds());
    

    This seeds the Cluster field inside the Metadata object with the bootstrap server nodes.
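    In the constructor this looks roughly like the following (a condensed sketch of the 0.10-era constructor code, using the ClientUtils helper from that code base):

    List<InetSocketAddress> addresses =
            ClientUtils.parseAndValidateAddresses(config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));
    // Cluster.bootstrap() builds a Cluster that knows only the bootstrap nodes;
    // topics and partitions are filled in by the first real metadata update.
    this.metadata.update(Cluster.bootstrap(addresses), time.milliseconds());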
    3. Instantiate NetworkClient and Sender, passing the Metadata object into their constructors.
    4. Start the thread that runs the Sender task.

    this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
    this.ioThread.start();
    

    Phase 2

    send() on the main thread initiates the Metadata update.
    1. KafkaProducer.send() calls waitOnMetadata(record.topic(), this.maxBlockTimeMs).

    /**
         * Wait for cluster metadata including partitions for the given topic to be available.
         * @param topic The topic we want metadata for
         * @param maxWaitMs The maximum time in ms for waiting on the metadata
         * @return The amount of time we waited in ms
         */
        private long waitOnMetadata(String topic, long maxWaitMs) throws InterruptedException {
            // add topic to metadata topic list if it is not there already.
            if (!this.metadata.containsTopic(topic))
                this.metadata.add(topic);
    
            if (metadata.fetch().partitionsForTopic(topic) != null)
                return 0;
    
            long begin = time.milliseconds();
            long remainingWaitMs = maxWaitMs;
            while (metadata.fetch().partitionsForTopic(topic) == null) {
                log.trace("Requesting metadata update for topic {}.", topic);
                int version = metadata.requestUpdate();
                sender.wakeup();
                metadata.awaitUpdate(version, remainingWaitMs);
                long elapsed = time.milliseconds() - begin;
                if (elapsed >= maxWaitMs)
                    throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");
                if (metadata.fetch().unauthorizedTopics().contains(topic))
                    throw new TopicAuthorizationException(topic);
                remainingWaitMs = maxWaitMs - elapsed;
            }
            return time.milliseconds() - begin;
        }
    

    2. First, metadata.requestUpdate() is called; it sets the Metadata field needUpdate to true and returns the current metadata version.
    3. Then metadata.awaitUpdate(version, remainingWaitMs) is called, blocking the main thread until the Sender thread calls metadata.update() and releases it.

    /**
         * Wait for metadata update until the current version is larger than the last version we know of
         */
        public synchronized void awaitUpdate(final int lastVersion, final long maxWaitMs) throws InterruptedException {
            if (maxWaitMs < 0) {
                throw new IllegalArgumentException("Max time to wait for metadata updates should not be < 0 milli seconds");
            }
            long begin = System.currentTimeMillis();
            long remainingWaitMs = maxWaitMs;
            while (this.version <= lastVersion) {
                if (remainingWaitMs != 0)
                    wait(remainingWaitMs);
                long elapsed = System.currentTimeMillis() - begin;
                if (elapsed >= maxWaitMs)
                    throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");
                remainingWaitMs = maxWaitMs - elapsed;
            }
        }
    

    Phase 3

    The Sender thread is responsible for sending the Metadata update request to the server.
    1. The Sender thread's run() loop calls NetworkClient.poll().
    2. maybeUpdate(long now) is called. It checks whether the update is due; only then is a request issued, and it targets the least-loaded node (see the sketch after the next snippet).
    3. maybeUpdate(now, node) is called, which builds a ClientRequest and hands it to doSend(clientRequest, now).

    ...
    ClientRequest clientRequest = request(now, nodeConnectionId, metadataRequest);
    log.debug("Sending metadata request {} to node {}", metadataRequest, node.id());
    doSend(clientRequest, now); // cache the request; the next poll() will send it out
    ...
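
    The least-loaded-node choice mentioned in step 2 can be pictured as picking the node with the fewest in-flight requests. A simplified sketch of the idea (not the actual NetworkClient.leastLoadedNode() implementation, which also checks connection state; inFlightRequestCount() is a hypothetical stand-in for the real in-flight bookkeeping):

    // Pick the node with the fewest in-flight requests, i.e. the least-loaded one.
    private Node leastLoadedNode(List<Node> nodes) {
        Node best = null;
        int fewestInFlight = Integer.MAX_VALUE;
        for (Node node : nodes) {
            int inFlight = inFlightRequestCount(node.idString()); // hypothetical helper
            if (inFlight < fewestInFlight) {
                fewestInFlight = inFlight;
                best = node;
            }
        }
        return best; // null only if the node list is empty
    }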
    

    4. The request is added to the in-flight queue via this.inFlightRequests.add(request):

    private void doSend(ClientRequest request, long now) {
            request.setSendTimeMs(now);
            this.inFlightRequests.add(request);
            selector.send(request.request());
        }
    

    5. selector.send(request.request()) is called:

    /**
         * Queue the given request for sending in the subsequent {@link #poll(long)} calls
         * @param send The request to send
         */
        public void send(Send send) {
            KafkaChannel channel = channelOrFail(send.destination());
            try {
                channel.setSend(send);
            } catch (CancelledKeyException e) {
                this.failedSends.add(send.destination());
                close(channel);
            }
        }
    

    6. channel.setSend(send) stores the Send in the KafkaChannel's send field and registers interest in the OP_WRITE event:

    public void setSend(Send send) {
            if (this.send != null)
                throw new IllegalStateException("Attempt to begin a send operation with prior send operation still in progress.");
            this.send = send; // set the send field
            this.transportLayer.addInterestOps(SelectionKey.OP_WRITE); // register interest in OP_WRITE
        }
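
    At the plain java.nio level, addInterestOps(SelectionKey.OP_WRITE) amounts to setting one bit in the SelectionKey's interest set so that the next select() reports when the socket becomes writable. A minimal sketch of the equivalent NIO call:

    // Add OP_WRITE to the key's interest set; the selector will now report
    // this channel whenever the socket buffer can accept more data.
    SelectionKey key = socketChannel.keyFor(selector);
    key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);

    Once the Send has been fully written, the OP_WRITE bit is removed again; otherwise the selector would spin on an always-writable socket.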
    

    Phase 4

    This phase processes the data the server returns to update the Metadata.
    1. NetworkClient.poll() calls selector.poll(), which polls a read event.

     /* if channel is ready read from any connections that have readable data */
                    if (channel.ready() && key.isReadable() && !hasStagedReceive(channel)) {
                        NetworkReceive networkReceive; // OP_READ event handling
    
                        while ((networkReceive = channel.read()) != null)
                        /*
                         * If channel.read() returns a complete NetworkReceive, it is added to
                         * stagedReceives. If a complete NetworkReceive cannot yet be assembled,
                         * channel.read() returns null and reading resumes on the next OP_READ
                         * event, until a complete NetworkReceive has been read.
                         */
                            addToStagedReceives(channel, networkReceive);
                    }
    

    2. addToStagedReceives() is called, which appends the received message to the ArrayDeque for the corresponding channel in stagedReceives.

    /**
         * adds a receive to staged receives
         */
        private void addToStagedReceives(KafkaChannel channel, NetworkReceive receive) {
            if (!stagedReceives.containsKey(channel))
                stagedReceives.put(channel, new ArrayDeque<NetworkReceive>());
    
            Deque<NetworkReceive> deque = stagedReceives.get(channel);
            deque.add(receive);
        }
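
    On Java 8+ the same get-or-create pattern can be written in one line; a functionally equivalent sketch:

    // Create the deque on first use for this channel, then append the receive.
    stagedReceives.computeIfAbsent(channel, k -> new ArrayDeque<NetworkReceive>()).add(receive);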
    
    3. After selector.poll(), NetworkClient runs handleCompletedReceives():
    /**
         * For each NetworkReceive returned from a broker in completedReceives, find the
         * corresponding ClientRequest in inFlightRequests, build a ClientResponse from the two,
         * and add it to responses.
         * Handle any completed receives and update the response list with the responses received.
         *
         * @param responses The list of responses to update
         * @param now The current time
         */
        private void handleCompletedReceives(List<ClientResponse> responses, long now) {
            for (NetworkReceive receive : this.selector.completedReceives()) {
            String source = receive.source(); // id of the node that sent the response
            // take the corresponding ClientRequest out of inFlightRequests
            ClientRequest req = inFlightRequests.completeNext(source);
            // parse the response
            Struct body = parseResponse(receive.payload(), req.request().header());
            // Let MetadataUpdater.maybeHandleCompletedReceive() handle a MetadataResponse:
            // it updates the cluster metadata recorded in Metadata and wakes up all
            // threads waiting for the Metadata update to complete.
            if (!metadataUpdater.maybeHandleCompletedReceive(req, now, body))
                // not a MetadataResponse: create a ClientResponse and add it to the responses list
                    responses.add(new ClientResponse(req, now, false, body));
            }
        }
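
    completeNext(source) works because a broker answers the requests on one connection in the order they were sent. Conceptually, inFlightRequests keeps a per-node deque with the newest request at the head, so the oldest entry at the tail must own the response that just arrived. A simplified sketch of that idea (not the real InFlightRequests class):

    // Per-node queues of requests that have been sent but not yet answered.
    private final Map<String, Deque<ClientRequest>> requests = new HashMap<>();

    public void add(String node, ClientRequest request) {
        requests.computeIfAbsent(node, k -> new ArrayDeque<>()).addFirst(request); // newest at the head
    }

    public ClientRequest completeNext(String node) {
        return requests.get(node).pollLast(); // oldest at the tail owns the next response
    }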
    

    4. metadataUpdater.maybeHandleCompletedReceive(req, now, body) is called to determine whether this is a Metadata response:

    public boolean maybeHandleCompletedReceive(ClientRequest req, long now, Struct body) {
                short apiKey = req.request().header().apiKey();
                // check whether this is the response to a MetadataRequest
                if (apiKey == ApiKeys.METADATA.id && req.isInitiatedByNetworkClient()) {
                    handleResponse(req.request().header(), body, now);
                    return true;
                }
                return false;
            }
    

    5. If it is a Metadata response, handleResponse() is called to process it. It constructs a MetadataResponse, extracts response.cluster(), and calls metadata.update(cluster, now), which updates the Metadata.

    private void handleResponse(RequestHeader header, Struct body, long now) {
            this.metadataFetchInProgress = false; // the MetadataResponse has arrived, so clear the in-progress flag
            // parse the MetadataResponse
            MetadataResponse response = new MetadataResponse(body);
            // create the Cluster object
            Cluster cluster = response.cluster();
            // check if any topic's metadata failed to get updated (inspect the error codes in the MetadataResponse)
                Map<String, Errors> errors = response.errors();
                if (!errors.isEmpty())
                    log.warn("Error while fetching metadata with correlation id {} : {}", header.correlationId(), errors);
    
                // don't update the cluster if there are no valid nodes...the topic we want may still be in the process of being
                // created which means we will get errors and no nodes until it exists
                if (cluster.nodes().size() > 0) {
    
                    this.metadata.update(cluster, now);
                } else {
                    log.trace("Ignoring empty metadata response with correlation id {}.", header.correlationId());
                    this.metadata.failedUpdate(now);
                }
            }
    

    6. metadata.update(Cluster cluster, long now) is called. It:

    • sets this.needUpdate to false;
    • increments the version by 1, so that awaitUpdate() on the main thread sees the new version number and exits its while loop;
    • calls notifyAll(): awaitUpdate() calls wait() inside its while loop to give the Sender thread time to fetch the new Metadata; now that the Metadata has been updated, notifyAll() wakes the main thread so it no longer has to wait.
    /**
         * Update the cluster metadata
         */
        public synchronized void update(Cluster cluster, long now) {
            this.needUpdate = false;
            this.lastRefreshMs = now;
            this.lastSuccessfulRefreshMs = now;
            this.version += 1;
            // 1. notify the listeners registered on Metadata
            for (Listener listener: listeners)
                listener.onMetadataUpdate(cluster);
            // 2. update the cluster field
            // Do this after notifying listeners as subscribed topics' list can be changed by listeners
            this.cluster = this.needMetadataForAllTopics ? getClusterForCurrentTopics(cluster) : cluster;
    
            notifyAll();
            log.debug("Updated cluster metadata version {} to {}", this.version, this.cluster);
        }
    

    Finally, the main thread holds the latest Metadata object, from which it can obtain the topic's partitions, and the send can proceed.
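
    Stripped of the Kafka specifics, the handshake between awaitUpdate() on the main thread and update() on the Sender thread is a version-stamped wait/notify monitor. A minimal self-contained sketch of the pattern (not Kafka code):

    // The main thread blocks until the version moves past the one it last saw;
    // the Sender thread bumps the version and wakes every waiter.
    class VersionedMonitor {
        private int version = 0;

        synchronized void awaitUpdate(int lastVersion, long maxWaitMs) throws InterruptedException {
            long begin = System.currentTimeMillis();
            while (version <= lastVersion) {
                long remaining = maxWaitMs - (System.currentTimeMillis() - begin);
                if (remaining <= 0)
                    throw new IllegalStateException("timed out waiting for update");
                wait(remaining); // releases the monitor so update() can run
            }
        }

        synchronized void update() {
            version += 1;
            notifyAll(); // wake all awaitUpdate() callers
        }
    }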

    2. RecordAccumulator flow:

    The flow of the RecordAccumulator.append() method:

    [Figure: RecordAccumulator append workflow]
    /**
         * Add a record to the accumulator, return the append result
         * <p>
         * The append result will contain the future metadata, and flag for whether the appended batch is full or a new batch is created
         * <p>
         *
         * @param tp The topic/partition to which this record is being sent
         * @param timestamp The timestamp of the record
         * @param key The key for the record
         * @param value The value for the record
         * @param callback The user-supplied callback to execute when the request is complete
         * @param maxTimeToBlock The maximum time in milliseconds to block for buffer memory to be available
         */
        public RecordAppendResult append(TopicPartition tp,
                                         long timestamp,
                                         byte[] key,
                                         byte[] value,
                                         Callback callback,
                                         long maxTimeToBlock) throws InterruptedException {
            // We keep track of the number of appending thread to make sure we do not miss batches in
            // abortIncompleteBatches().
            appendsInProgress.incrementAndGet();
            try {
                // check if we have an in-progress batch
                // 1. look up the Deque for this TopicPartition
                Deque<RecordBatch> dq = getOrCreateDeque(tp);
                synchronized (dq) { // 2. lock the Deque
                    if (closed)
                        throw new IllegalStateException("Cannot send after the producer is closed.");
                    // 3. try to append the record to the last RecordBatch in the Deque
                    RecordAppendResult appendResult = tryAppend(timestamp, key, value, callback, dq);
                    if (appendResult != null)
                        return appendResult; // 4. append succeeded, return
                } // 5. unlock
    
                // we don't have an in-progress record batch try to allocate a new batch
                int size = Math.max(this.batchSize, Records.LOG_OVERHEAD + Record.recordSize(key, value));
                log.trace("Allocating a new {} byte message buffer for topic {} partition {}", size, tp.topic(), tp.partition());
                // 6. append failed; allocate new space from the BufferPool
                ByteBuffer buffer = free.allocate(size, maxTimeToBlock);
                synchronized (dq) {
                    // Need to check if producer is closed again after grabbing the dequeue lock.
                    if (closed)
                        throw new IllegalStateException("Cannot send after the producer is closed.");
                    // 7. lock the Deque again and call tryAppend() once more
                    RecordAppendResult appendResult = tryAppend(timestamp, key, value, callback, dq);
                    // 8. if the retry succeeds, return and release the buffer allocated in step 6
                    if (appendResult != null) {
                        // Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen often...
                        free.deallocate(buffer);
                        return appendResult;
                    }
                    MemoryRecords records = MemoryRecords.emptyRecords(buffer, compression, this.batchSize);
                    // create a new RecordBatch
                    RecordBatch batch = new RecordBatch(tp, records, time.milliseconds());
                    // 9. append the record to the new RecordBatch and add the batch to the batches collection
                    FutureRecordMetadata future = Utils.notNull(batch.tryAppend(timestamp, key, value, callback, time.milliseconds()));

                    dq.addLast(batch);
                    // 10. add the newly created RecordBatch to the incomplete set
                    incomplete.add(batch);
                    // 11. return the RecordAppendResult
                    return new RecordAppendResult(future, dq.size() > 1 || batch.records.isFull(), true);
                } // 12. unlock
            } finally {
                appendsInProgress.decrementAndGet();
            }
        }
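
    The shape of append() — try under the lock, allocate outside the lock, retry under the lock, and give the buffer back if another thread won the race — is a general pattern for keeping blocking work out of a critical section. A distilled, self-contained sketch of just that pattern (generic names, not Kafka code; assumes each record fits in one pooled buffer):

    import java.nio.ByteBuffer;
    import java.util.ArrayDeque;
    import java.util.Deque;
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;

    class DoubleCheckedAppend {
        private final Deque<ByteBuffer> batches = new ArrayDeque<>();
        private final BlockingQueue<ByteBuffer> pool = new ArrayBlockingQueue<>(16); // pre-filled elsewhere

        boolean append(byte[] record) throws InterruptedException {
            synchronized (batches) {                 // fast path: room in the newest batch?
                if (tryAppendLast(record)) return true;
            }
            ByteBuffer buffer = pool.take();         // slow path: may block, so run it unlocked
            synchronized (batches) {
                if (tryAppendLast(record)) {         // re-check: another thread may have added a batch
                    pool.put(buffer);                // we lost the race; return the buffer to the pool
                    return true;
                }
                buffer.clear();
                buffer.put(record);                  // start a new batch in the fresh buffer
                batches.addLast(buffer);
                return true;
            }
        }

        // Append to the newest batch if it still has room.
        private boolean tryAppendLast(byte[] record) {
            ByteBuffer last = batches.peekLast();
            if (last != null && last.remaining() >= record.length) {
                last.put(record);
                return true;
            }
            return false;
        }
    }

    Kafka's real append() layers batch-size estimation, compression, the incomplete set, and the returned future on top of this skeleton.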
    
