1. 订阅主题
/**
 * Subscribes to the given topics without a user-supplied rebalance callback.
 * <p>
 * Delegates to {@link #subscribe(Collection, ConsumerRebalanceListener)} using a
 * {@link NoOpConsumerRebalanceListener}.
 *
 * @param topics the topics to subscribe to
 */
public void subscribe(Collection<String> topics) {
    NoOpConsumerRebalanceListener noOpListener = new NoOpConsumerRebalanceListener();
    subscribe(topics, noOpListener);
}
/**
 * Subscribes to the given topics and hands the listener to the subscription state.
 * <p>
 * An empty collection is treated the same as a call to {@code unsubscribe()}.
 *
 * @param topics   topics to subscribe to; must not be null and must not contain null or blank names
 * @param listener rebalance callback passed on together with the topics
 * @throws IllegalArgumentException if {@code topics} is null, or contains a null or blank topic name
 * @throws IllegalStateException    if the consumer has been closed, or no partition assignor is configured
 */
@Override
public void subscribe(Collection<String> topics, ConsumerRebalanceListener listener) {
    // Take the lock and make sure the consumer has not been closed.
    acquireAndEnsureOpen();
    try {
        if (topics == null) {
            throw new IllegalArgumentException("Topic collection to subscribe to cannot be null");
        }

        if (topics.isEmpty()) {
            // treat subscribing to empty topic list as the same as unsubscribing
            this.unsubscribe();
            return;
        }

        // Reject null, empty and whitespace-only topic names before touching any state.
        for (String name : topics) {
            boolean blank = name == null || name.trim().isEmpty();
            if (blank) {
                throw new IllegalArgumentException("Topic collection to subscribe to cannot contain null or empty topic");
            }
        }

        throwIfNoAssignorsConfigured();
        log.debug("Subscribed to topic(s): {}", Utils.join(topics, ", "));
        this.subscriptions.subscribe(new HashSet<>(topics), listener);
        metadata.setTopics(subscriptions.groupSubscription());
    } finally {
        // Always give the lock back, including on the early return above.
        release();
    }
}
/**
 * Subscribes to all topics matching the given pattern without a user-supplied rebalance callback.
 * <p>
 * Delegates to {@link #subscribe(Pattern, ConsumerRebalanceListener)} using a
 * {@link NoOpConsumerRebalanceListener}.
 *
 * @param pattern the pattern to subscribe to
 */
@Override
public void subscribe(Pattern pattern) {
    NoOpConsumerRebalanceListener noOpListener = new NoOpConsumerRebalanceListener();
    subscribe(pattern, noOpListener);
}
/**
 * Subscribes to all topics matching the given pattern and hands the listener to the subscription state.
 * <p>
 * The null check on {@code pattern} happens before the lock is taken, so that failure path
 * never touches the lock.
 *
 * @param pattern  the pattern to subscribe to; must not be null
 * @param listener rebalance callback passed on together with the pattern
 * @throws IllegalArgumentException if {@code pattern} is null
 * @throws IllegalStateException    if the consumer has been closed, or no partition assignor is configured
 */
@Override
public void subscribe(Pattern pattern, ConsumerRebalanceListener listener) {
    if (pattern == null) {
        throw new IllegalArgumentException("Topic pattern to subscribe to cannot be null");
    }

    acquireAndEnsureOpen();
    try {
        throwIfNoAssignorsConfigured();
        log.debug("Subscribed to pattern: {}", pattern);
        subscriptions.subscribe(pattern, listener);
        // Switch metadata to "all topics", apply the pattern to the current
        // cluster view, then request a metadata update.
        metadata.needMetadataForAllTopics(true);
        coordinator.updatePatternSubscription(metadata.fetch());
        metadata.requestUpdate();
    } finally {
        // Release the lock.
        release();
    }
}
其中: acquireAndEnsureOpen() 方法源码如下:
/**
 * Acquires the light-weight lock and verifies that this consumer has not been closed.
 * If the consumer is already closed, the lock is released again before the exception is thrown.
 *
 * @throws IllegalStateException if this consumer has been closed
 */
