
Kafka Source Code Analysis - Consumer (9): Offset Operations

Author: 陈阳001 | Published 2018-12-31 15:31

Committing offsets

From the previous analysis we know how the Rebalance operation works and how it is implemented. During normal consumption, and before a Rebalance operation starts, the consumer commits its offsets to record its current consumption position. Committing offsets is also implemented by ConsumerCoordinator.
Let's first look at the message formats of OffsetCommitRequest and OffsetCommitResponse:


[Figure: OffsetCommitRequest message format; OffsetCommitResponse message format]

The fields of the OffsetCommitRequest body:

| Name | Type | Meaning |
| --- | --- | --- |
| group_id | String | Id of the Consumer Group |
| group_generation_id | int | The generation the consumer has recorded |
| member_id | String | The id assigned to this consumer by the GroupCoordinator |
| retention_time | long | The maximum time this offset should be retained |
| topic | String | Topic name |
| partition | int | Partition number |
| offset | long | The message offset being committed |
| metadata | String | Any custom data the application wants stored along with the offset |

The fields of the OffsetCommitResponse body:

| Name | Type | Meaning |
| --- | --- | --- |
| topic | String | Topic name |
| partition | int | Partition number |
| error_code | short | Error code |

The four methods in ConsumerCoordinator related to committing offsets, and the call relationships between them:
Class: ConsumerCoordinator
Methods:
1) void commitOffsetsAsync(final Map<TopicPartition, OffsetAndMetadata> offsets, final OffsetCommitCallback callback)
2) void commitOffsetsSync(Map<TopicPartition, OffsetAndMetadata> offsets)
3) RequestFuture<Void> sendOffsetCommitRequest(final Map<TopicPartition, OffsetAndMetadata> offsets)
4) void maybeAutoCommitOffsetsSync()


[Figure: call relationships among the methods above]

SubscriptionState keeps a TopicPartitionState for each TopicPartition to record how far it has been consumed; the TopicPartitionState.position field holds the offset of the next message the consumer will fetch from the server. When no offsets to commit are explicitly specified, the TopicPartitionState.position values are collected into a map and used as the offsets to commit, i.e. as the first argument of the ConsumerCoordinator.commitOffsets*() methods.
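From the application side this corresponds to the two flavours of the commit API. Below is a minimal sketch, assuming an already-subscribed consumer; the topic name, partition, offset, and metadata string are made up for illustration:

import java.util.Collections;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class CommitOffsetExamples {
    static void commit(Consumer<String, String> consumer) {
        // No offsets specified: the current position of every assigned partition
        // (TopicPartitionState.position) is collected and committed.
        consumer.commitSync();

        // Explicit offsets: the caller builds the map itself; the optional metadata
        // string is stored by the broker together with the offset.
        TopicPartition tp = new TopicPartition("orders", 0);
        consumer.commitSync(Collections.singletonMap(tp,
                new OffsetAndMetadata(100L, "checkpointed by worker-1")));
    }
}

The asynchronous path ends up in ConsumerCoordinator.doCommitOffsetsAsync(), shown below: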

private void doCommitOffsetsAsync(final Map<TopicPartition, OffsetAndMetadata> offsets, final OffsetCommitCallback callback) {
        // set needsFetchCommittedOffsets to true
        this.subscriptions.needRefreshCommits();
        // create and buffer the OffsetCommitRequest; the logic is similar to the earlier
        // JoinGroupRequest and SyncGroupRequest, except that an OffsetCommitResponseHandler
        // is used to handle the OffsetCommitResponse
        RequestFuture<Void> future = sendOffsetCommitRequest(offsets);
        final OffsetCommitCallback cb = callback == null ? defaultOffsetCommitCallback : callback; // pick the callback
        future.addListener(new RequestFutureListener<Void>() {
            @Override
            public void onSuccess(Void value) {
                if (interceptors != null)
                    interceptors.onCommit(offsets);
                cb.onComplete(offsets, null); // invoke the callback
            }
            @Override
            public void onFailure(RuntimeException e) {
                if (e instanceof RetriableException) {
                    cb.onComplete(offsets, new RetriableCommitFailedException(e));
                } else {
                    cb.onComplete(offsets, e); // invoke the callback
                }
            }
        });
    }
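The cb selected above is either the user-supplied callback or the default one. A minimal sketch of what the application-side callback can look like (the logging is hypothetical; only the commitAsync() call and the RetriableCommitFailedException wrapping come from the code above):

import java.util.Map;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.clients.consumer.RetriableCommitFailedException;
import org.apache.kafka.common.TopicPartition;

public class AsyncCommitExample {
    static void commitAsyncWithCallback(Consumer<String, String> consumer) {
        consumer.commitAsync(new OffsetCommitCallback() {
            @Override
            public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
                if (exception == null) {
                    System.out.println("Committed: " + offsets);                      // onSuccess path
                } else if (exception instanceof RetriableCommitFailedException) {
                    System.out.println("Retriable commit failure: " + exception);     // wrapped RetriableException
                } else {
                    System.err.println("Commit failed: " + exception);                // fatal error
                }
            }
        });
    }
}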

commitOffsetsSync() is implemented much like commitOffsetsAsync(): it also calls sendOffsetCommitRequest() to create and buffer an OffsetCommitRequest and uses an OffsetCommitResponseHandler to handle the OffsetCommitResponse. There are two differences:

  • After sending the OffsetCommitRequest, commitOffsetsSync() calls ConsumerNetworkClient.poll(future) to block until the OffsetCommitResponse has been handled, which is what makes the commit synchronous.
  • commitOffsetsSync() retries when it detects a RetriableException.

maybeAutoCommitOffsetsSync() decides, based on the enable.auto.commit configuration, whether to call commitOffsetsSync(). AutoCommitTask is a scheduled task that periodically calls commitOffsetsAsync(), implementing automatic offset commits; with auto-commit enabled, the business-logic thread no longer has to call the commitOffsets*() methods itself (a configuration sketch follows the handler code below).
OffsetCommitResponseHandler.handle() is the entry point for handling an OffsetCommitResponse:
private class OffsetCommitResponseHandler extends CoordinatorResponseHandler<OffsetCommitResponse, Void> {

        private final Map<TopicPartition, OffsetAndMetadata> offsets;

        public OffsetCommitResponseHandler(Map<TopicPartition, OffsetAndMetadata> offsets) {
            this.offsets = offsets;
        }

        @Override
        public OffsetCommitResponse parse(ClientResponse response) {
            return new OffsetCommitResponse(response.responseBody());
        }

        @Override
        public void handle(OffsetCommitResponse commitResponse, RequestFuture<Void> future) {
            sensors.commitLatency.record(response.requestLatencyMs());
            Set<String> unauthorizedTopics = new HashSet<>();
            // iterate over every offset that was submitted for commit
            for (Map.Entry<TopicPartition, Short> entry : commitResponse.responseData().entrySet()) {
                TopicPartition tp = entry.getKey();
                OffsetAndMetadata offsetAndMetadata = this.offsets.get(tp);
                long offset = offsetAndMetadata.offset();

                Errors error = Errors.forCode(entry.getValue()); // get the error code
                if (error == Errors.NONE) {
                    log.debug("Group {} committed offset {} for partition {}", groupId, offset, tp);
                    if (subscriptions.isAssigned(tp))
                        // update the local cache only if the partition is still assigned
                        // update the committed field of the corresponding TopicPartitionState in SubscriptionState
                        subscriptions.committed(tp, offsetAndMetadata);
                } else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
                    log.error("Not authorized to commit offsets for group {}", groupId);
                    future.raise(new GroupAuthorizationException(groupId));
                    return;
                } else if (error == Errors.TOPIC_AUTHORIZATION_FAILED) {
                    unauthorizedTopics.add(tp.topic());
                } else if (error == Errors.OFFSET_METADATA_TOO_LARGE
                        || error == Errors.INVALID_COMMIT_OFFSET_SIZE) {
                    // raise the error to the user
                    log.debug("Offset commit for group {} failed on partition {}: {}", groupId, tp, error.message());
                    future.raise(error);
                    return;
                } else if (error == Errors.GROUP_LOAD_IN_PROGRESS) {
                    // just retry
                    log.debug("Offset commit for group {} failed: {}", groupId, error.message());
                    future.raise(error);
                    return;
                } else if (error == Errors.GROUP_COORDINATOR_NOT_AVAILABLE
                        || error == Errors.NOT_COORDINATOR_FOR_GROUP
                        || error == Errors.REQUEST_TIMED_OUT) {
                    log.debug("Offset commit for group {} failed: {}", groupId, error.message());
                    coordinatorDead();
                    future.raise(error);
                    return;
                } else if (error == Errors.UNKNOWN_MEMBER_ID
                        || error == Errors.ILLEGAL_GENERATION
                        || error == Errors.REBALANCE_IN_PROGRESS) {
                    // need to re-join group
                    log.debug("Offset commit for group {} failed: {}", groupId, error.message());
                    subscriptions.needReassignment();
                    future.raise(new CommitFailedException("Commit cannot be completed since the group has already " +
                            "rebalanced and assigned the partitions to another member. This means that the time " +
                            "between subsequent calls to poll() was longer than the configured session.timeout.ms, " +
                            "which typically implies that the poll loop is spending too much time message processing. " +
                            "You can address this either by increasing the session timeout or by reducing the maximum " +
                            "size of batches returned in poll() with max.poll.records."));
                    return;
                } else {
                    log.error("Group {} failed to commit partition {} at offset {}: {}", groupId, tp, offset, error.message());
                    future.raise(new KafkaException("Unexpected error in commit: " + error.message()));
                    return;
                }
            }

            if (!unauthorizedTopics.isEmpty()) {
                log.error("Not authorized to commit to topics {} for group {}", unauthorizedTopics, groupId);
                future.raise(new TopicAuthorizationException(unauthorizedTopics));
            } else {
                future.complete(null);
            }
        }
    }
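For completeness, the automatic-commit path described above is driven purely by configuration. A minimal sketch of enabling it (broker address, group id and interval are placeholder values):

import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class AutoCommitConfigExample {
    static KafkaConsumer<String, String> newAutoCommittingConsumer() {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "demo-group");              // placeholder group id
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");          // enables the AutoCommitTask path
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "5000");     // commit roughly every 5 seconds
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        return new KafkaConsumer<>(props);
    }
}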

Fetching offsets

After a Rebalance operation finishes, each consumer knows which partitions it has been assigned to consume. Before it starts consuming, it needs to determine where to start fetching. Assuming the last consumed positions were previously committed to the GroupCoordinator, which stored them in Kafka's internal offsets topic, the consumer can now send an OffsetFetchRequest to retrieve the last committed offsets and resume consumption from there.
The message formats of OffsetFetchRequest and OffsetFetchResponse are shown below:


[Figure: OffsetFetchRequest message format; OffsetFetchResponse message format]

The methods in ConsumerCoordinator related to fetching offsets, and their call relationships:

1.void refreshCommittedOffsetsIfNeeded()
2.public Map<TopicPartition, OffsetAndMetadata> fetchCommittedOffsets(Set<TopicPartition> partitions)
3.private RequestFuture<Map<TopicPartition, OffsetAndMetadata>> sendOffsetFetchRequest(Set<TopicPartition> partitions)
The main job of refreshCommittedOffsetsIfNeeded() is to send an OffsetFetchRequest, pull the most recently committed offsets from the server, and write them into the SubscriptionState. Its implementation:

/**
     * Refresh the committed offsets for provided partitions.
     */
    public void refreshCommittedOffsetsIfNeeded() {
        if (subscriptions.refreshCommitsNeeded()) { // check needsFetchCommittedOffsets
            // send OffsetFetchRequest and handle the OffsetFetchResponse;
            // the return value is the set of most recently committed offsets
            Map<TopicPartition, OffsetAndMetadata> offsets = fetchCommittedOffsets(subscriptions.assignedPartitions());
            // walk the offsets map and update the committed field of each corresponding TopicPartitionState
            for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : offsets.entrySet()) {
                TopicPartition tp = entry.getKey();
                // verify assignment is still active
                if (subscriptions.isAssigned(tp))
                    this.subscriptions.committed(tp, entry.getValue());
            }
            // set needsFetchCommittedOffsets back to false; OffsetFetchRequest handling is complete
            this.subscriptions.commitsRefreshed();
        }
    }

/**
     * Fetch the current committed offsets from the coordinator for a set of partitions.
     * @param partitions The partitions to fetch offsets for
     * @return A map from partition to the committed offset
     */
    public Map<TopicPartition, OffsetAndMetadata> fetchCommittedOffsets(Set<TopicPartition> partitions) {
        while (true) {
            ensureCoordinatorReady(); // make sure the GroupCoordinator is known and ready

            // contact coordinator to fetch committed offsets
            // create and buffer the OffsetFetchRequest
            RequestFuture<Map<TopicPartition, OffsetAndMetadata>> future = sendOffsetFetchRequest(partitions);
            client.poll(future); // block until the OffsetFetchRequest is sent and its response handled

            if (future.succeeded())
                return future.value(); // return the offsets obtained from the server
            // if the exception is retriable, back off for a while and retry; otherwise rethrow it
            if (!future.isRetriable())
                throw future.exception();
            time.sleep(retryBackoffMs);
        }
    }
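Seen from the public API, this code path is what backs KafkaConsumer.committed() in the client version discussed here: the call blocks until the coordinator returns the last committed offset for the partition. A minimal sketch (the topic and partition are made up for illustration):

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class CommittedOffsetExample {
    static void printCommitted(Consumer<String, String> consumer) {
        TopicPartition tp = new TopicPartition("orders", 0);
        // Blocks until an OffsetFetchResponse arrives; null means the group
        // has no committed offset for this partition yet.
        OffsetAndMetadata committed = consumer.committed(tp);
        if (committed != null) {
            System.out.println("last committed offset = " + committed.offset()
                    + ", metadata = " + committed.metadata());
        }
    }
}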

OffsetFetchResponseHandler.handle() is the entry point for handling an OffsetFetchResponse:

 private class OffsetFetchResponseHandler extends CoordinatorResponseHandler<OffsetFetchResponse, Map<TopicPartition, OffsetAndMetadata>> {

        @Override
        public OffsetFetchResponse parse(ClientResponse response) {
            return new OffsetFetchResponse(response.responseBody());
        }

        @Override
        public void handle(OffsetFetchResponse response, RequestFuture<Map<TopicPartition, OffsetAndMetadata>> future) {
            // map collecting the offsets fetched from the server
            Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>(response.responseData().size());
            // handle each partition in the OffsetFetchResponse
            for (Map.Entry<TopicPartition, OffsetFetchResponse.PartitionData> entry : response.responseData().entrySet()) {
                TopicPartition tp = entry.getKey();
                OffsetFetchResponse.PartitionData data = entry.getValue();
                if (data.hasError()) {
                    Errors error = Errors.forCode(data.errorCode);
                    log.debug("Group {} failed to fetch offset for partition {}: {}", groupId, tp, error.message());

                    if (error == Errors.GROUP_LOAD_IN_PROGRESS) {
                        // just retry
                        future.raise(error);
                    } else if (error == Errors.NOT_COORDINATOR_FOR_GROUP) {
                        // re-discover the coordinator and retry
                        coordinatorDead();
                        future.raise(error);
                    } else if (error == Errors.UNKNOWN_MEMBER_ID
                            || error == Errors.ILLEGAL_GENERATION) {
                        // need to re-join group
                        subscriptions.needReassignment();
                        future.raise(error);
                    } else {
                        future.raise(new KafkaException("Unexpected error in fetch offset response: " + error.message()));
                    }
                    return;
                } else if (data.offset >= 0) {
                    // record the position with the offset (-1 indicates no committed offset to fetch)
                    offsets.put(tp, new OffsetAndMetadata(data.offset, data.metadata));
                } else {
                    log.debug("Group {} has no committed offset for partition {}", groupId, tp);
                }
            }
            // propagate the offsets map; it is ultimately returned by fetchCommittedOffsets()
            future.complete(offsets);
        }
    }
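Putting the two halves together: on startup the consumer resumes from the fetched committed offsets, and after processing it commits new ones. A minimal manual-commit loop sketch, assuming the consumer was configured with enable.auto.commit=false and is already subscribed; the processing logic is hypothetical:

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;

public class ManualCommitLoop {
    static void run(Consumer<String, String> consumer) {
        while (true) {
            // The first poll() triggers the OffsetFetchRequest path above, so the
            // consumer starts from the last committed offset of each partition.
            ConsumerRecords<String, String> records = consumer.poll(1000);
            for (ConsumerRecord<String, String> record : records) {
                process(record); // hypothetical application logic
            }
            // Synchronous commit of the positions advanced by this poll()
            // (the OffsetCommitRequest path above); retried on retriable errors.
            consumer.commitSync();
        }
    }

    private static void process(ConsumerRecord<String, String> record) {
        System.out.printf("%s-%d@%d: %s%n",
                record.topic(), record.partition(), record.offset(), record.value());
    }
}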
