美文网首页Kafka文字欲
无镜--kafka之生产者(四)

无镜--kafka之生产者(四)

作者: 绍圣 | 来源:发表于2018-08-08 13:35 被阅读6次

    前面三回在分析生产者时,重点在发送的主流程上:怎么生产,怎么发送,怎么调度。略过了一个重要的环节:Metadata,Producer中Metadata的更新。

    在KafkaProducer.doSend中第一句就是确认数据要发送到的topic的Metadata是可用的

    ClusterAndWaitTime clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), maxBlockTimeMs);

    当Metadata不可用的时,是不能发送消息的。因此在发送之前,必须确保topic对应的Metadata是可用的。每个topic中的每个partition都必须知道其对应的broker是多少,leader在哪里。

    Metadata中保存了集群信息:Cluster

    private final List nodes; // 节点列表

    private final Map<TopicPartition,PartitionInfo> partitionsByTopicPartition; // topic分区信息对应分区详细信息:主副本,所有副本,IRS等信息

    private final Map<String,List<PartitionInfo>> partitionsByTopic; // topic下所有分区的详细信息

    private final Map< String,List< PartitionInfo >> availablePartitionsByTopic; // 可用的分区(leader在)对应的topic下的 分区的详细信息

    private final Map<Integer,List< PartitionInfo  >> partitionsByNode; // 节点和 分区的详细信息

    private final Map<Integer,node> nodesById; // 节点ID和对应的节点信息

    所有在生产者中,想要获取kafka集群的信息,都能在Metadata中获取。

    每个KafkaProducer持有一个Metadata实例,其中KafkaProducer可能会被多个线程所持有调度,因此Metadata可以被多线程读取,在KafkaProducer中的Sender发送线程中更新Metadata,所以对Metadata的操作必须是线程安全的。这样在并发环境下,读取到的Metadata才是一致的。查看源码对Metadata的操作方法全是:synchronized。

    waitOnMetadata

    private ClusterAndWaitTime waitOnMetadata(String topic, Integer partition, long maxWaitMs) {

    metadata.add(topic); // 在metadata中添加topic 保存到Map,如果需要更新又是不是全部更新的情况下,就会更新在 Map中保存的topic的metadata。

    Cluster cluster = metadata.fetch(); // 获取元数据中保存的集群信息

    // 从元数据保存的集群信息中获取此topic的分区数量,如果存在则返回该topic的partition数,否则返回null

    Integer partitionsCount = cluster.partitionCountForTopic(topic);

    // 当前此topic的分区数量存在,并且大于准备发送的partition小于topic的分区数量

    // 如果出现大于的情况,则可能是有分区所在的节点宕机了。所以要做更新操作,更新元数据中保存的集群信息

    // partition==null 就是让生产者根据规则决定发送到哪个分区上

    if (partitionsCount != null && (partition == null || partition < partitionsCount))

    return new ClusterAndWaitTime(cluster, 0);

    long begin = time.milliseconds();

    long remainingWaitMs = maxWaitMs;

    long elapsed;

    // 唤醒Sender线程发送获取metadata请求,直到获取了这个topic的metadata或者请求超时

    do {

    int version = metadata.requestUpdate(); // 把更新标志置为true,并返回当前元数据的版本号 sender.wakeup(); // 唤醒Sender,发送获取metadata请求

    try {

    metadata.awaitUpdate(version, remainingWaitMs); // 等待metadata的更新完成

    } catch (TimeoutException ex) {

    throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");

    }

    cluster = metadata.fetch(); // 获取新的元数据中保存的集群信息

    elapsed = time.milliseconds() - begin;

    if (elapsed >= maxWaitMs)

    throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");

    if (cluster.unauthorizedTopics().contains(topic)) // 认证失败

    throw new TopicAuthorizationException(topic);

    remainingWaitMs = maxWaitMs - elapsed;

    partitionsCount = cluster.partitionCountForTopic(topic);

    } while (partitionsCount == null); // 不停循环,直到 partitionsCount不为 null(即直到元数据保存的集群信息中获取此topic的分区数量)

    if (partition != null && partition >= partitionsCount) {

    throw new KafkaException( String.format("Invalid partition given with record: %d is not in the range [0...%d).", partition, partitionsCount));

    }

    return new ClusterAndWaitTime(cluster, elapsed);

    }

    有topic需要更新,设置Metadata的更新标志为true,并唤醒Sender发送线程进行Metadata更新请求的发送。

    Sender.poll

    在Sender.poll也会设置更新Metadata的标志,在调用RecordAccumulator.ready方法的时候,如果发现有topic-partition的leader是未知的,设置Metadata的更新标志为true。在Sender.poll回调用NetworkClient.poll。NetworkClient来负责发送Metadata请求,并处理Server端的响应。

    NetworkClient.poll

    在前面的方法中只是把Metadata的更新标志设置为true,真正发送更新Metadata请求是在NetworkClient中。

    public List poll(long timeout, long now) {

    long metadataTimeout = metadataUpdater.maybeUpdate(now); // 判断是否需要更新 meta,如果需要就更新发送更新请求

    }

    metadataUpdater默认实现方式是:DefaultMetadataUpdater(NetworkClient的内部类)。

    DefaultMetadataUpdater.maybeUpdate

    public long maybeUpdate(long now) {

    // metadata下次更新的时间(需要判断是强制更新还是 metadata过期更新,前者是立马更新,后者是计算 metadata的过期时间)

    long timeToNextMetadataUpdate = metadata.timeToNextUpdate(now);

    long timeToNextReconnectAttempt = Math.max(this.lastNoNodeAvailableMs + metadata.refreshBackoff() - now, 0);

    // 如果一条metadata的请求还未收到服务端的响应,设置等待时间为Integer.MAX_VALUE long waitForMetadataFetch = this.metadataFetchInProgress ? Integer.MAX_VALUE : 0;

    long metadataTimeout = Math.max(Math.max(timeToNextMetadataUpdate, timeToNextReconnectAttempt), waitForMetadataFetch);

    if (metadataTimeout == 0) {

    Node node = leastLoadedNode(now); // 选择一个连接数最小的节点

    maybeUpdate(now, node); // 可以发送 metadata 请求的话,就发送 metadata 请求

    }

    return metadataTimeout;

    }

    NetworkClient.leastLoadedNode

    public Node leastLoadedNode(long now) {

    List nodes = this.metadataUpdater.fetchNodes();

    int inflight = Integer.MAX_VALUE;

    Node found = null;

    int offset = this.randOffset.nextInt(nodes.size());

    for (int i = 0; i < nodes.size(); i++) {

    int idx = (offset + i) % nodes.size();

    Node node = nodes.get(idx);

    int currInflight = this.inFlightRequests.inFlightRequestCount(node.idString());

    if (currInflight == 0 && this.connectionStates.isConnected(node.idString())) {

    return node;

    } else if (!this.connectionStates.isBlackedOut(node.idString(), now) && currInflight < inflight) {

    inflight = currInflight;

    found = node;

    }

    }

    return found;

    }

    选择连接数小的节点

    1,从inFlightRequests(在客户端缓存还没有收到响应的客户端请求)中获取节点上的请求数

    2,如果==0,就直接选择此节点返回

    3,如果>=,就循环选择其中一个连接数最小的节点返回

    NetworkClient.maybeUpdate

    private void maybeUpdate(long now, Node node) {

    if (node == null) {

    this.lastNoNodeAvailableMs = now;

    return;

    }

    String nodeConnectionId = node.idString();

    if (canSendRequest(nodeConnectionId)) {// 通道已经 ready并且现在可以发送请求

    // 准备开始发送数据,将metadataFetchInProgress置为true

    // 后面判断是否有没有收到服务端响应的Metadata请求使用 t

    his.metadataFetchInProgress = true;

    MetadataRequest metadataRequest; // 创建metadata请求

    if (metadata.needMetadataForAllTopics()) // 强制更新所有 topic 的 metadata(虽然默认不会更新所有 topic 的 metadata 信息,但是每个 Broker 会保存所有 topic 的 meta 信息) metadataRequest = MetadataRequest.allTopics();

    else // 只更新 metadata 中的 topics 列表(列表中的 topics 由 metadata.add() 得到) metadataRequest = new MetadataRequest(new ArrayList<>(metadata.topics()));

    // 构建metadata 请求

    ClientRequest clientRequest = request(now, nodeConnectionId, metadataRequest); log.debug("Sending metadata request {} to node {}", metadataRequest, node.id()); doSend(clientRequest, now); // 发送 metadata 请求

    } else if (connectionStates.canConnect(nodeConnectionId, now)) { // 如果没有连接这个 node,那就初始化连接

    initiateConnect(node, now); // 初始化连接

    } else {

    this.lastNoNodeAvailableMs = now;

    }

    }

    判断选择的节点能否接受请求,如果可以,就构建MetadataRequest。并设置到KafkaChannel上,按照正常的发送流程进行MetadataRequest发送。

    这里大家或许有疑问,在这里把MetadataRequest设置到KafkaChannel上,能设置成功吗?当然是可以的,因为在发送消息的第一句代码是如果Metadata需要更新就会走上面的流程来发送MetadataRequest。正常处理完收到的服务端响应,把Metadata更新完成后,才会去构建消息的发送请求。

    在MetadataRequest发送出去后,在NetworkClient.poll中处理服务端的响应。

    public List poll(long timeout, long now) {

    handleCompletedReceives(responses, updatedNow);

    }

    NetworkClient.handleCompletedReceives

    private void handleCompletedReceives(List responses, long now) {

    for (NetworkReceive receive : this.selector.completedReceives()) {

    String source = receive.source(); 

    ClientRequest req = inFlightRequests.completeNext(source);

    Struct body = parseResponse(receive.payload(), req.request().header());

    if (!metadataUpdater.maybeHandleCompletedReceive(req, now, body))

    responses.add(new ClientResponse(req, now, false, body));

    }

    }

    收到服务端的响应,清除inFlightRequests中保存的对应的请求信息。如果是Metadata的响应,构建MetadataResponse。更新Metadata中的集群信息。

    参考文档:

    Kafka 源码解析之 Producer Metadata 更新机制(二)

    Kafka源码深度解析-序列2 -Producer -Metadata的数据结构与读取、更新策略 - CSDN博客

    相关文章

      网友评论

        本文标题:无镜--kafka之生产者(四)

        本文链接:https://www.haomeiwen.com/subject/mmufvftx.html