private void acquireAndEnsureOpen() {
    acquire();
    if (!this.closed) {
        return;
    }
    // Closed: undo the acquire so the lock is not left held.
    release();
    throw new IllegalStateException("This consumer has already been closed.");
}
// Sentinel stored in currentThread while no thread holds the consumer lock.
private static final long NO_CURRENT_THREAD = -1L;
// NOTE(review): not referenced by the code shown here; presumably used to number consumer client ids - confirm.
private static final AtomicInteger CONSUMER_CLIENT_ID_SEQUENCE = new AtomicInteger(1);
// Id of the thread that currently holds the lock, or NO_CURRENT_THREAD when it is free.
private final AtomicLong currentThread = new AtomicLong(NO_CURRENT_THREAD);
// Number of acquire() calls not yet matched by release(); the lock is freed when it drops back to 0.
private final AtomicInteger refcount = new AtomicInteger(0);
/**
 * Acquires the light-weight lock that protects this consumer from multi-threaded access.
 * Instead of blocking when the lock is not available, this method throws. The thread that
 * already owns the lock may call it again; each call increments the reference count.
 *
 * @throws ConcurrentModificationException if another thread already holds the lock
 */
private void acquire() {
    final long callerId = Thread.currentThread().getId();
    if (callerId != currentThread.get()) {
        // Not a re-entrant call: try to claim the lock atomically.
        boolean claimed = currentThread.compareAndSet(NO_CURRENT_THREAD, callerId);
        if (!claimed) {
            throw new ConcurrentModificationException("KafkaConsumer is not safe for multi-threaded access");
        }
    }
    refcount.incrementAndGet();
}
/**
 * Releases the light-weight lock taken by {@code acquire()}.
 * The owner is cleared only when the reference count drops back to zero, i.e. when the
 * outermost acquire has been matched.
 */
private void release() {
    int remaining = refcount.decrementAndGet();
    if (remaining == 0) {
        currentThread.set(NO_CURRENT_THREAD);
    }
}
throwIfNoAssignorsConfigured();源码如下:
// Partition assignors configured for this consumer; throwIfNoAssignorsConfigured() rejects an empty list.
private List<PartitionAssignor> assignors;
/**
 * Verifies that at least one partition assignor is configured.
 *
 * @throws IllegalStateException if the list of configured assignors is empty
 */
