前面三回在分析生产者时,重点在发送的主流程上:怎么生产,怎么发送,怎么调度。略过了一个重要的环节:Metadata,Producer中Metadata的更新。
在KafkaProducer.doSend中第一句就是确认数据要发送到的topic的Metadata是可用的
ClusterAndWaitTime clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), maxBlockTimeMs);
当Metadata不可用的时,是不能发送消息的。因此在发送之前,必须确保topic对应的Metadata是可用的。每个topic中的每个partition都必须知道其对应的broker是多少,leader在哪里。
Metadata中保存了集群信息:Cluster
private final List nodes; // 节点列表
private final Map<TopicPartition,PartitionInfo> partitionsByTopicPartition; // topic分区信息对应分区详细信息:主副本,所有副本,IRS等信息
private final Map<String,List<PartitionInfo>> partitionsByTopic; // topic下所有分区的详细信息
private final Map< String,List< PartitionInfo >> availablePartitionsByTopic; // 可用的分区(leader在)对应的topic下的 分区的详细信息
private final Map<Integer,List< PartitionInfo >> partitionsByNode; // 节点和 分区的详细信息
private final Map<Integer,node> nodesById; // 节点ID和对应的节点信息
所有在生产者中,想要获取kafka集群的信息,都能在Metadata中获取。
每个KafkaProducer持有一个Metadata实例,其中KafkaProducer可能会被多个线程所持有调度,因此Metadata可以被多线程读取,在KafkaProducer中的Sender发送线程中更新Metadata,所以对Metadata的操作必须是线程安全的。这样在并发环境下,读取到的Metadata才是一致的。查看源码对Metadata的操作方法全是:synchronized。
waitOnMetadata
private ClusterAndWaitTime waitOnMetadata(String topic, Integer partition, long maxWaitMs) {
metadata.add(topic); // 在metadata中添加topic 保存到Map,如果需要更新又是不是全部更新的情况下,就会更新在 Map中保存的topic的metadata。
Cluster cluster = metadata.fetch(); // 获取元数据中保存的集群信息
// 从元数据保存的集群信息中获取此topic的分区数量,如果存在则返回该topic的partition数,否则返回null
Integer partitionsCount = cluster.partitionCountForTopic(topic);
// 当前此topic的分区数量存在,并且大于准备发送的partition小于topic的分区数量
// 如果出现大于的情况,则可能是有分区所在的节点宕机了。所以要做更新操作,更新元数据中保存的集群信息
// partition==null 就是让生产者根据规则决定发送到哪个分区上
if (partitionsCount != null && (partition == null || partition < partitionsCount))
return new ClusterAndWaitTime(cluster, 0);
long begin = time.milliseconds();
long remainingWaitMs = maxWaitMs;
long elapsed;
// 唤醒Sender线程发送获取metadata请求,直到获取了这个topic的metadata或者请求超时
do {
int version = metadata.requestUpdate(); // 把更新标志置为true,并返回当前元数据的版本号 sender.wakeup(); // 唤醒Sender,发送获取metadata请求
try {
metadata.awaitUpdate(version, remainingWaitMs); // 等待metadata的更新完成
} catch (TimeoutException ex) {
throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");
}
cluster = metadata.fetch(); // 获取新的元数据中保存的集群信息
elapsed = time.milliseconds() - begin;
if (elapsed >= maxWaitMs)
throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");
if (cluster.unauthorizedTopics().contains(topic)) // 认证失败
throw new TopicAuthorizationException(topic);
remainingWaitMs = maxWaitMs - elapsed;
partitionsCount = cluster.partitionCountForTopic(topic);
} while (partitionsCount == null); // 不停循环,直到 partitionsCount不为 null(即直到元数据保存的集群信息中获取此topic的分区数量)
if (partition != null && partition >= partitionsCount) {
throw new KafkaException( String.format("Invalid partition given with record: %d is not in the range [0...%d).", partition, partitionsCount));
}
return new ClusterAndWaitTime(cluster, elapsed);
}
有topic需要更新,设置Metadata的更新标志为true,并唤醒Sender发送线程进行Metadata更新请求的发送。
Sender.poll
在Sender.poll也会设置更新Metadata的标志,在调用RecordAccumulator.ready方法的时候,如果发现有topic-partition的leader是未知的,设置Metadata的更新标志为true。在Sender.poll回调用NetworkClient.poll。NetworkClient来负责发送Metadata请求,并处理Server端的响应。
NetworkClient.poll
在前面的方法中只是把Metadata的更新标志设置为true,真正发送更新Metadata请求是在NetworkClient中。
public List poll(long timeout, long now) {
long metadataTimeout = metadataUpdater.maybeUpdate(now); // 判断是否需要更新 meta,如果需要就更新发送更新请求
}
metadataUpdater默认实现方式是:DefaultMetadataUpdater(NetworkClient的内部类)。
DefaultMetadataUpdater.maybeUpdate
public long maybeUpdate(long now) {
// metadata下次更新的时间(需要判断是强制更新还是 metadata过期更新,前者是立马更新,后者是计算 metadata的过期时间)
long timeToNextMetadataUpdate = metadata.timeToNextUpdate(now);
long timeToNextReconnectAttempt = Math.max(this.lastNoNodeAvailableMs + metadata.refreshBackoff() - now, 0);
// 如果一条metadata的请求还未收到服务端的响应,设置等待时间为Integer.MAX_VALUE long waitForMetadataFetch = this.metadataFetchInProgress ? Integer.MAX_VALUE : 0;
long metadataTimeout = Math.max(Math.max(timeToNextMetadataUpdate, timeToNextReconnectAttempt), waitForMetadataFetch);
if (metadataTimeout == 0) {
Node node = leastLoadedNode(now); // 选择一个连接数最小的节点
maybeUpdate(now, node); // 可以发送 metadata 请求的话,就发送 metadata 请求
}
return metadataTimeout;
}
NetworkClient.leastLoadedNode
public Node leastLoadedNode(long now) {
List nodes = this.metadataUpdater.fetchNodes();
int inflight = Integer.MAX_VALUE;
Node found = null;
int offset = this.randOffset.nextInt(nodes.size());
for (int i = 0; i < nodes.size(); i++) {
int idx = (offset + i) % nodes.size();
Node node = nodes.get(idx);
int currInflight = this.inFlightRequests.inFlightRequestCount(node.idString());
if (currInflight == 0 && this.connectionStates.isConnected(node.idString())) {
return node;
} else if (!this.connectionStates.isBlackedOut(node.idString(), now) && currInflight < inflight) {
inflight = currInflight;
found = node;
}
}
return found;
}
选择连接数小的节点
1,从inFlightRequests(在客户端缓存还没有收到响应的客户端请求)中获取节点上的请求数
2,如果==0,就直接选择此节点返回
3,如果>=,就循环选择其中一个连接数最小的节点返回
NetworkClient.maybeUpdate
private void maybeUpdate(long now, Node node) {
if (node == null) {
this.lastNoNodeAvailableMs = now;
return;
}
String nodeConnectionId = node.idString();
if (canSendRequest(nodeConnectionId)) {// 通道已经 ready并且现在可以发送请求
// 准备开始发送数据,将metadataFetchInProgress置为true
// 后面判断是否有没有收到服务端响应的Metadata请求使用 t
his.metadataFetchInProgress = true;
MetadataRequest metadataRequest; // 创建metadata请求
if (metadata.needMetadataForAllTopics()) // 强制更新所有 topic 的 metadata(虽然默认不会更新所有 topic 的 metadata 信息,但是每个 Broker 会保存所有 topic 的 meta 信息) metadataRequest = MetadataRequest.allTopics();
else // 只更新 metadata 中的 topics 列表(列表中的 topics 由 metadata.add() 得到) metadataRequest = new MetadataRequest(new ArrayList<>(metadata.topics()));
// 构建metadata 请求
ClientRequest clientRequest = request(now, nodeConnectionId, metadataRequest); log.debug("Sending metadata request {} to node {}", metadataRequest, node.id()); doSend(clientRequest, now); // 发送 metadata 请求
} else if (connectionStates.canConnect(nodeConnectionId, now)) { // 如果没有连接这个 node,那就初始化连接
initiateConnect(node, now); // 初始化连接
} else {
this.lastNoNodeAvailableMs = now;
}
}
判断选择的节点能否接受请求,如果可以,就构建MetadataRequest。并设置到KafkaChannel上,按照正常的发送流程进行MetadataRequest发送。
这里大家或许有疑问,在这里把MetadataRequest设置到KafkaChannel上,能设置成功吗?当然是可以的,因为在发送消息的第一句代码是如果Metadata需要更新就会走上面的流程来发送MetadataRequest。正常处理完收到的服务端响应,把Metadata更新完成后,才会去构建消息的发送请求。
在MetadataRequest发送出去后,在NetworkClient.poll中处理服务端的响应。
public List poll(long timeout, long now) {
handleCompletedReceives(responses, updatedNow);
}
NetworkClient.handleCompletedReceives
private void handleCompletedReceives(List responses, long now) {
for (NetworkReceive receive : this.selector.completedReceives()) {
String source = receive.source();
ClientRequest req = inFlightRequests.completeNext(source);
Struct body = parseResponse(receive.payload(), req.request().header());
if (!metadataUpdater.maybeHandleCompletedReceive(req, now, body))
responses.add(new ClientResponse(req, now, false, body));
}
}
收到服务端的响应,清除inFlightRequests中保存的对应的请求信息。如果是Metadata的响应,构建MetadataResponse。更新Metadata中的集群信息。
参考文档:
网友评论