public class KafkaConsumer<K, V> implements Consumer<K, V> {
// Generates sequence numbers for clientId
private static final AtomicInteger CONSUMER_CLIENT_ID_SEQUENCE = new AtomicInteger(1);
// Unique id of this consumer
private final String clientId;
// Manages communication between the consumer and the server-side coordinator
private final ConsumerCoordinator coordinator;
private final Deserializer<K> keyDeserializer;
private final Deserializer<V> valueDeserializer;
// Interceptors invoked before poll() returns records to the user, and again when the server returns a commit response
private final ConsumerInterceptors<K, V> interceptors;
// Handles network I/O between the consumer and brokers
private final ConsumerNetworkClient client;
// Tracks this consumer's consumption state
private final SubscriptionState subscriptions;
// Kafka cluster metadata
private final Metadata metadata;
// Id of the thread currently using this KafkaConsumer; NO_CURRENT_THREAD (-1) when unowned
private static final long NO_CURRENT_THREAD = -1L;
private final AtomicLong currentThread = new AtomicLong(NO_CURRENT_THREAD);
// Reentrancy count; together with currentThread it detects concurrent access from multiple threads
private final AtomicInteger refcount = new AtomicInteger(0);
private final Fetcher<K, V> fetcher; // fetches records from brokers (see Fetcher below)
private final Time time; // clock abstraction
@Override
public ConsumerRecords<K, V> poll(long timeout) {
acquire(); // guard against concurrent multi-threaded access
try {
long start = time.milliseconds();
long remaining = timeout;
do {
Map<TopicPartition, List<ConsumerRecord<K, V>>> records = pollOnce(remaining);
// If records arrived, return them and exit poll
if (!records.isEmpty()) {
// As an optimization, send the next FetchRequest before handing this batch to the user,
// so the next fetch travels over the network while the user thread processes these records
fetcher.sendFetches();
// Poll without allowing wakeups: since the consumed position has already been updated,
// we must not allow wakeups or any other errors to be triggered prior to returning
// the records fetched by the preceding pollOnce
client.pollNoWakeup();
if (this.interceptors == null)
return new ConsumerRecords<>(records);
else
return this.interceptors.onConsume(new ConsumerRecords<>(records));
}
long elapsed = time.milliseconds() - start;
remaining = timeout - elapsed;
} while (remaining > 0);
// Timed out without receiving any records: return an empty result
return ConsumerRecords.empty();
} finally {
release();
}
}
private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollOnce(long timeout) {
coordinator.ensureCoordinatorReady();
if (subscriptions.partitionsAutoAssigned())
// In auto-assignment mode, a pending rebalance must complete first
coordinator.ensurePartitionAssignment();
// If some subscribed TopicPartition has no position, the corresponding TopicPartitionState
// in SubscriptionState must be restored: fetch the committed offset from the server and sync it into the position
if (!subscriptions.hasAllFetchPositions())
updateFetchPositions(this.subscriptions.missingFetchPositions());
long now = time.milliseconds();
// Run scheduled tasks (HeartbeatTask and AutoCommitTask):
// pull from the delayedTasks queue every task whose scheduled time is at or before now
client.executeDelayedTasks(now);
// Try to fetch already-parsed records from the completedFetches buffer
Map<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchedRecords();
if (!records.isEmpty())
return records;
fetcher.sendFetches();
client.poll(timeout, now);
return fetcher.fetchedRecords();
}
private void acquire() {
ensureNotClosed();
long threadId = Thread.currentThread().getId();
if (threadId != currentThread.get() && !currentThread.compareAndSet(NO_CURRENT_THREAD, threadId))
// Another thread owns the consumer, so fail fast
throw new ConcurrentModificationException("KafkaConsumer is not safe for multi-threaded access");
refcount.incrementAndGet();
}
// Release the light-weight lock
private void release() {
if (refcount.decrementAndGet() == 0)
currentThread.set(NO_CURRENT_THREAD);
}
private void updateFetchPositions(Set<TopicPartition> partitions) {
// Refresh the committed offset cached in SubscriptionState.TopicPartitionState
coordinator.refreshCommittedOffsetsIfNeeded();
// If the committed offset is null, reset the offset according to the configured strategy
fetcher.updateFetchPositions(partitions);
}
}
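With acquire/release enforcing single-threaded use, all interaction with a KafkaConsumer happens in one poll loop. A minimal usage sketch (imports omitted like the excerpts above; the broker address, group id and topic name are placeholders):

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092"); // placeholder broker address
props.put("group.id", "demo-group"); // placeholder group id
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("demo-topic"));
try {
    while (true) {
        // poll drives everything shown above: coordinator discovery, rebalancing,
        // scheduled tasks (heartbeat, auto-commit) and fetching
        ConsumerRecords<String, String> records = consumer.poll(1000);
        for (ConsumerRecord<String, String> record : records)
            System.out.printf("%s-%d@%d: %s%n", record.topic(), record.partition(), record.offset(), record.value());
    }
} finally {
    consumer.close();
}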
public class ConsumerNetworkClient implements Closeable {
// The underlying NetworkClient that performs the actual network I/O
private final KafkaClient client;
// Set by threads other than the consumer thread to interrupt the consumer's blocking poll
private final AtomicBoolean wakeup = new AtomicBoolean(false);
// Queue of scheduled tasks (mainly the heartbeat task), backed by a PriorityQueue
private final DelayedTaskQueue delayedTasks = new DelayedTaskQueue();
// Per-node buffer of requests that have not yet been sent
private final Map<Node, List<ClientRequest>> unsent = new HashMap<>();
// Cluster metadata
private final Metadata metadata;
// How long a request may sit in the unsent buffer before it expires
private final long unsentExpiryMs;
private final Time time; // clock abstraction
private final long retryBackoffMs; // backoff between retries
// Incremented each time the consumer enters an uninterruptible method, decremented on exit;
// used to decide whether waking the blocked selector is allowed
private int wakeupDisabledCount = 0;
// Wraps an outgoing request as a ClientRequest and buffers it in unsent
public RequestFuture<ClientResponse> send(Node node, ApiKeys api, AbstractRequest request) {
long now = time.milliseconds();
RequestFutureCompletionHandler future = new RequestFutureCompletionHandler();
RequestHeader header = client.nextRequestHeader(api);
RequestSend send = new RequestSend(node.idString(), header, request.toStruct());
put(node, new ClientRequest(now, true, send, future));
return future;
}
public void poll(RequestFuture<?> future) {
while (!future.isDone()) // block synchronously until the future completes
poll(Long.MAX_VALUE);
}
// Polls without blocking and without allowing wakeups
public void pollNoWakeup() {
disableWakeups();
try {
poll(0, time.milliseconds(), false);
} finally {
enableWakeups();
}
}
private void poll(long timeout, long now, boolean executeDelayedTasks) {
// Try to send whatever is buffered in unsent
trySend(now);
// Cap the timeout by the next scheduled-task deadline so scheduled tasks are not delayed
timeout = Math.min(timeout, delayedTasks.nextTimeout(now));
// Do the actual network poll; if the wakeup flag is set, this throws and interrupts consumer.poll
clientPoll(timeout, now);
now = time.milliseconds();
// For disconnected nodes, remove their requests from unsent and invoke the request callbacks
checkDisconnects(now);
// Run scheduled tasks if the caller allows it
if (executeDelayedTasks)
delayedTasks.poll(now);
// Connections may have been established during the poll, so try sending again
trySend(now);
// Fail every expired request in unsent: invoke its callback, then drop it from unsent
failExpiredRequests(now);
}
private boolean trySend(long now) {
boolean requestsSent = false;
for (Map.Entry<Node, List<ClientRequest>> requestEntry: unsent.entrySet()) {
Node node = requestEntry.getKey();
Iterator<ClientRequest> iterator = requestEntry.getValue().iterator();
while (iterator.hasNext()) {
ClientRequest request = iterator.next();
// Check the connection and the in-flight request count
if (client.ready(node, now)) {
// Copy the request into the KafkaChannel's send field
client.send(request, now);
iterator.remove();
requestsSent = true;
}
}
}
return requestsSent;
}
// Block synchronously (polling with MAX timeout) until the metadata version advances
public void awaitMetadataUpdate() {
int version = this.metadata.requestUpdate();
do {
poll(Long.MAX_VALUE);
} while (this.metadata.version() == version);
}
// Blocks until every request to the node in unsent and InFlightRequests has completed
public void awaitPendingRequests(Node node) {
while (pendingRequestCount(node) > 0)
poll(retryBackoffMs);
}
public static class RequestFutureCompletionHandler extends RequestFuture<ClientResponse> implements RequestCompletionHandler {
// The state below belongs to the parent class RequestFuture<T>, shown here for context:
// whether the request has completed
private boolean isDone = false;
// the successful result; mutually exclusive with exception
private T value;
// the exception that caused the failure
private RuntimeException exception;
// listeners notified of completion through their onSuccess and onFailure methods
private List<RequestFutureListener<T>> listeners = new ArrayList<>();
@Override
public void onComplete(ClientResponse response) {
if (response.wasDisconnected()) {
ClientRequest request = response.request();
RequestSend send = request.request();
ApiKeys api = ApiKeys.forId(send.header().apiKey());
int correlation = send.header().correlationId();
raise(DisconnectException.INSTANCE);
} else {
complete(response);
}
}
// (From RequestFuture<T>.) Adapts this future's result type T into S
public <S> RequestFuture<S> compose(final RequestFutureAdapter<T, S> adapter) {
final RequestFuture<S> adapted = new RequestFuture<S>();
addListener(new RequestFutureListener<T>() {
@Override
public void onSuccess(T value) {
adapter.onSuccess(value, adapted);
}
@Override
public void onFailure(RuntimeException e) {
adapter.onFailure(e, adapted);
}
});
return adapted;
}
}
}
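The wakeup flag above is what KafkaConsumer.wakeup() sets; it is the only consumer method safe to call from another thread, and it makes a blocking poll throw WakeupException (pollNoWakeup and other wakeup-disabled sections are exempt). A shutdown sketch under those assumptions:

final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); // props as in the earlier sketch
consumer.subscribe(Collections.singletonList("demo-topic"));
// A shutdown hook runs on another thread; wakeup() is the one thread-safe call
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
    public void run() {
        consumer.wakeup(); // sets the AtomicBoolean; clientPoll will raise WakeupException
    }
}));
try {
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(1000);
        // process records ...
    }
} catch (WakeupException e) {
    // expected during shutdown
} finally {
    consumer.close();
}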
public class SubscriptionState {
private enum SubscriptionType {
NONE,
AUTO_TOPICS, // subscribed by explicit topic names
AUTO_PATTERN, // subscribed by a regex over topic names
USER_ASSIGNED // partitions assigned manually by the user
};
private SubscriptionType subscriptionType; // which subscription mode is in use
private Pattern subscribedPattern; // the regex used in AUTO_PATTERN mode
private final Set<String> subscription; // all subscribed topic names
private final Set<String> groupSubscription; // on the consumer group leader, every topic subscribed by the group; on followers, only this consumer's own topics
private final Set<TopicPartition> userAssignment; // TopicPartitions assigned manually to this consumer; mutually exclusive with subscription
// Partitions currently assigned to this consumer,
// with the consumption state (e.g. offset) of each TopicPartition
private final Map<TopicPartition, TopicPartitionState> assignment;
// Whether partition assignment is needed; needsRejoin is driven by this flag,
// which is set to true when the consumer subscribes to a topic
private boolean needsPartitionAssignment;
// Whether committed offsets need to be fetched; set to true after an offset commit or a partition rebalance
private boolean needsFetchCommittedOffsets;
private final OffsetResetStrategy defaultResetStrategy; // strategy for resetting offsets
private ConsumerRebalanceListener listener; // callback around partition assignment changes
private static class TopicPartitionState {
private Long position; // offset of the next message to consume
private OffsetAndMetadata committed; // last committed offset
private boolean paused; // whether consumption of this partition is paused
private OffsetResetStrategy resetStrategy; // offset reset strategy for this partition
}
// Called when the consumer subscribes to topics
public void subscribe(Collection<String> topics, ConsumerRebalanceListener listener) {
setSubscriptionType(SubscriptionType.AUTO_TOPICS);
// The default listener is NoOpConsumerRebalanceListener
this.listener = listener;
// Updates subscription and groupSubscription, and sets needsPartitionAssignment = true
changeSubscription(topics);
}
}
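The three non-NONE subscription types map one-to-one onto the public consumer API. A quick sketch of each mode (topic names are placeholders):

// AUTO_TOPICS: group-managed assignment over explicitly named topics
consumer.subscribe(Arrays.asList("orders", "payments"));

// AUTO_PATTERN: group-managed assignment over every topic matching the regex;
// this overload also takes a ConsumerRebalanceListener
consumer.subscribe(Pattern.compile("metrics-.*"), new NoOpConsumerRebalanceListener());

// USER_ASSIGNED: manual assignment; no rebalancing, no group coordination
consumer.assign(Arrays.asList(new TopicPartition("orders", 0), new TopicPartition("orders", 1)));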
public final class ConsumerCoordinator extends AbstractCoordinator {
// The consumer's JoinGroupRequest lists the PartitionAssignors it supports;
// the GroupCoordinator picks one common to all consumers and has the leader run it for partition assignment
private final List<PartitionAssignor> assignors;
private final Metadata metadata;
private final SubscriptionState subscriptions;
private final boolean autoCommitEnabled;
private final AutoCommitTask autoCommitTask; // scheduled task that auto-commits offsets
private final ConsumerInterceptors<?, ?> interceptors;
private final boolean excludeInternalTopics; // whether internal topics are excluded
// Used to detect changes in the number of partitions of subscribed topics
private MetadataSnapshot metadataSnapshot;
// Snapshot taken when the leader computes an assignment
private MetadataSnapshot assignmentSnapshot;
// Constructor
public ConsumerCoordinator(ConsumerNetworkClient client, String groupId, int sessionTimeoutMs, int heartbeatIntervalMs,
List<PartitionAssignor> assignors, Metadata metadata, SubscriptionState subscriptions,
Metrics metrics, String metricGrpPrefix, Time time, long retryBackoffMs,
OffsetCommitCallback defaultOffsetCommitCallback, boolean autoCommitEnabled,
long autoCommitIntervalMs, ConsumerInterceptors<?, ?> interceptors, boolean excludeInternalTopics) {
super(client, groupId, sessionTimeoutMs, heartbeatIntervalMs, metrics, metricGrpPrefix, time, retryBackoffMs);
this.metadata = metadata;
this.metadata.requestUpdate();
this.metadataSnapshot = new MetadataSnapshot(subscriptions, metadata.fetch());
this.subscriptions = subscriptions;
this.defaultOffsetCommitCallback = defaultOffsetCommitCallback;
this.autoCommitEnabled = autoCommitEnabled;
this.assignors = assignors;
// Register the metadata-update listener
addMetadataListener();
if (autoCommitEnabled) {
this.autoCommitTask = new AutoCommitTask(autoCommitIntervalMs);
this.autoCommitTask.reschedule();
} else {
this.autoCommitTask = null;
}
this.interceptors = interceptors;
this.excludeInternalTopics = excludeInternalTopics;
}
// Metadata update listener
private void addMetadataListener() {
this.metadata.addListener(new Metadata.Listener() {
@Override
public void onMetadataUpdate(Cluster cluster) {
// Pattern subscription: recompute the set of matching topics
if (subscriptions.hasPatternSubscription()) {
final List<String> topicsToSubscribe = new ArrayList<>();
for (String topic : cluster.topics())
if (filterTopic(topic)) // regex match
topicsToSubscribe.add(topic);
// Update the subscription, groupSubscription and assignment collections
subscriptions.changeSubscription(topicsToSubscribe);
// Update the metadata's topic set
metadata.setTopics(subscriptions.groupSubscription());
} else if (!cluster.unauthorizedTopics().isEmpty()) {
throw new TopicAuthorizationException(new HashSet<>(cluster.unauthorizedTopics()));
}
// Not manual assignment, i.e. AUTO_TOPICS or AUTO_PATTERN
if (subscriptions.partitionsAutoAssigned()) {
MetadataSnapshot snapshot = new MetadataSnapshot(subscriptions, cluster);
// MetadataSnapshot is essentially a map: topic -> partition count;
// inequality means partition counts changed, so a rebalance is needed
if (!snapshot.equals(metadataSnapshot)) {
metadataSnapshot = snapshot;
subscriptions.needReassignment();
}
}
}
});
}
// Entry point for joining the group, i.e. the rebalance
public void ensurePartitionAssignment() {
// Only auto-assigned subscriptions participate in rebalancing
if (subscriptions.partitionsAutoAssigned()) {
if (subscriptions.hasPatternSubscription())
// In pattern-subscription mode, also check whether the metadata needs updating,
// to avoid rebalancing against stale metadata
client.ensureFreshMetadata();
ensureActiveGroup();
}
}
@Override
protected void onJoinPrepare(int generation, String memberId) {
// If offset auto-commit is enabled, commit offsets synchronously first
maybeAutoCommitOffsetsSync();
ConsumerRebalanceListener listener = subscriptions.listener();
Set<TopicPartition> revoked = new HashSet<>(subscriptions.assignedPartitions());
// Invoke the rebalance callback for the partitions being revoked
listener.onPartitionsRevoked(revoked);
assignmentSnapshot = null;
// Shrink groupSubscription back to this consumer's own subscription
// and set needsPartitionAssignment = true
subscriptions.needReassignment();
}
// After the JoinGroupResponse arrives, the consumer appointed group leader runs the assignment strategy
@Override
protected Map<String, ByteBuffer> performAssignment(String leaderId,
String assignmentStrategy,
Map<String, ByteBuffer> allSubscriptions) {
// Defaults to the range assignment strategy
PartitionAssignor assignor = lookupAssignor(assignmentStrategy);
Set<String> allSubscribedTopics = new HashSet<>();
Map<String, Subscription> subscriptions = new HashMap<>();
for (Map.Entry<String, ByteBuffer> subscriptionEntry : allSubscriptions.entrySet()) {
Subscription subscription = ConsumerProtocol.deserializeSubscription(subscriptionEntry.getValue());
subscriptions.put(subscriptionEntry.getKey(), subscription);
allSubscribedTopics.addAll(subscription.topics());
}
// The leader must track every topic subscribed across the consumer group;
// new topics may have appeared, so the metadata needs refreshing
this.subscriptions.groupSubscribe(allSubscribedTopics);
metadata.setTopics(this.subscriptions.groupSubscription());
client.ensureFreshMetadata();
assignmentSnapshot = metadataSnapshot;
// Run the assignor (RangeAssignor by default);
// the result maps memberId -> assignment
Map<String, Assignment> assignment = assignor.assign(metadata.fetch(), subscriptions);
Map<String, ByteBuffer> groupAssignment = new HashMap<>();
for (Map.Entry<String, Assignment> assignmentEntry : assignment.entrySet()) {
ByteBuffer buffer = ConsumerProtocol.serializeAssignment(assignmentEntry.getValue());
groupAssignment.put(assignmentEntry.getKey(), buffer);
}
return groupAssignment;
}
// Handles the SyncGroupResponse
@Override
protected void onJoinComplete(int generation, String memberId, String assignmentStrategy, ByteBuffer assignmentBuffer) {
// The snapshot no longer matches the latest metadata, so partitions must be reassigned
if (assignmentSnapshot != null && !assignmentSnapshot.equals(metadataSnapshot)) {
subscriptions.needReassignment();
return;
}
PartitionAssignor assignor = lookupAssignor(assignmentStrategy);
Assignment assignment = ConsumerProtocol.deserializeAssignment(assignmentBuffer);
// Flag that the latest committed offsets must be fetched from the server
subscriptions.needRefreshCommits();
// Update the partitions assigned to this consumer
subscriptions.assignFromSubscribed(assignment.partitions());
// Reschedule the AutoCommitTask
if (autoCommitEnabled)
autoCommitTask.reschedule();
// Invoke the callback now that the rebalance is complete
ConsumerRebalanceListener listener = subscriptions.listener();
Set<TopicPartition> assigned = new HashSet<>(subscriptions.assignedPartitions());
listener.onPartitionsAssigned(assigned);
}
}
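onJoinPrepare and onJoinComplete are where the user-supplied ConsumerRebalanceListener fires. A common pattern, sketched here, is to commit progress when partitions are about to be revoked (the consumer reference is assumed to be a final field):

consumer.subscribe(Arrays.asList("orders"), new ConsumerRebalanceListener() {
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // invoked from onJoinPrepare, before a new assignment is computed:
        // last chance to commit offsets for partitions this consumer may lose
        consumer.commitSync();
    }
    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // invoked from onJoinComplete, after the SyncGroupResponse is processed
        System.out.println("assigned: " + partitions);
    }
});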
public class RangeAssignor extends AbstractPartitionAssignor {
@Override
public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
Map<String, List<String>> subscriptions) {
// Invert the member -> topics map into topic -> subscribed members
Map<String, List<String>> consumersPerTopic = consumersPerTopic(subscriptions);
Map<String, List<TopicPartition>> assignment = new HashMap<>();
for (String memberId : subscriptions.keySet())
assignment.put(memberId, new ArrayList<TopicPartition>());
for (Map.Entry<String, List<String>> topicEntry : consumersPerTopic.entrySet()) {
String topic = topicEntry.getKey();
List<String> consumersForTopic = topicEntry.getValue();
Integer numPartitionsForTopic = partitionsPerTopic.get(topic);
if (numPartitionsForTopic == null)
continue;
Collections.sort(consumersForTopic);
// Base number of partitions per consumer
int numPartitionsPerConsumer = numPartitionsForTopic / consumersForTopic.size();
// The remainder is handed out as one extra partition each to the first few consumers
int consumersWithExtraPartition = numPartitionsForTopic % consumersForTopic.size();
List<TopicPartition> partitions = AbstractPartitionAssignor.partitions(topic, numPartitionsForTopic);
for (int i = 0, n = consumersForTopic.size(); i < n; i++) {
int start = numPartitionsPerConsumer * i + Math.min(i, consumersWithExtraPartition);
int length = numPartitionsPerConsumer + (i + 1 > consumersWithExtraPartition ? 0 : 1);
assignment.get(consumersForTopic.get(i)).addAll(partitions.subList(start, start + length));
}
}
return assignment;
}
}
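For instance, with one topic of 5 partitions and two consumers C0 and C1 (in sorted order): numPartitionsPerConsumer = 2 and consumersWithExtraPartition = 1, so C0 gets [p0, p1, p2] and C1 gets [p3, p4]. A standalone check of the start/length arithmetic:

int numPartitions = 5, numConsumers = 2;
int perConsumer = numPartitions / numConsumers; // 2
int extra = numPartitions % numConsumers;       // 1
for (int i = 0; i < numConsumers; i++) {
    int start = perConsumer * i + Math.min(i, extra);
    int length = perConsumer + (i + 1 > extra ? 0 : 1);
    System.out.printf("C%d -> partitions [%d, %d)%n", i, start, start + length);
}
// prints: C0 -> partitions [0, 3)  and  C1 -> partitions [3, 5)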
public abstract class AbstractPartitionAssignor implements PartitionAssignor {
// Carries out the partition assignment
@Override
public Map<String, Assignment> assign(Cluster metadata, Map<String, Subscription> subscriptions) {
Set<String> allSubscribedTopics = new HashSet<>();
Map<String, List<String>> topicSubscriptions = new HashMap<>();
// This base class discards userData by default;
// a subclass that needs userData must implement PartitionAssignor's assign method itself
for (Map.Entry<String, Subscription> subscriptionEntry : subscriptions.entrySet()) {
List<String> topics = subscriptionEntry.getValue().topics();
allSubscribedTopics.addAll(topics);
topicSubscriptions.put(subscriptionEntry.getKey(), topics);
}
// Count the number of partitions of each subscribed topic
Map<String, Integer> partitionsPerTopic = new HashMap<>();
for (String topic : allSubscribedTopics) {
Integer numPartitions = metadata.partitionCountForTopic(topic);
if (numPartitions != null && numPartitions > 0)
partitionsPerTopic.put(topic, numPartitions);
}
Map<String, List<TopicPartition>> rawAssignments = assign(partitionsPerTopic, topicSubscriptions);
Map<String, Assignment> assignments = new HashMap<>();
for (Map.Entry<String, List<TopicPartition>> assignmentEntry : rawAssignments.entrySet())
assignments.put(assignmentEntry.getKey(), new Assignment(assignmentEntry.getValue()));
return assignments;
}
}
public interface PartitionAssignor {
// Subscription info for one group member
class Subscription {
private final List<String> topics; // the subscribed topics
private final ByteBuffer userData;
}
class Assignment {
private final List<TopicPartition> partitions; // the assignment result
private final ByteBuffer userData;
}
}
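Because AbstractPartitionAssignor does the Subscription/Assignment plumbing, a custom strategy only implements the simplified assign plus name(). A deliberately toy sketch — a hypothetical class that hands everything to one member and, for brevity, ignores per-member topic subscriptions — which would be enabled through the partition.assignment.strategy config:

public class FirstConsumerAssignor extends AbstractPartitionAssignor {
    @Override
    public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
                                                    Map<String, List<String>> subscriptions) {
        Map<String, List<TopicPartition>> assignment = new HashMap<>();
        for (String memberId : subscriptions.keySet())
            assignment.put(memberId, new ArrayList<TopicPartition>());
        // everything goes to the lexicographically first member
        String first = Collections.min(subscriptions.keySet());
        for (Map.Entry<String, Integer> e : partitionsPerTopic.entrySet())
            for (int p = 0; p < e.getValue(); p++)
                assignment.get(first).add(new TopicPartition(e.getKey(), p));
        return assignment;
    }
    @Override
    public String name() {
        return "first-consumer"; // identifies this strategy during group coordination
    }
}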
public abstract class AbstractCoordinator implements Closeable {
private final Heartbeat heartbeat; // helper class for the heartbeat task
private final HeartbeatTask heartbeatTask; // scheduled task that sends heartbeats and handles responses
protected final String groupId; // consumer group id
protected final ConsumerNetworkClient client; // network communication
private final Time time; // clock abstraction
private final long retryBackoffMs; // backoff between retries
private final int sessionTimeoutMs; // session timeout carried in the JoinGroupRequest
private boolean needsJoinPrepare = true; // whether preparation work is needed before sending a JoinGroupRequest
// One of the conditions for re-sending a JoinGroupRequest:
// typically set to true when a response carries an error code that demands a rebalance,
// and set back to false once a JoinGroupResponse is received;
// defaults to true
private boolean rejoinNeeded = true;
protected Node coordinator; // the node hosting the server-side GroupCoordinator
protected String memberId; // unique consumer id assigned by the server-side GroupCoordinator
protected String protocol; // the group protocol (assignment strategy name) chosen by the coordinator
protected int generation; // effectively a version number per rebalance, guarding against stale rebalance requests
private class HeartbeatTask implements DelayedTask {
private boolean requestInFlight = false; // whether a heartbeat request is outstanding
// Called externally to (re)trigger the heartbeat task
public void reset() {
long now = time.milliseconds();
heartbeat.resetSessionTimeout(now);
client.unschedule(this);
if (!requestInFlight)
client.schedule(this, now);
}
@Override
public void run(final long now) {
// Proceed only if the previous heartbeat got a normal response, we are not
// waiting for a rebalance result, and the server-side GroupCoordinator is known;
// otherwise end this task run
if (generation < 0 || needRejoin() || coordinatorUnknown()) {
return;
}
if (heartbeat.sessionTimeoutExpired(now)) {
// The session timed out, so assume the server-side GroupCoordinator is dead
coordinatorDead();
return;
}
if (!heartbeat.shouldHeartbeat(now)) {
// Not yet time for the next heartbeat: send nothing (this run of the task ends)
// and schedule a fresh task at the next trigger time
client.schedule(this, now + heartbeat.timeToNextHeartbeat(now));
} else {
heartbeat.sentHeartbeat(now);
requestInFlight = true; // prevent duplicate sends
// Send the heartbeat request
RequestFuture<Void> future = sendHeartbeatRequest();
// Register a callback for the response
future.addListener(new RequestFutureListener<Void>() {
// Schedule the follow-up task once the request completes
@Override
public void onSuccess(Void value) {
requestInFlight = false;
long now = time.milliseconds();
heartbeat.receiveHeartbeat(now);
long nextHeartbeatTime = now + heartbeat.timeToNextHeartbeat(now);
client.schedule(HeartbeatTask.this, nextHeartbeatTime);
}
@Override
public void onFailure(RuntimeException e) {
requestInFlight = false;
client.schedule(HeartbeatTask.this, time.milliseconds() + retryBackoffMs);
}
});
}
}
}
// Handles heartbeat responses
private class HeartbeatCompletionHandler extends CoordinatorResponseHandler<HeartbeatResponse, Void> {
@Override
public void handle(HeartbeatResponse heartbeatResponse, RequestFuture<Void> future) {
Errors error = Errors.forCode(heartbeatResponse.errorCode());
if (error == Errors.NONE) {
// Success: propagate completion
future.complete(null);
} else if (error == Errors.GROUP_COORDINATOR_NOT_AVAILABLE
|| error == Errors.NOT_COORDINATOR_FOR_GROUP) {
coordinatorDead();
future.raise(error);
} else if (error == Errors.REBALANCE_IN_PROGRESS) {
// The coordinator has already started a rebalance,
// so flag that a JoinGroupRequest must be sent
AbstractCoordinator.this.rejoinNeeded = true;
future.raise(Errors.REBALANCE_IN_PROGRESS);
} else if (error == Errors.ILLEGAL_GENERATION) {
AbstractCoordinator.this.rejoinNeeded = true;
future.raise(Errors.ILLEGAL_GENERATION);
} else if (error == Errors.UNKNOWN_MEMBER_ID) {
memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID;
AbstractCoordinator.this.rejoinNeeded = true;
future.raise(Errors.UNKNOWN_MEMBER_ID);
} else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
future.raise(new GroupAuthorizationException(groupId));
} else {
future.raise(new KafkaException("Unexpected error in heartbeat response: " + error.message()));
}
}
}
protected void coordinatorDead() {
if (this.coordinator != null) {
// Fail the requests buffered in unsent for this node, invoking their callbacks with the exception
client.failUnsentRequests(this.coordinator, GroupCoordinatorNotAvailableException.INSTANCE);
// null means a new GroupCoordinator has to be located
this.coordinator = null;
}
}
// Locate the server-side GroupCoordinator;
// the subsequent rebalances, offset fetches and offset commits all go through it
public void ensureCoordinatorReady() {
while (coordinatorUnknown()) {
RequestFuture<Void> future = sendGroupCoordinatorRequest();
// Block until the future completes
client.poll(future);
if (future.failed()) {
if (future.isRetriable())
client.awaitMetadataUpdate();
else
throw future.exception();
} else if (coordinator != null && client.connectionFailed(coordinator)) {
coordinatorDead();
// Sleep to pace reconnection attempts
time.sleep(retryBackoffMs);
}
}
}
// Handles the server's answer to the GroupCoordinator lookup:
// sets the coordinator field, connects to it, and starts the heartbeat task
private void handleGroupMetadataResponse(ClientResponse resp, RequestFuture<Void> future) {
if (!coordinatorUnknown()) {
// This consumer already knows its GroupCoordinator, so ignore the response
future.complete(null);
} else {
GroupCoordinatorResponse groupCoordinatorResponse = new GroupCoordinatorResponse(resp.responseBody());
Errors error = Errors.forCode(groupCoordinatorResponse.errorCode());
if (error == Errors.NONE) {
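// Use MAX_VALUE - node.id as the coordinator's node id to mimic a separate
// connection for the coordinator in the underlying NetworkClient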
this.coordinator = new Node(Integer.MAX_VALUE - groupCoordinatorResponse.node().id(),
groupCoordinatorResponse.node().host(),
groupCoordinatorResponse.node().port());
client.tryConnect(coordinator);
if (generation > 0)
heartbeatTask.reset();
future.complete(null);
} else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
future.raise(new GroupAuthorizationException(groupId));
} else {
future.raise(error);
}
}
}
public void ensureActiveGroup() {
if (!needRejoin())
return;
if (needsJoinPrepare) {
onJoinPrepare(generation, memberId);
needsJoinPrepare = false;
}
while (needRejoin()) {
// Make sure the server-side GroupCoordinator is known and connected
ensureCoordinatorReady();
// If requests to the GroupCoordinator are still outstanding, block until they complete,
// i.e. until the unsent and InFlightRequests queues for that node drain
if (client.pendingRequestCount(this.coordinator) > 0) {
client.awaitPendingRequests(this.coordinator);
continue;
}
RequestFuture<ByteBuffer> future = sendJoinGroupRequest();
future.addListener(new RequestFutureListener<ByteBuffer>() {
@Override
public void onSuccess(ByteBuffer value) {
onJoinComplete(generation, memberId, protocol, value);
needsJoinPrepare = true;
heartbeatTask.reset();
}
@Override
public void onFailure(RuntimeException e) {
}
});
client.poll(future);
if (future.failed()) {
RuntimeException exception = future.exception();
if (exception instanceof UnknownMemberIdException ||
exception instanceof RebalanceInProgressException ||
exception instanceof IllegalGenerationException)
continue;
else if (!future.isRetriable())
throw exception;
// Sleep to pace retries
time.sleep(retryBackoffMs);
}
}
}
// Buffers a JoinGroupRequest in the unsent queue
private RequestFuture<ByteBuffer> sendJoinGroupRequest() {
JoinGroupRequest request = new JoinGroupRequest( groupId, this.sessionTimeoutMs,
this.memberId, protocolType(), metadata());
return client.send(coordinator, ApiKeys.JOIN_GROUP, request)
.compose(new JoinGroupResponseHandler());
}
// Handles the JoinGroupResponse
private class JoinGroupResponseHandler extends CoordinatorResponseHandler<JoinGroupResponse, ByteBuffer> {
@Override
public void handle(JoinGroupResponse joinResponse, RequestFuture<ByteBuffer> future) {
Errors error = Errors.forCode(joinResponse.errorCode());
if (error == Errors.NONE) {
// Update local state
AbstractCoordinator.this.memberId = joinResponse.memberId();
AbstractCoordinator.this.generation = joinResponse.generationId();
AbstractCoordinator.this.rejoinNeeded = false;
AbstractCoordinator.this.protocol = joinResponse.groupProtocol();
// Check whether this consumer was chosen as group leader
if (joinResponse.isLeader()) {
onJoinLeader(joinResponse).chain(future);
} else {
onJoinFollower().chain(future);
}
} else if (error == Errors.GROUP_LOAD_IN_PROGRESS) {
// Retriable: raise so the caller retries
future.raise(error);
} else if (error == Errors.UNKNOWN_MEMBER_ID) {
AbstractCoordinator.this.memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID;
future.raise(Errors.UNKNOWN_MEMBER_ID);
} else if (error == Errors.GROUP_COORDINATOR_NOT_AVAILABLE
|| error == Errors.NOT_COORDINATOR_FOR_GROUP) {
coordinatorDead();
future.raise(error);
} else if (error == Errors.INCONSISTENT_GROUP_PROTOCOL
|| error == Errors.INVALID_SESSION_TIMEOUT
|| error == Errors.INVALID_GROUP_ID) {
log.error("Attempt to join group {} failed due to fatal error: {}", groupId, error.message());
future.raise(error);
} else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
future.raise(new GroupAuthorizationException(groupId));
} else {
future.raise(new KafkaException("Unexpected error in join group response: " + error.message()));
}
}
// Logic run by the group leader
private RequestFuture<ByteBuffer> onJoinLeader(JoinGroupResponse joinResponse) {
// Perform the assignment
Map<String, ByteBuffer> groupAssignment = performAssignment(joinResponse.leaderId(), joinResponse.groupProtocol(),
joinResponse.members());
// Send the result to the coordinator
SyncGroupRequest request = new SyncGroupRequest(groupId, generation, memberId, groupAssignment);
return sendSyncGroupRequest(request);
}
}
}
public final class Heartbeat {
private final long timeout; // session timeout
private final long interval; // interval between two heartbeats, 3000 ms by default
private long lastHeartbeatSend; // when the last heartbeat request was sent
private long lastHeartbeatReceive; // when the last heartbeat response was received
private long lastSessionReset; // when the session was last reset
// Computes the delay until the next heartbeat should be sent
public long timeToNextHeartbeat(long now) {
long timeSinceLastHeartbeat = now - Math.max(lastHeartbeatSend, lastSessionReset);
if (timeSinceLastHeartbeat > interval)
return 0;
else
return interval - timeSinceLastHeartbeat;
}
// Checks whether the session has timed out
public boolean sessionTimeoutExpired(long now) {
return now - Math.max(lastSessionReset, lastHeartbeatReceive) > timeout;
}
}
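These two fields correspond to the consumer configs session.timeout.ms and heartbeat.interval.ms; the interval must stay well below the timeout, or the coordinator will evict the consumer between heartbeats. Tracing timeToNextHeartbeat with illustrative numbers:

// assume interval = 3000 and the last heartbeat was sent at t = 10000
long lastHeartbeatSend = 10_000, lastSessionReset = 0, interval = 3_000;
long now = 11_200;
long sinceLast = now - Math.max(lastHeartbeatSend, lastSessionReset); // 1200
long next = sinceLast > interval ? 0 : interval - sinceLast;          // 1800 ms to wait
System.out.println(next); // 1800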
// Fetches messages from the brokers
public class Fetcher<K, V> {
// Handles network communication
private final ConsumerNetworkClient client;
// The broker does not answer a FetchRequest right away: it waits until at least minBytes
// of data have accumulated, improving the effective network payload
private final int minBytes;
// Maximum time to wait for a FetchResponse; the broker uses it to bound how long it delays the response
private final int maxWaitMs;
// Maximum bytes fetched per partition in one request
private final int fetchSize;
// Maximum number of records returned per poll
private final int maxPollRecords;
private final Metadata metadata; // Kafka cluster metadata
private final SubscriptionState subscriptions; // consumption state of each TopicPartition
// Each FetchResponse is first wrapped as a CompletedFetch and buffered in this queue; parsing happens later
private final List<CompletedFetch> completedFetches;
private final Deserializer<K> keyDeserializer;
private final Deserializer<V> valueDeserializer;
// Holds the parsed contents of a CompletedFetch:
// the raw ByteBuffer is sliced by offset+size and deserialized into typed messages;
// the resulting ConsumerRecord list lives here and is ultimately what KafkaConsumer returns
private PartitionRecords<K, V> nextInLineRecords = null;
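// The four tuning fields above surface as consumer configs (0.10-era defaults shown):
//   fetch.min.bytes=1                  -> minBytes
//   fetch.max.wait.ms=500              -> maxWaitMs
//   max.partition.fetch.bytes=1048576  -> fetchSize
//   max.poll.records=2147483647        -> maxPollRecords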
private static class PartitionRecords<K, V> {
private long fetchOffset; // offset of the first message in records
private TopicPartition partition;
private List<ConsumerRecord<K, V>> records; // the messages
}
// Builds a FetchRequest per target node
private Map<Node, FetchRequest> createFetchRequests() {
Cluster cluster = metadata.fetch();
Map<Node, Map<TopicPartition, FetchRequest.PartitionData>> fetchable = new HashMap<>();
// Iterate over the fetchable partitions
for (TopicPartition partition : fetchablePartitions()) {
// The partition's leader replica
Node node = cluster.leaderFor(partition);
if (node == null) {
metadata.requestUpdate();
}
// If there are still unsent or in-flight requests for this node,
// skip fetching from it this round
else if (this.client.pendingRequestCount(node) == 0) {
Map<TopicPartition, FetchRequest.PartitionData> fetch = fetchable.get(node);
if (fetch == null) {
fetch = new HashMap<>();
fetchable.put(node, fetch);
}
// Look up each partition's position in SubscriptionState,
// then wrap it in a PartitionData object
long position = this.subscriptions.position(partition);
fetch.put(partition, new FetchRequest.PartitionData(position, this.fetchSize));
}
}
Map<Node, FetchRequest> requests = new HashMap<>();
for (Map.Entry<Node, Map<TopicPartition, FetchRequest.PartitionData>> entry : fetchable.entrySet()) {
// Bundle all TopicPartitions headed for the same node into one FetchRequest
Node node = entry.getKey();
FetchRequest fetch = new FetchRequest(this.maxWaitMs, this.minBytes, entry.getValue());
requests.put(node, fetch);
}
return requests;
}
private Set<TopicPartition> fetchablePartitions() {
// Start from the partitions this consumer may fetch from
Set<TopicPartition> fetchable = subscriptions.fetchablePartitions();
// Partitions already present in either buffer below have un-drained data, so skip re-fetching them
if (nextInLineRecords != null && !nextInLineRecords.isEmpty())
fetchable.remove(nextInLineRecords.partition);
for (CompletedFetch completedFetch : completedFetches)
fetchable.remove(completedFetch.partition);
return fetchable;
}
// Sends the fetch requests
public void sendFetches() {
for (Map.Entry<Node, FetchRequest> fetchEntry: createFetchRequests().entrySet()) {
final FetchRequest request = fetchEntry.getValue();
client.send(fetchEntry.getKey(), ApiKeys.FETCH, request)
.addListener(new RequestFutureListener<ClientResponse>() {
@Override
public void onSuccess(ClientResponse resp) {
FetchResponse response = new FetchResponse(resp.responseBody());
// Buffer the received FetchResponse in the completedFetches queue
for (Map.Entry<TopicPartition, FetchResponse.PartitionData> entry : response.responseData().entrySet()) {
TopicPartition partition = entry.getKey();
long fetchOffset = request.fetchData().get(partition).offset;
FetchResponse.PartitionData fetchData = entry.getValue();
completedFetches.add(new CompletedFetch(partition, fetchOffset, fetchData, metricAggregator));
}
}
});
}
}
// Drains the completedFetches buffer
public Map<TopicPartition, List<ConsumerRecord<K, V>>> fetchedRecords() {
if (this.subscriptions.partitionAssignmentNeeded()) {
// A rebalance is needed, so return an empty map
return Collections.emptyMap();
} else {
Map<TopicPartition, List<ConsumerRecord<K, V>>> drained = new HashMap<>();
int recordsRemaining = maxPollRecords; // cap on how many records one poll may return
Iterator<CompletedFetch> completedFetchesIterator = completedFetches.iterator();
while (recordsRemaining > 0) {
// First move the next CompletedFetch into nextInLineRecords
if (nextInLineRecords == null || nextInLineRecords.isEmpty()) {
if (!completedFetchesIterator.hasNext())
break;
CompletedFetch completion = completedFetchesIterator.next();
completedFetchesIterator.remove();
nextInLineRecords = parseFetchedData(completion);
} else {
recordsRemaining -= append(drained, nextInLineRecords, recordsRemaining);
}
}
return drained;
}
}
// Parses a CompletedFetch
private PartitionRecords<K, V> parseFetchedData(CompletedFetch completedFetch) {
TopicPartition tp = completedFetch.partition;
FetchResponse.PartitionData partition = completedFetch.partitionData;
long fetchOffset = completedFetch.fetchedOffset;
int bytes = 0;
int recordsCount = 0;
PartitionRecords<K, V> parsedRecords = null;
if (!subscriptions.isFetchable(tp)) {
} else if (partition.errorCode == Errors.NONE.code()) {
Long position = subscriptions.position(tp);
ByteBuffer buffer = partition.recordSet;
MemoryRecords records = MemoryRecords.readableRecords(buffer);
List<ConsumerRecord<K, V>> parsed = new ArrayList<>();
boolean skippedRecords = false;
for (LogEntry logEntry : records) {
if (logEntry.offset() >= position) {
parsed.add(parseRecord(tp, logEntry));
bytes += logEntry.size();
} else {
// Skip messages that precede the locally recorded offset
skippedRecords = true;
}
}
recordsCount = parsed.size();
if (!parsed.isEmpty()) {
// Wrap the parsed records into a PartitionRecords object
parsedRecords = new PartitionRecords<>(fetchOffset, tp, parsed);
ConsumerRecord<K, V> record = parsed.get(parsed.size() - 1); // last record, used for fetch-lag metrics (elided here)
}
}
return parsedRecords;
}
private int append(Map<TopicPartition, List<ConsumerRecord<K, V>>> drained, PartitionRecords<K, V> partitionRecords, int maxRecords) {
long position = subscriptions.position(partitionRecords.partition);
if (partitionRecords.fetchOffset == position) {
// Take at most maxRecords messages
List<ConsumerRecord<K, V>> partRecords = partitionRecords.take(maxRecords);
long nextOffset = partRecords.get(partRecords.size() - 1).offset() + 1; // offset after the last message
// Append the messages to the drained map
List<ConsumerRecord<K, V>> records = drained.get(partitionRecords.partition);
if (records == null) {
records = partRecords;
drained.put(partitionRecords.partition, records);
} else {
records.addAll(partRecords);
}
// Advance this TopicPartition's position
subscriptions.position(partitionRecords.partition, nextOffset);
return partRecords.size();
}
return 0;
}
// Resets positions for the given TopicPartitions
public void updateFetchPositions(Set<TopicPartition> partitions) {
for (TopicPartition tp : partitions) {
if (!subscriptions.isAssigned(tp) || subscriptions.isFetchable(tp))
continue; // skip partitions that already have a valid position
if (subscriptions.isOffsetResetNeeded(tp)) {
// Reset the position according to the requested strategy
resetOffset(tp);
} else if (subscriptions.committed(tp) == null) {
// No committed offset for this consumer: reset with the default strategy
subscriptions.needOffsetReset(tp);
resetOffset(tp);
} else {
// Otherwise update the position to the committed offset
long committed = subscriptions.committed(tp).offset();
subscriptions.seek(tp, committed);
}
}
}
private void resetOffset(TopicPartition partition) {
OffsetResetStrategy strategy = subscriptions.resetStrategy(partition);
final long timestamp;
if (strategy == OffsetResetStrategy.EARLIEST)
timestamp = ListOffsetRequest.EARLIEST_TIMESTAMP;
else if (strategy == OffsetResetStrategy.LATEST)
timestamp = ListOffsetRequest.LATEST_TIMESTAMP;
else
throw new NoOffsetForPartitionException(partition);
// Send a ListOffsetRequest with this timestamp to the partition's leader node
long offset = listOffset(partition, timestamp);
if (subscriptions.isAssigned(partition))
this.subscriptions.seek(partition, offset); // update the position
}
}
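resetOffset is where the auto.offset.reset config lands: earliest and latest map to the two strategies above, and anything else surfaces as NoOffsetForPartitionException when no committed offset exists. A configuration sketch, plus the explicit-seek alternative (0.10-era varargs API; props and consumer as in the earlier sketches):

props.put("auto.offset.reset", "earliest"); // OffsetResetStrategy.EARLIEST when no committed offset is found

// Or override positions explicitly after assignment:
TopicPartition tp = new TopicPartition("demo-topic", 0); // placeholder topic
consumer.assign(Collections.singletonList(tp));
consumer.seekToBeginning(tp); // same effect as the EARLIEST strategy
// consumer.seek(tp, 42L);    // or jump to an exact offset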