private void throwIfNoAssignorsConfigured() {
    if (!assignors.isEmpty()) {
        return;
    }
    // The check is on the assignor list, not on the assigned partitions.
    String message = "Must configure at least one partition assigner class name to "
            + ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG + " configuration property";
    throw new IllegalStateException(message);
}
ConsumerRebalanceListener源码如下:
package org.apache.kafka.clients.consumer;
import java.util.Collection;
import org.apache.kafka.common.TopicPartition;
/**
 * A callback interface that the user can implement to trigger custom actions when the set of partitions
 * assigned to the consumer changes.
 * <p>
 * This is applicable when the consumer is having Kafka auto-manage group membership. If the consumer directly assigns partitions,
 * those partitions will never be reassigned and this callback is not applicable.
 * <p>
 * When Kafka is managing the group membership, a partition re-assignment will be triggered any time the members of the group change or the subscription
 * of the members changes. This can occur when processes die, new process instances are added or old instances come back to life after failure.
 * Rebalances can also be triggered by changes affecting the subscribed topics (e.g. when the number of partitions is
 * administratively adjusted).
 * <p>
 * There are many uses for this functionality. One common use is saving offsets in a custom store. By saving offsets in
 * the {@link #onPartitionsRevoked(Collection)} call we can ensure that any time partition assignment changes
 * the offset gets saved.
 * <p>
 * Another use is flushing out any kind of cache of intermediate results the consumer may be keeping. For example,
 * consider a case where the consumer is subscribed to a topic containing user page views, and the goal is to count the
 * number of page views per user for each five minute window. Let's say the topic is partitioned by the user id so that
 * all events for a particular user go to a single consumer instance. The consumer can keep in memory a running
 * tally of actions per user and only flush these out to a remote data store when its cache gets too big. However if a
 * partition is reassigned it may want to automatically trigger a flush of this cache, before the new owner takes over
 * consumption.
 * <p>
 * This callback will only execute in the user thread as part of the {@link Consumer#poll(java.time.Duration) poll(Duration)} call
 * whenever partition assignment changes.
 * <p>
 * It is guaranteed that all consumer processes will invoke {@link #onPartitionsRevoked(Collection) onPartitionsRevoked} prior to
 * any process invoking {@link #onPartitionsAssigned(Collection) onPartitionsAssigned}. So if offsets or other state is saved in the
 * {@link #onPartitionsRevoked(Collection) onPartitionsRevoked} call it is guaranteed to be saved by the time the process taking over that
 * partition has their {@link #onPartitionsAssigned(Collection) onPartitionsAssigned} callback called to load the state.
 * <p>
 * Here is pseudo-code for a callback implementation for saving offsets:
 * <pre>
 * {@code
 *   public class SaveOffsetsOnRebalance implements ConsumerRebalanceListener {
 *       private Consumer<?,?> consumer;
 *
 *       public SaveOffsetsOnRebalance(Consumer<?,?> consumer) {
 *           this.consumer = consumer;
 *       }
 *
 *       public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
 *           // save the offsets in an external store using some custom code not described here
 *           for(TopicPartition partition: partitions)
 *              saveOffsetInExternalStore(consumer.position(partition));
 *       }
 *
 *       public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
 *           // read the offsets from an external store using some custom code not described here
 *           for(TopicPartition partition: partitions)
 *              consumer.seek(partition, readOffsetFromExternalStore(partition));
 *       }
 *   }
 * }
 * </pre>
 */
public interface ConsumerRebalanceListener {
/**
 * A callback method the user can implement to provide handling of offset commits to a customized store on the start
 * of a rebalance operation. This method will be called before a rebalance operation starts and after the consumer
 * stops fetching data. It is recommended that offsets should be committed in this callback to either Kafka or a
 * custom offset store to prevent duplicate data.
 * <p>
 * For examples on usage of this API, see Usage Examples section of {@link KafkaConsumer KafkaConsumer}
 * <p>
 * <b>NOTE:</b> This method is only called before rebalances. It is not called prior to {@link KafkaConsumer#close()}.
 * <p>
 * It is common for the revocation callback to use the consumer instance in order to commit offsets. It is possible
 * for a {@link org.apache.kafka.common.errors.WakeupException} or {@link org.apache.kafka.common.errors.InterruptException}
 * to be raised from one of these nested invocations. In this case, the exception will be propagated to the current
 * invocation of {@link KafkaConsumer#poll(java.time.Duration)} in which this callback is being executed. This means it is not
 * necessary to catch these exceptions and re-attempt to wakeup or interrupt the consumer thread.
 *
 * @param partitions The list of partitions that were assigned to the consumer on the last rebalance
 * @throws org.apache.kafka.common.errors.WakeupException If raised from a nested call to {@link KafkaConsumer}
 * @throws org.apache.kafka.common.errors.InterruptException If raised from a nested call to {@link KafkaConsumer}
 */
void onPartitionsRevoked(Collection<TopicPartition> partitions);
/**
 * A callback method the user can implement to provide handling of customized offsets on completion of a successful
 * partition re-assignment. This method will be called after the partition re-assignment completes and before the
 * consumer starts fetching data, and only as the result of a {@link Consumer#poll(java.time.Duration) poll(Duration)} call.
 * <p>
 * It is guaranteed that all the processes in a consumer group will execute their
 * {@link #onPartitionsRevoked(Collection)} callback before any instance executes its
 * {@link #onPartitionsAssigned(Collection)} callback.
 * <p>
 * It is common for the assignment callback to use the consumer instance in order to query offsets. It is possible
 * for a {@link org.apache.kafka.common.errors.WakeupException} or {@link org.apache.kafka.common.errors.InterruptException}
 * to be raised from one of these nested invocations. In this case, the exception will be propagated to the current
 * invocation of {@link KafkaConsumer#poll(java.time.Duration)} in which this callback is being executed. This means it is not
 * necessary to catch these exceptions and re-attempt to wakeup or interrupt the consumer thread.
 *
 * @param partitions The list of partitions that are now assigned to the consumer (may include partitions previously
 *                   assigned to the consumer)
 * @throws org.apache.kafka.common.errors.WakeupException If raised from a nested call to {@link KafkaConsumer}
 * @throws org.apache.kafka.common.errors.InterruptException If raised from a nested call to {@link KafkaConsumer}
 */
void onPartitionsAssigned(Collection<TopicPartition> partitions);
}
其中: NoOpConsumerRebalanceListener 源码如下:(方法是空实现)
package org.apache.kafka.clients.consumer.internals;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.common.TopicPartition;
import java.util.Collection;
/**
 * A {@link ConsumerRebalanceListener} whose callbacks deliberately do nothing.
 * Used as the listener when the caller subscribes without supplying one.
 */
