1 Introduction
While reading the KafkaProducer send-path source code, you keep running into requests that update the metadata. At a rough first pass, Producer Metadata updates fall into two cases: forced updates and periodic updates.
So today let's take a closer look at how this update mechanism actually works.
2 Source Code Analysis
2.1 What Metadata Contains
The class comment of Metadata explains:
This class encapsulates the logic around metadata. It is shared between the client thread and the background sender thread, and it only holds a subset of all topics: when we request metadata for a topic that is not yet in the set, a metadata update is issued to fetch it. If topic expiry is enabled, any topic that expires is removed from the set; the consumer, however, disables topic expiry, because it knows exactly which topics it needs to manage.
public class Metadata implements Closeable {
    private static final Logger log = LoggerFactory.getLogger(Metadata.class);

    // Topic expiry time
    public static final long TOPIC_EXPIRY_MS = 5 * 60 * 1000;
    private static final long TOPIC_EXPIRY_NEEDS_UPDATE = -1L;

    // Minimum interval between metadata refreshes, so that a failed update does not
    // trigger refreshes too frequently; defaults to 100 ms
    private final long refreshBackoffMs;
    // Metadata expiry time, controlled by metadata.max.age.ms (5 minutes by default)
    private final long metadataExpireMs;
    // Metadata update version; incremented by 1 on every update, used to tell whether the metadata has changed
    private int updateVersion;  // bumped on every metadata response
    private int requestVersion; // bumped on every new topic addition
    // Time of the most recent refresh attempt (including failed ones)
    private long lastRefreshMs;
    // Time of the last successful refresh (equal to lastRefreshMs if every attempt succeeds)
    private long lastSuccessfulRefreshMs;
    private AuthenticationException authenticationException;
    // Cache holding the metadata itself
    private MetadataCache cache = MetadataCache.empty();
    // Flag marking that an update is needed; it is set to true before every update
    private boolean needUpdate;
    /* Topics with expiry time */
    // Mapping from each topic to its expiry time
    private final Map<String, Long> topics;
    // Listeners notified on metadata changes
    private final List<Listener> listeners;
    private final ClusterResourceListeners clusterResourceListeners;
    // Whether to force-fetch the metadata of all topics instead of only the tracked ones
    private boolean needMetadataForAllTopics;
    private final boolean allowAutoTopicCreation;
    // Defaults to true: the producer periodically removes expired topics, the consumer does not
    private final boolean topicExpiryEnabled;
    private boolean isClosed;
    private final Map<TopicPartition, Integer> lastSeenLeaderEpochs;
}
MetadataCache: the information about nodes, topics and partitions in the Kafka cluster (it is read-only).
class MetadataCache {
    private final String clusterId;
    private final List<Node> nodes;
    private final Set<String> unauthorizedTopics;
    private final Set<String> invalidTopics;
    private final Set<String> internalTopics;
    private final Node controller;
    private final Map<TopicPartition, PartitionInfoAndEpoch> metadataByPartition;
    private Cluster clusterInstance;
}
Cluster: holds the detailed information for each topic (the node hosting the leader, the nodes hosting the replicas, the ISR list).
public final class Cluster {
    // The field names make their purpose obvious
    private final boolean isBootstrapConfigured;
    private final List<Node> nodes;                                              // list of nodes
    private final Set<String> unauthorizedTopics;                                // unauthorized topics
    private final Set<String> internalTopics;                                    // internal topics
    private final Map<TopicPartition, PartitionInfo> partitionsByTopicPartition; // details of each partition
    private final Map<String, List<PartitionInfo>> partitionsByTopic;            // topic -> partitions
    private final Map<String, List<PartitionInfo>> availablePartitionsByTopic;   // topic -> available partitions (leader != null)
    private final Map<Integer, List<PartitionInfo>> partitionsByNode;            // node -> partitions
    private final Map<Integer, Node> nodesById;                                  // broker id -> node
    private final ClusterResource clusterResource;
}

// Holds the topic, the partition, the partition's leader node, the replicas, the ISR and the offline replicas
public class PartitionInfo {
    private final String topic;
    private final int partition;
    private final Node leader;
    private final Node[] replicas;
    private final Node[] inSyncReplicas;
    private final Node[] offlineReplicas;
}
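Before walking through the update flow, note that the same partition information is exposed to application code through the public KafkaProducer#partitionsFor API, which goes through the metadata machinery described below. A minimal usage sketch (the broker address and topic name are placeholders):

import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.serialization.StringSerializer;

public class PartitionInfoDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker address
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // partitionsFor() blocks until metadata for the topic is available,
            // following the same waitOnMetadata() path discussed below
            List<PartitionInfo> partitions = producer.partitionsFor("demo-topic"); // placeholder topic
            for (PartitionInfo p : partitions) {
                System.out.printf("partition=%d leader=%s replicas=%d isr=%d%n",
                        p.partition(), p.leader(), p.replicas().length, p.inSyncReplicas().length);
            }
        }
    }
}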
2.2 Metadata Update Flow
2.2.1 Fetching the topic's metadata
The first thing the producer does inside doSend() is fetch the metadata for the topic via the waitOnMetadata method.
private ClusterAndWaitTime waitOnMetadata(String topic, Integer partition, long maxWaitMs) throws InterruptedException {
    // add topic to metadata topic list if it is not there already and reset expiry
    // Fetch the cluster metadata
    Cluster cluster = metadata.fetch();

    if (cluster.invalidTopics().contains(topic))
        throw new InvalidTopicException(topic);

    // Register the topic with the metadata; if the metadata does not yet contain this topic,
    // its needUpdate flag is set to true
    metadata.add(topic);

    // Look up the partition count of this topic in the cluster
    Integer partitionsCount = cluster.partitionCountForTopic(topic);
    // Return cached metadata if we have it, and if the record's partition is either undefined
    // or within the known partition range
    // If the metadata already holds the partition info for this topic, return it directly
    // (partition may be null because a record does not have to specify a partition)
    if (partitionsCount != null && (partition == null || partition < partitionsCount))
        return new ClusterAndWaitTime(cluster, 0);

    long begin = time.milliseconds();
    long remainingWaitMs = maxWaitMs;
    long elapsed;
    // Issue metadata requests until we have metadata for the topic and the requested partition,
    // or until maxWaitTimeMs is exceeded. This is necessary in case the metadata
    // is stale and the number of partitions for this topic has increased in the meantime.
    // If the metadata has not been updated successfully, the method stays in this loop
    do {
        if (partition != null) {
            log.trace("Requesting metadata update for partition {} of topic {}.", partition, topic);
        } else {
            log.trace("Requesting metadata update for topic {}.", topic);
        }
        metadata.add(topic);
        // Set needUpdate to true to mark the metadata as needing an update
        int version = metadata.requestUpdate();
        // Wake up the sender thread
        sender.wakeup();
        try {
            // Wait for the sender thread to update the metadata; the caller blocks until the update succeeds
            metadata.awaitUpdate(version, remainingWaitMs);
        } catch (TimeoutException ex) {
            // Rethrow with original maxWaitMs to prevent logging exception with remainingWaitMs
            throw new TimeoutException(
                    String.format("Topic %s not present in metadata after %d ms.",
                            topic, maxWaitMs));
        }
        cluster = metadata.fetch();
        elapsed = time.milliseconds() - begin;
        // Timed out
        if (elapsed >= maxWaitMs) {
            throw new TimeoutException(partitionsCount == null ?
                    String.format("Topic %s not present in metadata after %d ms.",
                            topic, maxWaitMs) :
                    String.format("Partition %d of topic %s with partition count %d is not present in metadata after %d ms.",
                            partition, topic, partitionsCount, maxWaitMs));
        }
        // Authorization failed: the client has no write access to this topic
        if (cluster.unauthorizedTopics().contains(topic))
            throw new TopicAuthorizationException(topic);
        if (cluster.invalidTopics().contains(topic))
            throw new InvalidTopicException(topic);

        remainingWaitMs = maxWaitMs - elapsed;

        partitionsCount = cluster.partitionCountForTopic(topic);
        // Keep looping until partitionsCount is non-null, i.e. until the metadata contains this topic
    } while (partitionsCount == null || (partition != null && partition >= partitionsCount));

    return new ClusterAndWaitTime(cluster, elapsed);
}
If the metadata does not contain this topic, a metadata update is requested; as long as the update has not completed, the method stays inside the do ... while loop, where it performs the following steps:
(1) metadata.requestUpdate(): sets the metadata's needUpdate flag to true (forcing an update) and returns the current version number; the version number is what later tells us whether the update has completed.
(2) sender.wakeup(): wakes up the sender thread, which in turn drives the NetworkClient to perform the actual network operations.
(3) metadata.awaitUpdate(version, remainingWaitMs): waits for the metadata update.
// Wait for the metadata to be updated (detected by comparing version numbers)
public synchronized void awaitUpdate(final int lastVersion, final long maxWaitMs) throws InterruptedException {
    if (maxWaitMs < 0)
        throw new IllegalArgumentException("Max time to wait for metadata updates should not be < 0 milliseconds");

    long begin = System.currentTimeMillis();
    long remainingWaitMs = maxWaitMs;
    // Keep looping until the metadata has been updated successfully and updateVersion has been incremented;
    // comparing version numbers is how consistency is enforced
    while ((this.updateVersion <= lastVersion) && !isClosed()) {
        AuthenticationException ex = getAndClearAuthenticationException();
        if (ex != null)
            throw ex;
        if (remainingWaitMs != 0)
            wait(remainingWaitMs); // block and wait for the update; on timeout the thread wakes up anyway
        long elapsed = System.currentTimeMillis() - begin;
        // If no update arrived, elapsed will have reached maxWaitMs
        if (elapsed >= maxWaitMs)
            throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");
        remainingWaitMs = maxWaitMs - elapsed;
    }
    if (isClosed())
        throw new KafkaException("Requested metadata update after close");
}
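The mechanism that unblocks awaitUpdate is the version bump plus notifyAll() performed when a metadata response is applied. To see that handshake in isolation, here is a small self-contained illustration of the pattern; this is my own demo class, not Kafka code, and the names are made up:

// Minimal illustration of the version + wait/notifyAll handshake used by Metadata
public class VersionedUpdateDemo {
    private int version = 0;
    private boolean needUpdate = false;

    // Analogous to Metadata.requestUpdate(): mark dirty and return the version to wait past
    public synchronized int requestUpdate() {
        needUpdate = true;
        return version;
    }

    // Analogous to Metadata.awaitUpdate(): block until the version moves past lastVersion
    public synchronized void awaitUpdate(int lastVersion, long maxWaitMs) throws InterruptedException {
        long deadline = System.currentTimeMillis() + maxWaitMs;
        while (version <= lastVersion) {
            long remaining = deadline - System.currentTimeMillis();
            if (remaining <= 0)
                throw new IllegalStateException("timed out waiting for update");
            wait(remaining);
        }
    }

    // Analogous to applying a metadata response: bump the version and wake every waiter
    public synchronized void update() {
        needUpdate = false;
        version += 1;
        notifyAll();
    }

    public static void main(String[] args) throws InterruptedException {
        VersionedUpdateDemo demo = new VersionedUpdateDemo();
        int v = demo.requestUpdate();
        // Stand-in for the sender thread applying a metadata response 500 ms later
        new Thread(() -> {
            try { Thread.sleep(500); } catch (InterruptedException ignored) { }
            demo.update();
        }).start();
        demo.awaitUpdate(v, 5000);
        System.out.println("update observed, version is now past " + v);
    }
}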
2.2.2 When does the metadata need updating?
So when exactly does the producer's metadata need to be updated? The check happens in NetworkClient.poll():
public List<ClientResponse> poll(long timeout, long now) {
    ensureActive();

    if (!abortedSends.isEmpty()) {
        // If there are aborted sends because of unsupported version exceptions or disconnects,
        // handle them immediately without waiting for Selector#poll.
        List<ClientResponse> responses = new ArrayList<>();
        handleAbortedSends(responses);
        completeResponses(responses);
        return responses;
    }

    // Check whether the metadata needs updating, and update it if so
    // (this is where metadata update requests are issued)
    long metadataTimeout = metadataUpdater.maybeUpdate(now);
    try {
        this.selector.poll(Utils.min(timeout, metadataTimeout, defaultRequestTimeoutMs));
    } catch (IOException e) {
        log.error("Unexpected error during I/O", e);
    }

    // process completed actions
    long updatedNow = this.time.milliseconds();
    List<ClientResponse> responses = new ArrayList<>();
    handleCompletedSends(responses, updatedNow);    // collect the server's responses via the selector
    handleCompletedReceives(responses, updatedNow); // the response handlers are where the metadata update is applied
    handleDisconnections(responses, updatedNow);
    handleConnections();
    handleInitiateApiVersionRequests(updatedNow);
    handleTimedOutRequests(responses, updatedNow);
    completeResponses(responses);

    return responses;
}
- metadataUpdater.maybeUpdate(now): decides whether the metadata needs updating; if so, it first establishes a connection to a broker and then sends the metadata request.
- Handling of server responses: the part we care about here is handleCompletedReceives(responses, updatedNow), which processes the metadata result returned by the server.
The maybeUpdate method of DefaultMetadataUpdater:
public long maybeUpdate(long now) {
    // should we update our metadata?
    // Time until the next metadata update (a forced update means "now",
    // otherwise it is computed from the expiry time)
    long timeToNextMetadataUpdate = metadata.timeToNextUpdate(now);
    // If a metadata fetch request is still waiting for a response from the server,
    // wait for defaultRequestTimeoutMs (30 s by default)
    long waitForMetadataFetch = this.hasFetchInProgress() ? defaultRequestTimeoutMs : 0;

    long metadataTimeout = Math.max(timeToNextMetadataUpdate, waitForMetadataFetch);
    if (metadataTimeout > 0) { // it is not time yet, just return when the next update is due
        return metadataTimeout;
    }

    // Beware that the behavior of this method and the computation of timeouts for poll() are
    // highly dependent on the behavior of leastLoadedNode.
    Node node = leastLoadedNode(now); // pick the node with the fewest in-flight requests
    if (node == null) {
        log.debug("Give up sending metadata request since no node is available");
        return reconnectBackoffMs;
    }

    return maybeUpdate(now, node); // if a request can be sent, send the metadata request
}
timeToNextUpdate computes when the metadata is next due for an update:
public synchronized long timeToNextUpdate(long nowMs) {
    long timeToExpire = needUpdate ? 0 : Math.max(this.lastSuccessfulRefreshMs + this.metadataExpireMs - nowMs, 0);
    return Math.max(timeToExpire, timeToAllowUpdate(nowMs));
}

public synchronized long timeToAllowUpdate(long nowMs) {
    // lastRefreshMs: last refresh attempt (including failures); refreshBackoffMs: minimum backoff interval
    return Math.max(this.lastRefreshMs + this.refreshBackoffMs - nowMs, 0);
}
The next metadata update is scheduled at the larger of two values: timeToExpire, the time until the current metadata expires, and timeToAllowUpdate, the time until another update attempt is allowed.
timeToExpire: when needUpdate is true a forced update is pending, so this value is 0; otherwise the metadata is refreshed periodically once it reaches its expiry time, i.e. metadataExpireMs (metadata.max.age.ms, 5 minutes by default).
timeToAllowUpdate: in the normal case this is simply refreshBackoffMs, i.e. 100 ms by default.
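As a quick sanity check of that arithmetic, the snippet below replays the same computation with made-up timestamps; the helper class and the numbers are purely illustrative, only the two interval values mirror the defaults discussed above:

// Replays the timeToNextUpdate() arithmetic with example values
public class TimeToNextUpdateExample {
    static long timeToNextUpdate(boolean needUpdate, long lastSuccessfulRefreshMs, long lastRefreshMs,
                                 long metadataExpireMs, long refreshBackoffMs, long nowMs) {
        long timeToExpire = needUpdate ? 0 : Math.max(lastSuccessfulRefreshMs + metadataExpireMs - nowMs, 0);
        long timeToAllowUpdate = Math.max(lastRefreshMs + refreshBackoffMs - nowMs, 0);
        return Math.max(timeToExpire, timeToAllowUpdate);
    }

    public static void main(String[] args) {
        long expireMs = 5 * 60 * 1000; // metadata.max.age.ms default
        long backoffMs = 100;          // retry.backoff.ms default

        // Periodic case: last successful refresh 10 s ago -> next update due in 290 s
        System.out.println(timeToNextUpdate(false, 0, 0, expireMs, backoffMs, 10_000));

        // Forced case: needUpdate is true, but the last attempt was only 40 ms ago,
        // so the 100 ms backoff still applies -> 60 ms to wait
        System.out.println(timeToNextUpdate(true, 0, 10_000, expireMs, backoffMs, 10_040));
    }
}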
The private maybeUpdate overload is then called:
// Check whether a request can be sent; if it can, add the metadata request to the send list
private long maybeUpdate(long now, Node node) {
    String nodeConnectionId = node.idString();

    if (canSendRequest(nodeConnectionId, now)) { // the channel is ready and can accept more requests
        // Request metadata either for all topics or only for the topics tracked in the metadata
        Metadata.MetadataRequestAndVersion metadataRequestAndVersion = metadata.newMetadataRequestAndVersion();
        inProgressRequestVersion = metadataRequestAndVersion.requestVersion;
        // Build the metadata request
        MetadataRequest.Builder metadataRequest = metadataRequestAndVersion.requestBuilder;
        log.debug("Sending metadata request {} to node {}", metadataRequest, node);
        // Send the metadata request
        sendInternalMetadataRequest(metadataRequest, nodeConnectionId, now);
        return defaultRequestTimeoutMs;
    }

    // If there's any connection establishment underway, wait until it completes. This prevents
    // the client from unnecessarily connecting to additional nodes while a previous connection
    // attempt has not been completed.
    if (isAnyNodeConnecting()) { // if the client is in the "connecting" state with any node, just wait
        // Strictly the timeout we should return here is "connect timeout", but as we don't
        // have such application level configuration, using reconnect backoff instead.
        return reconnectBackoffMs;
    }

    // If we are not connected to this node yet, initiate a connection
    if (connectionStates.canConnect(nodeConnectionId, now)) {
        // we don't have a connection to this node right now, make one
        log.debug("Initialize connection to node {} for sending metadata request", node);
        // Initiate the connection
        initiateConnect(node, now);
        return reconnectBackoffMs;
    }

    // connected, but can't send more OR connecting
    // In either case, we just need to wait for a network event to let us know the selected
    // connection might be usable again.
    return Long.MAX_VALUE;
}
public synchronized MetadataRequestAndVersion newMetadataRequestAndVersion() {
    final MetadataRequest.Builder metadataRequestBuilder;
    // Request the metadata of all topics (not the default behavior; note that every broker
    // holds the metadata of all topics anyway)
    if (needMetadataForAllTopics)
        metadataRequestBuilder = MetadataRequest.Builder.allTopics();
    else
        // Only request metadata for the topics tracked in the metadata
        // (the topics that were registered via metadata.add())
        metadataRequestBuilder = new MetadataRequest.Builder(new ArrayList<>(this.topics.keySet()),
                allowAutoTopicCreation());
    return new MetadataRequestAndVersion(metadataRequestBuilder, requestVersion);
}
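For completeness, sendInternalMetadataRequest (called in the ready branch above) is essentially a thin wrapper; the sketch below paraphrases it from memory and is not the verbatim source:

// Sketch: wrap the builder in a ClientRequest marked as internal, so the response is routed
// to the MetadataUpdater instead of a user callback, then hand it over to doSend()
private void sendInternalMetadataRequest(MetadataRequest.Builder builder, String nodeConnectionId, long now) {
    ClientRequest clientRequest = newClientRequest(nodeConnectionId, builder, now, true);
    doSend(clientRequest, true, now); // true = internal request
}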
So whenever the producer asks for a metadata update, one of the following happens:
(1) The channel is ready and the node can accept more requests, so the metadata request is sent straight away.
(2) The node is still in the middle of establishing a connection, so we simply return and wait.
(3) There is no connection to the node yet, so a connection to the broker is initiated.
Meanwhile, the KafkaProducer thread stays blocked in its two while loops until the metadata has been updated. Getting there typically takes several poll() passes on the sender side (a simplified sketch of the sender loop follows this list):
(1) On the sender thread's first poll() call, the connection to the node is initialized.
(2) On the second poll() call, the metadata request is sent.
(3) On the third poll() call, the MetadataResponse is received and the metadata is updated.
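The reason those three passes fall out naturally is that the Sender's run loop simply keeps calling NetworkClient.poll(), and every pass advances the connect / send / receive state machine by one step. A condensed sketch of that loop (simplified; method names vary slightly between client versions):

// Simplified shape of the producer I/O thread (Sender implements Runnable)
public void run() {
    while (running) {
        long now = time.milliseconds();
        // Drain the RecordAccumulator; may call metadata.requestUpdate() if leaders are unknown
        long pollTimeout = sendProducerData(now);
        // One pass of network I/O: connect, send pending requests (incl. metadata), read responses
        client.poll(pollTimeout, now);
    }
}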
2.2.3 Receiving the server's response and updating the metadata
How handleCompletedReceives handles completed receives:
private void handleCompletedReceives(List<ClientResponse> responses, long now) {
    for (NetworkReceive receive : this.selector.completedReceives()) {
        String source = receive.source();
        InFlightRequest req = inFlightRequests.completeNext(source);
        Struct responseStruct = parseStructMaybeUpdateThrottleTimeMetrics(receive.payload(), req.header,
            throttleTimeSensor, now);
        if (log.isTraceEnabled()) {
            log.trace("Completed receive from node {} for {} with correlation id {}, received {}", req.destination,
                req.header.apiKey(), req.header.correlationId(), responseStruct);
        }
        // If the received response includes a throttle delay, throttle the connection.
        AbstractResponse body = AbstractResponse.
                parseResponse(req.header.apiKey(), responseStruct, req.header.apiVersion());
        maybeThrottle(body, req.header.apiVersion(), req.destination, now);
        // Dispatch on the response type: if it is the response to a metadata request
        if (req.isInternalRequest && body instanceof MetadataResponse)
            metadataUpdater.handleCompletedMetadataResponse(req.header, now, (MetadataResponse) body);
        else if (req.isInternalRequest && body instanceof ApiVersionsResponse)
            handleApiVersionsResponse(responses, req, now, (ApiVersionsResponse) body);
        else
            responses.add(req.completed(body, now));
    }
}
From there, handleCompletedMetadataResponse is called to apply the response.
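handleCompletedMetadataResponse itself is not shown above, but the gist of it in DefaultMetadataUpdater is short; the sketch below condenses the essential logic and is not the verbatim source:

// Sketch: apply a MetadataResponse to the shared Metadata object
public void handleCompletedMetadataResponse(RequestHeader requestHeader, long now, MetadataResponse response) {
    if (response.brokers().isEmpty()) {
        // No usable brokers in the response: treat it as a failed update, which only bumps
        // lastRefreshMs so that refreshBackoffMs throttles the next attempt
        this.metadata.failedUpdate(now, null);
    } else {
        // Rebuild the cache, bump updateVersion and notifyAll() the producer threads
        // blocked in Metadata.awaitUpdate()
        this.metadata.update(inProgressRequestVersion, response, now);
    }
    inProgressRequestVersion = null;
}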
3 Summary
Metadata is updated in two ways:
- Forced update: call Metadata.requestUpdate() to set needUpdate to true and force a refresh.
- Periodic update: driven by the Metadata fields lastSuccessfulRefreshMs and metadataExpireMs; under normal conditions the refresh period is simply metadataExpireMs, i.e. metadata.max.age.ms (5 minutes by default).
Both mechanisms are checked on every NetworkClient.poll() call; as soon as either condition is met, an update is triggered.
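On the application side, the periodic mechanism is controlled by two producer settings: metadata.max.age.ms (which becomes metadataExpireMs) and retry.backoff.ms (which becomes refreshBackoffMs). A minimal configuration sketch; the broker address and the chosen values are only illustrative:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

public class MetadataConfigDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // Refresh the metadata at least every 2 minutes even without errors (default: 5 minutes)
        props.put(ProducerConfig.METADATA_MAX_AGE_CONFIG, "120000");
        // Wait at least 100 ms between consecutive refresh attempts (100 ms is also the default)
        props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, "100");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // ... send records as usual; metadata refreshes now follow the settings above
        }
    }
}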
A forced metadata update is triggered in the following situations:
- when initiateConnect is called to set up a connection;
- when poll() calls handleDisconnections() to handle a broken connection, which forces an update;
- when poll() calls handleTimedOutRequests() to handle request timeouts;
- when sending a message and the leader of the partition cannot be found;
- when handling a produce response (handleProduceResponse) that carries a metadata-related error, e.g. there is no metadata for the topic-partition, or the client is not authorized to fetch its metadata.
Forced updates are mainly there to cope with these kinds of exceptional situations.
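As a concrete illustration of the "leader cannot be found" case, the Sender does roughly the following while draining the accumulator; this is a condensed sketch of that branch, not the verbatim source:

// Sketch: inside Sender.sendProducerData(), topics whose partition leader is unknown
// are re-registered and a forced metadata refresh is requested before sending
if (!result.unknownLeaderTopics.isEmpty()) {
    // Make sure these topics are part of the next MetadataRequest ...
    for (String topic : result.unknownLeaderTopics)
        this.metadata.add(topic);
    // ... and mark the metadata as needing a forced update
    this.metadata.requestUpdate();
}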