1 Consumption Modes and Subscription State
1.1 Consumption Modes
There are two modes: subscribe and assign.
subscribe: the consumer specifies the topics to subscribe to, and the coordinators (ConsumerCoordinator on the client side and GroupCoordinator on the broker side) assign partitions to the consumer.
assign: the consumer specifies the partitions to consume directly; in this mode the coordinators no longer assign partitions to the consumer dynamically.
1.1.1 Related Code
The relevant code is as follows:
The subscribe method takes topics as its argument, while the assign method takes partitions; both update the consumer's subscription state object (SubscriptionState).
In subscribe: this.subscriptions.subscribe(new HashSet<>(topics), listener);
In assign: this.subscriptions.assignFromUser(new HashSet<>(partitions));
The assignment result, returned by the assignment accessor, keeps the mapping from each partition assigned to the consumer to its partition state.
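For context, here is a minimal client-side sketch of the two modes (topic name, group id and broker address are made up for illustration):
import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class SubscribeVsAssign {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");   // hypothetical broker
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "demo-group");                // hypothetical group id
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");

        // subscribe mode: the coordinator assigns partitions; the listener is invoked on rebalance
        KafkaConsumer<String, String> subscribing = new KafkaConsumer<>(props);
        subscribing.subscribe(Collections.singletonList("demo-topic"), new ConsumerRebalanceListener() {
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) { }
            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) { }
        });
        subscribing.poll(Duration.ofMillis(100));
        subscribing.close();

        // assign mode: the consumer picks its partitions itself, no dynamic assignment
        KafkaConsumer<String, String> assigning = new KafkaConsumer<>(props);
        assigning.assign(Arrays.asList(new TopicPartition("demo-topic", 0)));
        assigning.poll(Duration.ofMillis(100));
        assigning.close();
    }
}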
1.2 Subscription State
public class SubscriptionState {
    private static final String SUBSCRIPTION_EXCEPTION_MESSAGE =
            "Subscription to topics, partitions and pattern are mutually exclusive";
    // the possible subscription types
    private enum SubscriptionType {
        NONE, AUTO_TOPICS, AUTO_PATTERN, USER_ASSIGNED
    }
    private SubscriptionType subscriptionType;
    private Pattern subscribedPattern;
    // the topics subscribed to by this consumer
    private Set<String> subscription;
    /* the list of topics the group has subscribed to (set only for the leader on join group completion) */
    // the topics subscribed to by the consumer group
    private final Set<String> groupSubscription;
    /* the partitions that are currently assigned, note that the order of partition matters (see FetchBuilder for more details) */
    private final PartitionStates<TopicPartitionState> assignment;
    /* Default offset reset strategy */
    private final OffsetResetStrategy defaultResetStrategy;
    /* Listeners provide a hook for internal state cleanup (e.g. metrics) on assignment changes */
    private final List<Listener> listeners = new ArrayList<>();
    /* User-provided listener to be invoked when assignment changes */
    private ConsumerRebalanceListener rebalanceListener;
}
How do subscribe and assign change the SubscriptionState?
subscribe
public void subscribe(Set<String> topics, ConsumerRebalanceListener listener) {
    if (listener == null)
        throw new IllegalArgumentException("RebalanceListener cannot be null");
    // set the subscription type to AUTO_TOPICS
    setSubscriptionType(SubscriptionType.AUTO_TOPICS);
    this.rebalanceListener = listener;
    changeSubscription(topics);
}
private void changeSubscription(Set<String> topicsToSubscribe) {
    // only update if the subscription actually changed; otherwise there is no need to reassign partitions to the consumer
    if (!this.subscription.equals(topicsToSubscribe)) {
        this.subscription = topicsToSubscribe;
        this.groupSubscription.addAll(topicsToSubscribe);
    }
}
assign
public void assignFromUser(Set<TopicPartition> partitions) {
    // set the subscription type to USER_ASSIGNED
    setSubscriptionType(SubscriptionType.USER_ASSIGNED);
    // only update if the assigned partitions actually changed
    if (!this.assignment.partitionSet().equals(partitions)) {
        fireOnAssignment(partitions);
        Map<TopicPartition, TopicPartitionState> partitionToState = new HashMap<>();
        for (TopicPartition partition : partitions) {
            TopicPartitionState state = assignment.stateValue(partition);
            if (state == null)
                state = new TopicPartitionState();
            partitionToState.put(partition, state);
        }
        this.assignment.set(partitionToState);
    }
}
SubscriptionState also provides methods that query the assignment result for "some or all" partitions:
hasAllFetchPositions(): checks whether every assigned partition has a valid fetch position.
missingFetchPositions(): when not every partition has a fetch position, finds the partitions that are still missing one.
fetchablePartitions(): returns the partitions that are allowed to be fetched, i.e. those that already have a fetch position, and is used to build fetch requests.
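To make the semantics of these three helpers concrete, here is a simplified, self-contained model (not the real SubscriptionState, just an illustration of how the helpers relate; a partition becomes fetchable once it has a position and is not paused):
import java.util.*;
import java.util.stream.Collectors;

class PositionModel {
    static class PartitionState {
        Long position;       // fetch position; null means no valid position yet
        boolean paused;      // paused partitions are excluded from fetch requests
    }

    private final Map<String, PartitionState> assignment = new HashMap<>();

    // all assigned partitions already have a fetch position
    boolean hasAllFetchPositions() {
        return assignment.values().stream().allMatch(s -> s.position != null);
    }

    // partitions still missing a fetch position
    Set<String> missingFetchPositions() {
        return assignment.entrySet().stream()
                .filter(e -> e.getValue().position == null)
                .map(Map.Entry::getKey)
                .collect(Collectors.toSet());
    }

    // partitions that can go into a fetch request: have a position and are not paused
    List<String> fetchablePartitions() {
        return assignment.entrySet().stream()
                .filter(e -> e.getValue().position != null && !e.getValue().paused)
                .map(Map.Entry::getKey)
                .collect(Collectors.toList());
    }
}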
1.3 Partition State
The old version of TopicPartitionState:
private static class TopicPartitionState {
    private Long position; // fetch position
    private OffsetAndMetadata committed; // consumed offset, i.e. the committed offset
    private boolean paused; // whether fetching from this partition has been paused
    private OffsetResetStrategy resetStrategy; // offset reset strategy
}
The new version of TopicPartitionState:
private static class TopicPartitionState {
    private Long position; // last consumed position, i.e. the fetch position
    private Long highWatermark; // the high watermark from last fetch; the consumer can only fetch messages before this offset
    private Long logStartOffset; // the log start offset, i.e. the beginning of the log file
    private Long lastStableOffset; // used for transactions: the largest offset up to which transactions have been committed
    private boolean paused; // whether this partition has been paused by the user
    private OffsetResetStrategy resetStrategy; // the strategy to use if the offset needs resetting
    private Long nextAllowedRetryTimeMs;
}
The new version moves OffsetAndMetadata out of TopicPartitionState:
public class OffsetAndMetadata implements Serializable {
    private static final long serialVersionUID = 2019555404968089681L;
    private final long offset;
    private final String metadata;
    // We use null to represent the absence of a leader epoch to simplify serialization.
    // I.e., older serializations of this class which do not have this field will automatically
    // initialize its value to null.
    private final Integer leaderEpoch;
}
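As an illustration of how this class appears on the public API, a consumer can commit an OffsetAndMetadata explicitly; the sketch below assumes a client version new enough to carry a leader epoch in the constructor (topic, offset, epoch and metadata values are made up):
import java.util.Collections;
import java.util.Optional;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class CommitWithEpoch {
    // commits offset 100 for partition 0 of "demo-topic", attaching leader epoch 5 and a metadata string
    static void commit(KafkaConsumer<String, String> consumer) {
        TopicPartition tp = new TopicPartition("demo-topic", 0);
        OffsetAndMetadata committed = new OffsetAndMetadata(100L, Optional.of(5), "processed by worker-1");
        consumer.commitSync(Collections.singletonMap(tp, committed));
    }
}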
TODO: further analysis
2. How the Consumer Obtains Each Partition's Fetch Position Before Fetching Messages
The flow covers both the first poll and subsequent polls:
(1) After subscribing to topics, the client polls through KafkaConsumer to prepare for fetching messages.
(2) If every partition already has a fetch position, go to step (6); otherwise go to step (3).
(3) From the assignment result in the subscription state, find all partitions that have no fetch position.
(4) Update the fetch positions of the partitions from step (3) via updateFetchPositions().
(5) Whether a partition came straight from step (2) or had its position updated in step (4), it is now allowed to be fetched from.
(6) For all partitions that have a fetch position and are allowed to be fetched, build fetch requests and start fetching messages.
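A compact structural sketch of these six steps, using simplified interfaces named after the methods analyzed later in this article (an illustration of the control flow only, not the real client code):
import java.util.List;
import java.util.Set;
import org.apache.kafka.common.TopicPartition;

public class FetchPositionFlowSketch {
    interface Subscriptions {
        boolean hasAllFetchPositions();               // step (2): does every partition have a position?
        Set<TopicPartition> missingFetchPositions();  // step (3): used inside the coordinator refresh
        void resetMissingPositions();                 // fall back to the auto.offset.reset strategy
        List<TopicPartition> fetchablePartitions();   // step (6): partitions ready to fetch from
    }
    interface Coordinator {
        void refreshCommittedOffsetsIfNeeded();       // step (4): committed offset -> fetch position
    }
    interface FetcherLike {
        void resetOffsetsIfNeeded();                  // resolve reset positions via LIST_OFFSET
        void sendFetches(List<TopicPartition> fetchable); // step (6): build and send fetch requests
    }

    static void pollOnce(Subscriptions subscriptions, Coordinator coordinator, FetcherLike fetcher) {
        if (!subscriptions.hasAllFetchPositions()) {           // (2)
            coordinator.refreshCommittedOffsetsIfNeeded();     // (3)(4) fill positions from committed offsets
            subscriptions.resetMissingPositions();             // mark remaining partitions for reset
            fetcher.resetOffsetsIfNeeded();                    // resolve them against the partition leaders
        }
        fetcher.sendFetches(subscriptions.fetchablePartitions()); // (5)(6)
    }
}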
2.1 About Offsets
Updating the fetch positions from KafkaConsumer involves two steps:
<1> The consumer's coordinator (ConsumerCoordinator) updates the committed offset (OffsetAndMetadata) in the partition state.
<2> The fetcher (Fetcher) updates the fetch position (the position field of TopicPartitionState) in the partition state.
Fetch position: specifies, when a fetch request is sent, where in the partition to start fetching messages from.
Committed offset: represents the consumer's progress in processing the partition's messages.
The consumer updates the fetch position when fetching messages and updates the committed offset when processing messages.
Normally the committed offset is assigned to the fetch position, in particular when a rebalance assigns a partition to a new consumer. The new consumer has no local record of the partition's consumption progress, so to obtain a fetch position it fetches the partition's committed offset from the coordinator and uses that committed offset as the partition's initial fetch position.
There are two ways to obtain partition offsets: 1. the OFFSET_FETCH request (OffsetFetchRequest), which the consumer's coordinator sends to the coordinator node on the broker side; 2. the LIST_OFFSET request (ListOffsetRequest), which the fetcher sends to the partition's leader replica. The coordinator node stores consumer-group data, including each partition's committed offset, whereas the partition leader only stores the partition's log files, and the only offsets it knows about are the message offsets in the log. When resolving fetch positions, the new API combines the two: only when the coordinator has no committed offset recorded for a partition does the consumer fetch an offset from the partition's leader.
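These two request paths also surface directly on the public consumer API; a hedged sketch with a made-up topic (committed() goes through the group coordinator, i.e. OFFSET_FETCH, while beginningOffsets/endOffsets go to the partition leaders, i.e. LIST_OFFSET):
import java.util.Collections;
import java.util.Map;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class OffsetLookupSketch {
    static void lookUp(KafkaConsumer<String, String> consumer) {
        TopicPartition tp = new TopicPartition("demo-topic", 0); // hypothetical partition

        // OFFSET_FETCH path: the committed offset the group coordinator has stored for this group
        OffsetAndMetadata committed = consumer.committed(tp);

        // LIST_OFFSET path: offsets resolved from the partition leader's log
        Map<TopicPartition, Long> earliest = consumer.beginningOffsets(Collections.singleton(tp));
        Map<TopicPartition, Long> latest = consumer.endOffsets(Collections.singleton(tp));

        System.out.println("committed=" + committed + ", earliest=" + earliest + ", latest=" + latest);
    }
}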
2.2 Walking Through the Flow
In updateFetchPositions (see the figure above):
<1> First call hasAllFetchPositions, which checks against the assignment result (the assignment produced when ConsumerCoordinator.poll performs partition assignment) whether every partition already has a fetch position; if so, return true immediately.
<2> If some partitions have no fetch position, call ConsumerCoordinator's refreshCommittedOffsetsIfNeeded, which:
(1) calls missingFetchPositions to find the partitions without a fetch position;
(2) calls fetchCommittedOffsets, which actually asks the coordinator for the committed offsets of those partitions;
(3) updates the leaderEpoch in each partition's OffsetAndMetadata, because the partition leader may have changed;
(4) assigns the committed offset to the fetch position.
Note: when partitions are subscribed via assign, giving each partition an explicit initial position avoids involving the coordinator at all (see the assign + seek sketch after this list).
<3> If after step <2> some partitions still have no position, i.e. the coordinator node on the broker side has no committed offset recorded for them, call resetMissingPositions, which applies the configured reset strategy: it sets the reset strategy on the partition state, sets position to null, and sets nextAllowedRetryTimeMs to null.
<4> Call the fetcher's resetOffsetsIfNeeded, which collects the partitions that need resetting via partitionsNeedingReset (a partition qualifies when resetStrategy != null and nextAllowedRetryTimeMs == null || nowMs >= nextAllowedRetryTimeMs), and then calls resetOffsetsAsync.
<5> In resetOffsetsAsync, groupListOffsetRequests builds the request map Map<Node, Map<TopicPartition, ListOffsetRequest.PartitionData>>, where Node is the node hosting the partition's leader replica. The PartitionData is constructed with the timestamp derived earlier from the reset strategy, and currentLeaderEpoch is left empty. The requests are then sent asynchronously per node via sendListOffsetRequest, which returns a ListOffsetResult:
private static class ListOffsetResult {
    private final Map<TopicPartition, OffsetData> fetchedOffsets; // fetched offsets, keyed by topic partition
    private final Set<TopicPartition> partitionsToRetry; // topic partitions that need to be retried
}
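The assign + seek path mentioned in the note above can be sketched as follows; by seeking to an explicit position the consumer never needs the coordinator to supply a committed offset (topic name and offset are made up):
import java.time.Duration;
import java.util.Collections;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class AssignAndSeekSketch {
    static void consumeFrom(KafkaConsumer<String, String> consumer) {
        TopicPartition tp = new TopicPartition("demo-topic", 0); // hypothetical partition
        consumer.assign(Collections.singletonList(tp));
        consumer.seek(tp, 42L); // sets the fetch position directly, no OFFSET_FETCH needed
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
        records.forEach(r -> System.out.println(r.offset() + ": " + r.value()));
    }
}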
2.3 Source Code Analysis
2.3.1 The hasAllFetchPositions Method
public boolean hasAllFetchPositions() {
    return assignment.stream().allMatch(state -> state.value().hasValidPosition());
}
It iterates over the partition assignment returned by the GroupCoordinator (obtained through ConsumerCoordinator's poll method) and checks which partitions have a valid fetch position, i.e. the position field of TopicPartitionState.
2.3.2 The refreshCommittedOffsetsIfNeeded Method
public boolean refreshCommittedOffsetsIfNeeded(Timer timer) {
    // find the partitions that have no valid position
    final Set<TopicPartition> missingFetchPositions = subscriptions.missingFetchPositions();
    final Map<TopicPartition, OffsetAndMetadata> offsets = fetchCommittedOffsets(missingFetchPositions, timer);
    if (offsets == null) return false;
    // once the committed offsets are fetched, assign each partition's committed offset to its fetch position
    for (final Map.Entry<TopicPartition, OffsetAndMetadata> entry : offsets.entrySet()) {
        final TopicPartition tp = entry.getKey();
        final long offset = entry.getValue().offset();
        log.debug("Setting offset for partition {} to the committed offset {}", tp, offset);
        // update the partition's leader epoch
        entry.getValue().leaderEpoch().ifPresent(epoch -> this.metadata.updateLastSeenEpochIfNewer(entry.getKey(), epoch));
        // assign the committed offset to the partition's fetch position
        this.subscriptions.seek(tp, offset);
    }
    return true;
}
2.3.3 The resetMissingPositions Method
public void resetMissingPositions() {
    final Set<TopicPartition> partitionsWithNoOffsets = new HashSet<>();
    // iterate over the assigned partitions
    assignment.stream().forEach(state -> {
        TopicPartition tp = state.topicPartition();
        TopicPartitionState partitionState = state.value();
        // if this partition still has no fetch position
        if (partitionState.isMissingPosition()) {
            // if no reset strategy has been configured
            if (defaultResetStrategy == OffsetResetStrategy.NONE)
                partitionsWithNoOffsets.add(tp);
            else
                // apply the reset strategy, clear position and set nextAllowedRetryTimeMs to null
                partitionState.reset(defaultResetStrategy);
        }
    });
    // if some partitions have no position and no reset strategy is configured, throw an exception
    if (!partitionsWithNoOffsets.isEmpty())
        throw new NoOffsetForPartitionException(partitionsWithNoOffsets);
}
The offset reset strategies, corresponding to the familiar auto.offset.reset configuration property, are as follows:
earliest: if a partition has a committed offset, consume from that offset; otherwise consume from the beginning of the partition.
latest: if a partition has a committed offset, consume from that offset; otherwise consume only data newly produced to the partition.
none: if every partition of the topic has a committed offset, consume from those offsets; if any partition has no committed offset, throw an exception.
public enum OffsetResetStrategy {
    LATEST, EARLIEST, NONE
}
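For reference, the strategy is chosen through the consumer configuration; a minimal sketch (broker address and group id are made up):
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ResetStrategyConfig {
    static KafkaConsumer<String, String> newConsumer() {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // hypothetical broker
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "demo-group");              // hypothetical group id
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        // auto.offset.reset: "earliest", "latest" (the default) or "none"
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        return new KafkaConsumer<>(props);
    }
}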
2.3.4 The resetOffsetsIfNeeded Method
public void resetOffsetsIfNeeded() {
    // Raise exception from previous offset fetch if there is one
    RuntimeException exception = cachedListOffsetsException.getAndSet(null);
    if (exception != null)
        throw exception;
    // collect the partitions whose position needs to be reset
    Set<TopicPartition> partitions = subscriptions.partitionsNeedingReset(time.milliseconds());
    if (partitions.isEmpty())
        return;
    final Map<TopicPartition, Long> offsetResetTimestamps = new HashMap<>();
    for (final TopicPartition partition : partitions) {
        // map the reset strategy (auto.offset.reset) to a timestamp
        Long timestamp = offsetResetStrategyTimestamp(partition);
        if (timestamp != null)
            // holds each partition and its timestamp (earliest or latest, depending on the reset strategy)
            offsetResetTimestamps.put(partition, timestamp);
    }
    // submit the reset requests asynchronously
    resetOffsetsAsync(offsetResetTimestamps);
}
2.3.5 The resetOffsetsAsync Method
// asynchronously reset the fetch position of each partition
private void resetOffsetsAsync(Map<TopicPartition, Long> partitionResetTimestamps) {
    // Add the topics to the metadata to do a single metadata fetch.
    for (TopicPartition tp : partitionResetTimestamps.keySet())
        metadata.add(tp.topic());
    // call groupListOffsetRequests to group partitions by the node hosting their leader replica
    Map<Node, Map<TopicPartition, ListOffsetRequest.PartitionData>> timestampsToSearchByNode =
            groupListOffsetRequests(partitionResetTimestamps, new HashSet<>());
    for (Map.Entry<Node, Map<TopicPartition, ListOffsetRequest.PartitionData>> entry : timestampsToSearchByNode.entrySet()) {
        Node node = entry.getKey();
        final Map<TopicPartition, ListOffsetRequest.PartitionData> resetTimestamps = entry.getValue();
        subscriptions.setResetPending(resetTimestamps.keySet(), time.milliseconds() + requestTimeoutMs);
        RequestFuture<ListOffsetResult> future = sendListOffsetRequest(node, resetTimestamps, false);
        future.addListener(new RequestFutureListener<ListOffsetResult>() {
            @Override
            public void onSuccess(ListOffsetResult result) {
                if (!result.partitionsToRetry.isEmpty()) {
                    subscriptions.resetFailed(result.partitionsToRetry, time.milliseconds() + retryBackoffMs);
                    metadata.requestUpdate();
                }
                for (Map.Entry<TopicPartition, OffsetData> fetchedOffset : result.fetchedOffsets.entrySet()) {
                    TopicPartition partition = fetchedOffset.getKey();
                    OffsetData offsetData = fetchedOffset.getValue();
                    ListOffsetRequest.PartitionData requestedReset = resetTimestamps.get(partition);
                    resetOffsetIfNeeded(partition, requestedReset.timestamp, offsetData);
                }
            }
            @Override
            public void onFailure(RuntimeException e) {
                subscriptions.resetFailed(resetTimestamps.keySet(), time.milliseconds() + retryBackoffMs);
                metadata.requestUpdate();
                if (!(e instanceof RetriableException) && !cachedListOffsetsException.compareAndSet(null, e))
                    log.error("Discarding error in ListOffsetResponse because another error is pending", e);
            }
        });
    }
}
2.3.6 The groupListOffsetRequests Method
private Map<Node, Map<TopicPartition, ListOffsetRequest.PartitionData>> groupListOffsetRequests(
        Map<TopicPartition, Long> timestampsToSearch, Set<TopicPartition> partitionsToRetry) {
    final Map<Node, Map<TopicPartition, ListOffsetRequest.PartitionData>> timestampsToSearchByNode = new HashMap<>();
    for (Map.Entry<TopicPartition, Long> entry: timestampsToSearch.entrySet()) {
        // the partition
        TopicPartition tp = entry.getKey();
        // detailed partition information from the metadata
        Optional<PartitionInfo> currentInfo = metadata.partitionInfoIfCurrent(tp);
        // the partition information is not available
        if (!currentInfo.isPresent()) {
            metadata.add(tp.topic());
            log.debug("Leader for partition {} is unknown for fetching offset", tp);
            metadata.requestUpdate();
            partitionsToRetry.add(tp);
        } else if (currentInfo.get().leader() == null) {
            log.debug("Leader for partition {} is unavailable for fetching offset", tp);
            metadata.requestUpdate();
            partitionsToRetry.add(tp);
        } else if (client.isUnavailable(currentInfo.get().leader())) {
            client.maybeThrowAuthFailure(currentInfo.get().leader());
            // The connection has failed and we need to await the blackout period before we can
            // try again. No need to request a metadata update since the disconnect will have
            // done so already.
            log.debug("Leader {} for partition {} is unavailable for fetching offset until reconnect backoff expires",
                    currentInfo.get().leader(), tp);
            partitionsToRetry.add(tp);
        } else {
            // the Node here is the node hosting the partition's leader replica
            Node node = currentInfo.get().leader();
            Map<TopicPartition, ListOffsetRequest.PartitionData> topicData =
                    timestampsToSearchByNode.computeIfAbsent(node, n -> new HashMap<>());
            ListOffsetRequest.PartitionData partitionData = new ListOffsetRequest.PartitionData(
                    entry.getValue(), Optional.empty());
            topicData.put(entry.getKey(), partitionData);
        }
    }
    return timestampsToSearchByNode;
}