public class NoOpConsumerRebalanceListener implements ConsumerRebalanceListener {

    /** Ignores the revocation event. */
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // intentionally empty
    }

    /** Ignores the assignment event. */
    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // intentionally empty
    }
}
示例:
//订阅主题,参数是集合
consumer.subscribe(Collections.singletonList(topic));
//订阅主题,参数是正则表达式
consumer.subscribe(Pattern.compile("topic-.*"));
2. 订阅主题的特定分区
方法(public void assign(Collection<TopicPartition> partitions) )如下:
/**
 * Manually assigns a collection of partitions to this consumer.
 * <ol>
 *   <li>This interface does not allow incremental assignment; it replaces the previous
 *       assignment (if there is one).</li>
 *   <li>If the given collection is empty, it is treated the same as {@code unsubscribe()}.</li>
 *   <li>Manual assignment through this method does not use the consumer's group management
 *       functionality, so no rebalance is triggered when group membership or cluster and
 *       topic metadata change.</li>
 * </ol>
 * Note: manual assignment via {@code assign(Collection)} cannot be combined with group assignment via
 * {@link #subscribe(Collection, ConsumerRebalanceListener)}.
 * <p>
 * If auto-commit is enabled, an async commit (based on the old assignment) will be triggered before the new
 * assignment replaces the old one.
 *
 * @param partitions The list of partitions to assign this consumer
 * @throws IllegalArgumentException if {@code partitions} is null, or contains a null element or an
 *                                  element whose topic is null or blank
 * @throws IllegalStateException if {@code subscribe()} was called previously without a subsequent
 *                               call to {@code unsubscribe()}, or if the consumer has been closed
 */
