* A client that consumes records from a Kafka cluster.
* This client transparently handles the failure of Kafka brokers, and transparently adapts as topic partitions it fetches migrate within the cluster. This client also interacts with the broker to allow groups of consumers to load balance consumption using consumer groups.
* The consumer maintains TCP connections to the necessary brokers to fetch data. Failure to close the consumer after use will leak these connections. The consumer is not thread-safe. See Multi-threaded Processing for more details.
* Cross-Version Compatibility
* This client can communicate with brokers that are version 0.10.0 or newer. Older or newer brokers may not support certain features. For example, 0.10.0 brokers do not support offsetsForTimes, because this feature was added in version 0.10.1. You will receive an {@link org.apache.kafka.common.errors.UnsupportedVersionException} when invoking an API that is not available on the running broker version.
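* The version guard above can be sketched as follows; the broker address, group id, and topic name are assumptions for illustration only:

```java
import java.time.Instant;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.UnsupportedVersionException;

public class OffsetsForTimesExample {

    // Helper: the timestamp one hour before the given instant, in millis.
    static long hourBefore(long nowMillis) {
        return nowMillis - 3_600_000L;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // hypothetical broker address
        props.put("group.id", "example-group");           // hypothetical group
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            TopicPartition tp = new TopicPartition("my-topic", 0); // hypothetical topic
            long oneHourAgo = hourBefore(Instant.now().toEpochMilli());
            try {
                // Look up the earliest offset whose timestamp is >= oneHourAgo.
                Map<TopicPartition, OffsetAndTimestamp> result =
                        consumer.offsetsForTimes(Collections.singletonMap(tp, oneHourAgo));
                System.out.println(result);
            } catch (UnsupportedVersionException e) {
                // Brokers older than 0.10.1 do not support offsetsForTimes.
                System.err.println("Broker too old for offsetsForTimes: " + e.getMessage());
            }
        }
    }
}
```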
* Offsets and Consumer Position
* Kafka maintains a numerical offset for each record in a partition. This offset acts as a unique identifier of a record within that partition, and also denotes the position of the consumer in the partition. For example, a consumer which is at position 5 has consumed records with offsets 0 through 4 and will next receive the record with offset 5. There are actually two notions of position relevant to the user of the consumer:
* The {@link #position(TopicPartition) position} of the consumer gives the offset of the next record that will be given out. It will be one larger than the highest offset the consumer has seen in that partition. It automatically advances every time the consumer receives messages in a call to {@link #poll(long)}.
* The {@link #commitSync() committed position} is the last offset that has been stored securely. Should the process fail and restart, this is the offset that the consumer will recover to. The consumer can either automatically commit offsets periodically; or it can choose to control this committed position manually by calling one of the commit APIs (e.g. {@link #commitSync() commitSync} and {@link #commitAsync(OffsetCommitCallback) commitAsync}).
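* A minimal sketch of manual control over the committed position: auto-commit is disabled, so the committed offset only advances after a batch has been processed. The broker address, group id, and topic name are assumptions for illustration only:

```java
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ManualCommitExample {

    // Base configuration; commits are issued manually below.
    static Properties baseConfig() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // hypothetical broker address
        props.put("group.id", "example-group");           // hypothetical group
        props.put("enable.auto.commit", "false");         // take manual control of commits
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    public static void main(String[] args) {
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(baseConfig())) {
            consumer.subscribe(Collections.singletonList("my-topic")); // hypothetical topic
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> record : records) {
                    // position() has already advanced past each record returned by poll.
                    System.out.printf("offset=%d value=%s%n", record.offset(), record.value());
                }
                // Advance the committed position only after processing succeeds.
                consumer.commitSync();
            }
        }
    }
}
```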
* This distinction gives the consumer control over when a record is considered consumed. It is discussed in further detail below.
* Consumer Groups and Topic Subscriptions
*Kafka uses the concept of consumer groups to allow a pool of processes to divide the work of consuming and processing records. These processes can either be running on the same machine or they can be distributed over many machines to provide scalability and fault tolerance for processing. All consumer instances sharing the same group.id will be part of the same consumer group.
* Each consumer in a group can dynamically set the list of topics it wants to subscribe to through one of the {@link #subscribe(Collection, ConsumerRebalanceListener) subscribe} APIs. Kafka will deliver each message in the subscribed topics to one process in each consumer group. This is achieved by balancing the partitions between all members in the consumer group so that each partition is assigned to exactly one consumer in the group. So if there is a topic with four partitions, and a consumer group with two processes, each process would consume from two partitions.
* Membership in a consumer group is maintained dynamically: if a process fails, the partitions assigned to it will be reassigned to other consumers in the same group. Similarly, if a new consumer joins the group, partitions will be moved from existing consumers to the new one. This is known as rebalancing the group and is discussed in more detail below. Group rebalancing is also used when new partitions are added to one of the subscribed topics or when a new topic matching a {@link #subscribe(Pattern, ConsumerRebalanceListener) subscribed regex} is created. The group will automatically detect the new partitions through periodic metadata refreshes and assign them to members of the group.
* Conceptually you can think of a consumer group as being a single logical subscriber that happens to be made up of multiple processes. As a multi-subscriber system, Kafka naturally supports having any number of consumer groups for a given topic without duplicating data (additional consumers are actually quite cheap).
* This is a slight generalization of the functionality that is common in messaging systems. To get semantics similar to a queue in a traditional messaging system all processes would be part of a single consumer group and hence record delivery would be balanced over the group like with a queue. Unlike a traditional messaging system, though, you can have multiple such groups. To get semantics similar to pub-sub in a traditional messaging system each process would have its own consumer group, so each process would subscribe to all the records published to the topic.
* In addition, when group reassignment happens automatically, consumers can be notified through a {@link ConsumerRebalanceListener}, which allows them to finish necessary application-level logic such as state cleanup, manual offset commits, etc. See Storing Offsets Outside Kafka for more details.
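* A common use of {@link ConsumerRebalanceListener} is to commit offsets before partitions are taken away in a rebalance; a minimal sketch (the class name is an assumption):

```java
import java.util.Collection;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.common.TopicPartition;

public class CommitOnRevokeListener implements ConsumerRebalanceListener {
    private final Consumer<?, ?> consumer;

    public CommitOnRevokeListener(Consumer<?, ?> consumer) {
        this.consumer = consumer;
    }

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // Called before a rebalance takes partitions away: commit the
        // current position so the next owner resumes without duplicates.
        consumer.commitSync();
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // Called after partitions are (re)assigned; restore any
        // per-partition state here.
    }
}
```

* The listener is passed at subscription time, e.g. consumer.subscribe(topics, new CommitOnRevokeListener(consumer)).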
* It is also possible for the consumer to manually assign specific partitions (similar to the older "simple" consumer) using {@link #assign(Collection)}. In this case, dynamic partition assignment and consumer group coordination will be disabled.
* Detecting Consumer Failures
* After subscribing to a set of topics, the consumer will automatically join the group when {@link #poll(long)} is invoked. The poll API is designed to ensure consumer liveness. As long as you continue to call poll, the consumer will stay in the group and continue to receive messages from the partitions it was assigned. Underneath the covers, the consumer sends periodic heartbeats to the server. If the consumer crashes or is unable to send heartbeats for a duration of session.timeout.ms, then the consumer will be considered dead and its partitions will be reassigned.
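* Manual assignment with {@link #assign(Collection)}, as described above, can be sketched as follows; the broker address, topic, and partition numbers are assumptions for illustration:

```java
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class ManualAssignExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // hypothetical broker address
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // Note: no group.id is required for manual assignment, and no
        // rebalancing or group-based liveness detection will occur.

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // Take partitions 0 and 1 of "my-topic" directly;
            // the group coordinator is bypassed entirely.
            consumer.assign(Arrays.asList(
                    new TopicPartition("my-topic", 0),
                    new TopicPartition("my-topic", 1)));
            System.out.println("Assigned: " + consumer.assignment());
        }
    }
}
```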
* It is also possible that the consumer could encounter a "livelock" situation where it is continuing to send heartbeats, but no progress is being made. To prevent the consumer from holding onto its partitions indefinitely in this case, we provide a liveness detection mechanism using the max.poll.interval.ms setting. Basically if you don't call poll at least as frequently as the configured max interval, then the client will proactively leave the group so that another consumer can take over its partitions. When this happens, you may see an offset commit failure (as indicated by a {@link CommitFailedException} thrown from a call to {@link #commitSync()}).This is a safety mechanism which guarantees that only active members of the group are able to commit offsets. So to stay in the group, you must continue to call poll.
* The consumer provides two configuration settings to control the behavior of the poll loop:
* max.poll.interval.ms: By increasing the interval between expected polls, you can give the consumer more time to handle a batch of records returned from {@link #poll(long)}. The drawback is that increasing this value may delay a group rebalance since the consumer will only join the rebalance inside the call to poll. You can use this setting to bound the time to finish a rebalance, but you risk slower progress if the consumer cannot actually call {@link #poll(long) poll} often enough.
* max.poll.records: Use this setting to limit the total records returned from a single call to poll. This can make it easier to predict the maximum that must be handled within each poll interval. By tuning this value, you may be able to reduce the poll interval, which will reduce the impact of group rebalancing.
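* The two settings above can be collected into a configuration sketch; the values shown are illustrative assumptions, not recommendations:

```java
import java.util.Properties;

public class PollTuningConfig {

    // Illustrative values; tune to the processing time your batches need.
    public static Properties pollTuning() {
        Properties props = new Properties();
        // Allow up to 5 minutes between poll() calls before the consumer
        // is considered failed and proactively leaves the group.
        props.put("max.poll.interval.ms", "300000");
        // Cap each poll() at 100 records so the work per iteration
        // of the poll loop stays predictably small.
        props.put("max.poll.records", "100");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(pollTuning());
    }
}
```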