@Override
public void assign(Collection<TopicPartition> partitions) {
    // Take the lock and make sure the consumer is still open.
    acquireAndEnsureOpen();
    try {
        if (partitions == null) {
            throw new IllegalArgumentException("Topic partition collection to assign to cannot be null");
        }

        if (partitions.isEmpty()) {
            // An empty assignment means "unsubscribe".
            this.unsubscribe();
            return;
        }

        // Collect the distinct topic names (unordered) while validating each entry.
        Set<String> topicNames = new HashSet<>();
        for (TopicPartition candidate : partitions) {
            String name = candidate == null ? null : candidate.topic();
            // Reject null, "" and whitespace-only topic names.
            if (name == null || name.trim().isEmpty()) {
                throw new IllegalArgumentException("Topic partitions to assign to cannot have null or empty topic");
            }
            topicNames.add(name);
        }

        // make sure the offsets of topic partitions the consumer is unsubscribing from
        // are committed since there will be no following rebalance
        coordinator.maybeAutoCommitOffsetsAsync(time.milliseconds());

        log.debug("Subscribed to partition(s): {}", Utils.join(partitions, ", "));
        // Change the assignment to the specified partitions provided by the user
        subscriptions.assignFromUser(new HashSet<>(partitions));
        metadata.setTopics(topicNames);
    } finally {
        // Release the lock on every path, including the early return.
        release();
    }
}
其中:TopicPartition 如下:
package org.apache.kafka.common;
import java.io.Serializable;
/**
* A topic name and partition number
*/
public final class TopicPartition implements Serializable {
private int hash = 0;//哈希
private final int partition;//分区
private final String topic;//主题
public TopicPartition(String topic, int partition) {
this.partition = partition;
this.topic = topic;
}
public int partition() {
return partition;
}
public String topic() {
return topic;
}
@Override
public int hashCode() {
if (hash != 0)
return hash;
final int prime = 31;
int result = 1;
result = prime * result + partition;
result = prime * result + ((topic == null) ? 0 : topic.hashCode());
this.hash = result;
return result;
}
@Override
public boolean equals(Object obj) {
if (this == obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;
TopicPartition other = (TopicPartition) obj;
if (partition != other.partition)
return false;
if (topic == null) {
if (other.topic != null)
return false;
} else if (!topic.equals(other.topic))
return false;
return true;
}
@Override
public String toString() {
return topic + "-" + partition;
}
}
示例:
consumer.assign(Arrays.asList(new TopicPartition("topic-demo",0)));
我们事先不知道主题有多少个分区怎么办?
3. 查询指定主题的元数据信息
使用如下方法:
public List<PartitionInfo> partitionsFor(String topic)
public List<PartitionInfo> partitionsFor(String topic, Duration timeout)
源码如下:
/**
 * Get metadata about the partitions for a given topic.
 * <p>
 * The current cluster view is consulted first; only when it has no partitions for the topic
 * is the metadata fetched through the fetcher, bounded by {@code timeout}.
 *
 * @param topic   the topic to get partition metadata for
 * @param timeout the maximum of time to await topic metadata; must not be null
 * @return the partitions known for the topic, or whatever the fetched metadata map holds for
 *         {@code topic} when the current cluster view has none
 * @throws NullPointerException  if {@code timeout} is null
 * @throws IllegalStateException if the consumer has been closed
 */
public List<PartitionInfo> partitionsFor(String topic, Duration timeout) {
    acquireAndEnsureOpen();
    try {
        // Converted inside the try block on purpose: if timeout is null the resulting
        // NullPointerException must still go through finally, otherwise the lock taken
        // above is never released and the consumer stays locked.
        long timeoutMs = timeout.toMillis();
        Cluster cluster = this.metadata.fetch();
        List<PartitionInfo> parts = cluster.partitionsForTopic(topic);
        if (!parts.isEmpty())
            return parts;
        Map<String, List<PartitionInfo>> topicMetadata = fetcher.getTopicMetadata(
                new MetadataRequest.Builder(Collections.singletonList(topic), true), timeoutMs);
        return topicMetadata.get(topic);
    } finally {
        release();
    }
}
其中:PartitionInfo 类的源码如下:
package org.apache.kafka.common;
/**
 * This is used to describe per-partition state in the MetadataResponse.
 */
public class PartitionInfo {
    private final String topic;
    private final int partition;
    private final Node leader;
    /** All replicas of the partition (AR). */
    private final Node[] replicas;
    /** Replicas that are in sync with the leader (ISR). */
    private final Node[] inSyncReplicas;
    /** Replicas that are offline; not the same thing as the out-of-sync set usually abbreviated OSR. */
    private final Node[] offlineReplicas;

    /**
     * Creates a partition description with an empty offline-replica array.
     * Used only by tests.
     */
    public PartitionInfo(String topic, int partition, Node leader, Node[] replicas, Node[] inSyncReplicas) {
        this(topic, partition, leader, replicas, inSyncReplicas, new Node[0]);
    }

    /**
     * Creates a partition description. The arrays are stored as given, without copying.
     *
     * @param topic           the topic name
     * @param partition       the partition id
     * @param leader          the current leader node, or null if there is no leader
     * @param replicas        all replicas of the partition
     * @param inSyncReplicas  the replicas that are in sync
     * @param offlineReplicas the replicas that are offline
     */
    public PartitionInfo(String topic, int partition, Node leader, Node[] replicas, Node[] inSyncReplicas, Node[] offlineReplicas) {
        this.topic = topic;
        this.partition = partition;
        this.leader = leader;
        this.replicas = replicas;
        this.inSyncReplicas = inSyncReplicas;
        this.offlineReplicas = offlineReplicas;
    }

    /**
     * The topic name
     */
    public String topic() {
        return topic;
    }

    /**
     * The partition id
     */
    public int partition() {
        return partition;
    }

    /**
     * The node currently acting as a leader for this partition or null if there is no leader
     */
    public Node leader() {
        return leader;
    }

    /**
     * The complete set of replicas for this partition regardless of whether they are alive or up-to-date
     */
    public Node[] replicas() {
        return replicas;
    }

    /**
     * The subset of the replicas that are in sync, that is caught-up to the leader and ready to take over as leader if
     * the leader should fail
     */
    public Node[] inSyncReplicas() {
        return inSyncReplicas;
    }

    /**
     * The subset of the replicas that are offline
     */
    public Node[] offlineReplicas() {
        return offlineReplicas;
    }

    /**
     * Renders the partition with node ids only. A missing leader is shown as {@code none};
     * a null replica array is shown as {@code []}.
     */
    @Override
    public String toString() {
        return String.format("Partition(topic = %s, partition = %d, leader = %s, replicas = %s, isr = %s, offlineReplicas = %s)",
                             topic,
                             partition,
                             leader == null ? "none" : leader.idString(),
                             formatNodeIds(replicas),
                             formatNodeIds(inSyncReplicas),
                             formatNodeIds(offlineReplicas));
    }

    /* Extract the node ids from each item in the array and format for display */
    private String formatNodeIds(Node[] nodes) {
        StringBuilder b = new StringBuilder("[");
        // The constructors accept null arrays, so guard here to keep toString() from throwing.
        if (nodes != null) {
            for (int i = 0; i < nodes.length; i++) {
                b.append(nodes[i].idString());
                if (i < nodes.length - 1)
                    b.append(',');
            }
        }
        b.append("]");
        return b.toString();
    }
}
4. 取消订阅
/**
 * Drops the current subscription (topics or pattern) and any assigned partitions.
 * <p>
 * Under the consumer lock this clears the subscription state, asks the coordinator to leave
 * the group if applicable, and switches off "metadata for all topics".
 *
 * @throws IllegalStateException if the consumer has been closed
 */
public void unsubscribe() {
    acquireAndEnsureOpen();
    try {
        log.debug("Unsubscribed all topics or patterns and assigned partitions");
        subscriptions.unsubscribe();
        coordinator.maybeLeaveGroup();
        metadata.needMetadataForAllTopics(false);
    } finally {
        // Release the lock.
        release();
    }
}
5. 订阅状态
public void subscribe(Collection<String> topics) //订阅状态为:AUTO_TOPICS
public void subscribe(Pattern pattern) //订阅状态为:AUTO_PATTERN
public void assign(Collection<TopicPartition> partitions) //订阅状态为:USER_ASSIGNED
网友评论