美文网首页
kafka_07_consumer订阅主题和分区

kafka_07_consumer订阅主题和分区

作者: 平头哥2 | 来源:发表于2019-03-30 22:13 被阅读0次

1. 订阅主题

/**
 * Subscribes to the given topics without a custom rebalance callback.
 * <p>
 * Delegates to {@link #subscribe(Collection, ConsumerRebalanceListener)}, passing a
 * {@link NoOpConsumerRebalanceListener} whose callbacks do nothing.
 *
 * @param topics the topics to subscribe to
 */
public void subscribe(Collection<String> topics) {
    NoOpConsumerRebalanceListener noOpListener = new NoOpConsumerRebalanceListener();
    subscribe(topics, noOpListener);
}

/**
 * Subscribes to the given topics, letting Kafka manage group membership and partition assignment.
 * <p>
 * An empty collection is treated exactly like a call to {@link #unsubscribe()}.
 *
 * @param topics   the topics to subscribe to; must not be null and must not contain null or blank names
 * @param listener the callback notified when partitions are revoked from / assigned to this consumer
 * @throws IllegalArgumentException if {@code topics} is null or contains a null/empty/whitespace-only name
 * @throws IllegalStateException if the consumer is closed or no partition assignor is configured
 */
@Override
public void subscribe(Collection<String> topics, ConsumerRebalanceListener listener) {
    acquireAndEnsureOpen(); // single-thread guard; fails fast when the consumer is already closed
    try {
        if (topics == null)
            throw new IllegalArgumentException("Topic collection to subscribe to cannot be null");

        if (topics.isEmpty()) {
            // treat subscribing to empty topic list as the same as unsubscribing
            this.unsubscribe();
            return;
        }

        // Reject the whole request if any single name is null, empty or whitespace-only.
        for (String name : topics) {
            boolean blank = name == null || name.trim().isEmpty();
            if (blank)
                throw new IllegalArgumentException("Topic collection to subscribe to cannot contain null or empty topic");
        }

        throwIfNoAssignorsConfigured();

        log.debug("Subscribed to topic(s): {}", Utils.join(topics, ", "));
        // Hand over a private copy so later changes to the caller's collection cannot leak in.
        this.subscriptions.subscribe(new HashSet<>(topics), listener);
        metadata.setTopics(subscriptions.groupSubscription());
    } finally {
        release(); // always drop the guard, including on the early-return and exception paths
    }
}

/**
 * Subscribes to every topic matching {@code pattern}, without a custom rebalance callback.
 * <p>
 * Delegates to {@link #subscribe(Pattern, ConsumerRebalanceListener)} with a
 * {@link NoOpConsumerRebalanceListener}.
 *
 * @param pattern the regular expression topic names are matched against
 */
@Override
public void subscribe(Pattern pattern) {
    NoOpConsumerRebalanceListener noOpListener = new NoOpConsumerRebalanceListener();
    subscribe(pattern, noOpListener);
}

/**
 * Subscribes using a topic-name regular expression, letting Kafka manage group membership.
 *
 * @param pattern  the regular expression to subscribe with; must not be null
 * @param listener the callback notified when partitions are revoked from / assigned to this consumer
 * @throws IllegalArgumentException if {@code pattern} is null (checked before the consumer guard is taken)
 * @throws IllegalStateException if the consumer is closed or no partition assignor is configured
 */
@Override
public void subscribe(Pattern pattern, ConsumerRebalanceListener listener) {
    // Argument validation happens before acquiring the guard, so it wins over the "closed" check.
    if (pattern == null)
        throw new IllegalArgumentException("Topic pattern to subscribe to cannot be null");

    acquireAndEnsureOpen();
    try {
        throwIfNoAssignorsConfigured();
        log.debug("Subscribed to pattern: {}", pattern);
        this.subscriptions.subscribe(pattern, listener);
        // Ask metadata to cover all topics, update the pattern subscription from the currently
        // cached cluster view, then request a metadata refresh.
        this.metadata.needMetadataForAllTopics(true);
        this.coordinator.updatePatternSubscription(metadata.fetch());
        this.metadata.requestUpdate();
    } finally {
        release(); // drop the guard on every path
    }
}

其中: acquireAndEnsureOpen() 方法源码如下:

/**
 * Takes the consumer's light-weight single-thread guard and verifies the consumer is still open.
 * <p>
 * If the consumer has been closed, the guard is released again before failing, so a failed call
 * never leaves it held.
 *
 * @throws IllegalStateException if the consumer has already been closed
 */
private void acquireAndEnsureOpen() {
    acquire();
    if (!this.closed)
        return;
    release(); // undo the acquisition before reporting the error
    throw new IllegalStateException("This consumer has already been closed.");
}
// Sentinel stored in currentThread when no thread holds the consumer's single-thread guard.
private static final long NO_CURRENT_THREAD = -1L;
// NOTE(review): not referenced by the code shown here; presumably used to generate default client ids — confirm.
private static final AtomicInteger CONSUMER_CLIENT_ID_SEQUENCE = new AtomicInteger(1);
// Id of the thread currently holding the guard, or NO_CURRENT_THREAD when it is free (see acquire()/release()).
private final AtomicLong currentThread = new AtomicLong(NO_CURRENT_THREAD);
// How many times the owning thread has acquired the guard; it is freed when this drops back to 0.
private final AtomicInteger refcount = new AtomicInteger(0);

/**
 * Acquires the light-weight guard that protects the consumer from multi-threaded access.
 * <p>
 * Instead of blocking when another thread holds the guard, this throws immediately. The thread that
 * already owns the guard may acquire it again; each acquisition bumps a reference count that
 * {@link #release()} decrements.
 *
 * @throws ConcurrentModificationException if another thread currently holds the guard
 */
private void acquire() {
    final long callerId = Thread.currentThread().getId();
    if (currentThread.get() != callerId) {
        // Not the owner yet: only succeed if the guard is currently free.
        boolean claimed = currentThread.compareAndSet(NO_CURRENT_THREAD, callerId);
        if (!claimed)
            throw new ConcurrentModificationException("KafkaConsumer is not safe for multi-threaded access");
    }
    refcount.incrementAndGet();
}

/**
 * Releases one acquisition of the single-thread guard.
 * <p>
 * When the last acquisition is released, the guard is marked free so another thread may take it.
 */
private void release() {
    int remaining = refcount.decrementAndGet();
    if (remaining == 0)
        currentThread.set(NO_CURRENT_THREAD);
}

其中: throwIfNoAssignorsConfigured() 方法源码如下:

// The configured partition assignment strategies (assignors), not the partitions themselves.
private List<PartitionAssignor> assignors;

/**
 * Fails if no partition assignor has been configured, since group-managed subscriptions need one.
 *
 * @throws IllegalStateException if the assignor list is empty
 */
private void throwIfNoAssignorsConfigured() {
    if (!assignors.isEmpty())
        return;
    throw new IllegalStateException("Must configure at least one partition assigner class name to " +
        ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG + " configuration property");
}

ConsumerRebalanceListener源码如下:

package org.apache.kafka.clients.consumer;

import java.util.Collection;

import org.apache.kafka.common.TopicPartition;

/**
 * A callback interface that the user can implement to trigger custom actions when the set of partitions assigned to
 * the consumer changes.
 * <p>
 * This is applicable when the consumer is having Kafka auto-manage group membership. If the consumer directly assigns partitions,
 * those partitions will never be reassigned and this callback is not applicable.
 * <p>
 * When Kafka is managing the group membership, a partition re-assignment will be triggered any time the members of the group change or the subscription
 * of the members changes. This can occur when processes die, new process instances are added or old instances come back to life after failure.
 * Rebalances can also be triggered by changes affecting the subscribed topics (e.g. when the number of partitions is
 * administratively adjusted).
 * <p>
 * There are many uses for this functionality. One common use is saving offsets in a custom store. By saving offsets in
 * the {@link #onPartitionsRevoked(Collection)} call we can ensure that any time partition assignment changes
 * the offset gets saved.
 * <p>
 * Another use is flushing out any kind of cache of intermediate results the consumer may be keeping. For example,
 * consider a case where the consumer is subscribed to a topic containing user page views, and the goal is to count the
 * number of page views per user for each five minute window. Let's say the topic is partitioned by the user id so that
 * all events for a particular user go to a single consumer instance. The consumer can keep in memory a running
 * tally of actions per user and only flush these out to a remote data store when its cache gets too big. However if a
 * partition is reassigned it may want to automatically trigger a flush of this cache, before the new owner takes over
 * consumption.
 * <p>
 * This callback will only execute in the user thread as part of the {@link Consumer#poll(java.time.Duration) poll(Duration)} call
 * whenever partition assignment changes.
 * <p>
 * It is guaranteed that all consumer processes will invoke {@link #onPartitionsRevoked(Collection) onPartitionsRevoked} prior to
 * any process invoking {@link #onPartitionsAssigned(Collection) onPartitionsAssigned}. So if offsets or other state is saved in the
 * {@link #onPartitionsRevoked(Collection) onPartitionsRevoked} call it is guaranteed to be saved by the time the process taking over that
 * partition has their {@link #onPartitionsAssigned(Collection) onPartitionsAssigned} callback called to load the state.
 * <p>
 * Here is pseudo-code for a callback implementation for saving offsets:
 * <pre>
 * {@code
 *   public class SaveOffsetsOnRebalance implements ConsumerRebalanceListener {
 *       private Consumer<?,?> consumer;
 *
 *       public SaveOffsetsOnRebalance(Consumer<?,?> consumer) {
 *           this.consumer = consumer;
 *       }
 *
 *       public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
 *           // save the offsets in an external store using some custom code not described here
 *           for(TopicPartition partition: partitions)
 *              saveOffsetInExternalStore(consumer.position(partition));
 *       }
 *
 *       public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
 *           // read the offsets from an external store using some custom code not described here
 *           for(TopicPartition partition: partitions)
 *              consumer.seek(partition, readOffsetFromExternalStore(partition));
 *       }
 *   }
 * }
 * </pre>
 */
public interface ConsumerRebalanceListener {

    /**
     * A callback method the user can implement to provide handling of offset commits to a customized store on the start
     * of a rebalance operation. This method will be called before a rebalance operation starts and after the consumer
     * stops fetching data. It is recommended that offsets should be committed in this callback to either Kafka or a
     * custom offset store to prevent duplicate data.
     * <p>
     * For examples on usage of this API, see Usage Examples section of {@link KafkaConsumer KafkaConsumer}
     * <p>
     * <b>NOTE:</b> This method is only called before rebalances. It is not called prior to {@link KafkaConsumer#close()}.
     * <p>
     * It is common for the revocation callback to use the consumer instance in order to commit offsets. It is possible
     * for a {@link org.apache.kafka.common.errors.WakeupException} or {@link org.apache.kafka.common.errors.InterruptException}
     * to be raised from one of these nested invocations. In this case, the exception will be propagated to the current
     * invocation of {@link KafkaConsumer#poll(java.time.Duration)} in which this callback is being executed. This means it is not
     * necessary to catch these exceptions and re-attempt to wakeup or interrupt the consumer thread.
     *
     * @param partitions The list of partitions that were assigned to the consumer on the last rebalance
     * @throws org.apache.kafka.common.errors.WakeupException If raised from a nested call to {@link KafkaConsumer}
     * @throws org.apache.kafka.common.errors.InterruptException If raised from a nested call to {@link KafkaConsumer}
     */
    void onPartitionsRevoked(Collection<TopicPartition> partitions);

    /**
     * A callback method the user can implement to provide handling of customized offsets on completion of a successful
     * partition re-assignment. This method will be called after the partition re-assignment completes and before the
     * consumer starts fetching data, and only as the result of a {@link Consumer#poll(java.time.Duration) poll(Duration)} call.
     * <p>
     * It is guaranteed that all the processes in a consumer group will execute their
     * {@link #onPartitionsRevoked(Collection)} callback before any instance executes its
     * {@link #onPartitionsAssigned(Collection)} callback.
     * <p>
     * It is common for the assignment callback to use the consumer instance in order to query offsets. It is possible
     * for a {@link org.apache.kafka.common.errors.WakeupException} or {@link org.apache.kafka.common.errors.InterruptException}
     * to be raised from one of these nested invocations. In this case, the exception will be propagated to the current
     * invocation of {@link KafkaConsumer#poll(java.time.Duration)} in which this callback is being executed. This means it is not
     * necessary to catch these exceptions and re-attempt to wakeup or interrupt the consumer thread.
     *
     * @param partitions The list of partitions that are now assigned to the consumer (may include partitions previously
     *            assigned to the consumer)
     * @throws org.apache.kafka.common.errors.WakeupException If raised from a nested call to {@link KafkaConsumer}
     * @throws org.apache.kafka.common.errors.InterruptException If raised from a nested call to {@link KafkaConsumer}
     */
    void onPartitionsAssigned(Collection<TopicPartition> partitions);
}

其中: NoOpConsumerRebalanceListener 源码如下:(方法是空实现)

package org.apache.kafka.clients.consumer.internals;

import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.common.TopicPartition;

import java.util.Collection;

/**
 * A {@link ConsumerRebalanceListener} whose callbacks intentionally do nothing.
 * <p>
 * Used as the listener when a caller subscribes without supplying one of its own.
 */
public class NoOpConsumerRebalanceListener implements ConsumerRebalanceListener {

    /** Ignores the revoked partitions. */
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // intentionally empty
    }

    /** Ignores the newly assigned partitions. */
    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // intentionally empty
    }
}

示例:

// Subscribe to topics given as a collection (here a single topic)
consumer.subscribe(Collections.singletonList(topic));
// Subscribe to topics given as a regular expression (java.util.regex.Pattern)
consumer.subscribe(Pattern.compile("topic-.*"));

2. 订阅主题的特定分区

使用方法 public void assign(Collection<TopicPartition> partitions),其源码如下:

/**
 * Manually assign a collection of partitions to this consumer.
 * <p>
 * This does not add to the current assignment: it replaces the previous assignment (if any) with
 * {@code partitions}. Passing an empty collection is treated the same as {@link #unsubscribe()}.
 * <p>
 * Partitions assigned through this method do not use the consumer's group management, so no rebalance
 * is triggered when group membership, the cluster or the topic metadata changes.
 * <p>
 * Note: manual assignment through this method cannot be combined with group-managed subscription via
 * {@link #subscribe(Collection, ConsumerRebalanceListener)}.
 * <p>
 * If auto-commit is enabled, an async commit (based on the old assignment) will be triggered before the new
 * assignment replaces the old one.
 *
 * @param partitions The list of partitions to assign this consumer
 * @throws IllegalArgumentException if {@code partitions} is null, or contains a null partition or a partition
 *         whose topic is null, empty or whitespace-only
 * @throws IllegalStateException if {@code subscribe()} was called previously without a subsequent call to
 *         {@link #unsubscribe()}, or if the consumer has already been closed
 * @throws java.util.ConcurrentModificationException if another thread is currently using this consumer
 */
@Override
public void assign(Collection<TopicPartition> partitions) {
    acquireAndEnsureOpen(); // single-thread guard; fails fast when the consumer is closed
    try {
        if (partitions == null)
            throw new IllegalArgumentException("Topic partition collection to assign to cannot be null");

        if (partitions.isEmpty()) {
            // An empty assignment means "drop everything".
            this.unsubscribe();
            return;
        }

        // Validate every partition and collect the distinct topic names (HashSet: no ordering guarantee).
        Set<String> assignedTopics = new HashSet<>();
        for (TopicPartition candidate : partitions) {
            String name = candidate == null ? null : candidate.topic();
            boolean invalid = name == null || name.trim().isEmpty();
            if (invalid)
                throw new IllegalArgumentException("Topic partitions to assign to cannot have null or empty topic");
            assignedTopics.add(name);
        }

        // make sure the offsets of topic partitions the consumer is unsubscribing from
        // are committed since there will be no following rebalance
        this.coordinator.maybeAutoCommitOffsetsAsync(time.milliseconds());

        log.debug("Subscribed to partition(s): {}", Utils.join(partitions, ", "));
        // Replace the current assignment with exactly the partitions supplied by the caller.
        this.subscriptions.assignFromUser(new HashSet<>(partitions));
        metadata.setTopics(assignedTopics);
    } finally {
        release(); // always drop the guard, including on the early-return and exception paths
    }
}

其中:TopicPartition 如下:

package org.apache.kafka.common;

import java.io.Serializable;

/**
 * A topic name and partition number
 */
public final class TopicPartition implements Serializable {

    private int hash = 0;//哈希
    private final int partition;//分区
    private final String topic;//主题

    public TopicPartition(String topic, int partition) {
        this.partition = partition;
        this.topic = topic;
    }

    public int partition() {
        return partition;
    }

    public String topic() {
        return topic;
    }

    @Override
    public int hashCode() {
        if (hash != 0)
            return hash;
        final int prime = 31;
        int result = 1;
        result = prime * result + partition;
        result = prime * result + ((topic == null) ? 0 : topic.hashCode());
        this.hash = result;
        return result;
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj)
            return true;
        if (obj == null)
            return false;
        if (getClass() != obj.getClass())
            return false;
        TopicPartition other = (TopicPartition) obj;
        if (partition != other.partition)
            return false;
        if (topic == null) {
            if (other.topic != null)
                return false;
        } else if (!topic.equals(other.topic))
            return false;
        return true;
    }

    @Override
    public String toString() {
        return topic + "-" + partition;
    }
}

示例:

// Manually assign partition 0 of "topic-demo" to this consumer (no group-managed rebalancing).
consumer.assign(Arrays.asList(new TopicPartition("topic-demo",0)));

如果事先不知道主题有多少个分区,该怎么办?

3. 查询指定主题的元数据信息

使用如下方法:

// Two overloads: one without an explicit wait limit, and one taking the maximum time to await topic metadata.
public List<PartitionInfo> partitionsFor(String topic) 
public List<PartitionInfo> partitionsFor(String topic, Duration timeout)

源码如下:

/**
 * Get metadata about the partitions for a given topic.
 * <p>
 * Locally cached cluster metadata is used when it already knows the topic; otherwise a metadata request
 * is sent and the call waits at most {@code timeout} for the answer.
 *
 * @param topic   the topic to look up
 * @param timeout the maximum amount of time to await topic metadata
 * @return the partitions of {@code topic}; when it had to be fetched, this is the fetched entry for the
 *         topic, which may be {@code null} if the response contains no entry for it
 * @throws NullPointerException if {@code timeout} is null
 * @throws ArithmeticException if {@code timeout} is too large to be expressed in milliseconds
 * @throws IllegalStateException if the consumer has already been closed
 * @throws java.util.ConcurrentModificationException if another thread is currently using this consumer
 */
public List<PartitionInfo> partitionsFor(String topic, Duration timeout) {
    acquireAndEnsureOpen();
    try {
        // Convert inside the try block: a null or overflowing Duration must not leave the
        // consumer's single-thread guard held (release() would otherwise never run).
        long timeoutMs = timeout.toMillis();

        Cluster cluster = this.metadata.fetch();
        List<PartitionInfo> parts = cluster.partitionsForTopic(topic);
        if (!parts.isEmpty())
            return parts; // served from the local metadata cache

        Map<String, List<PartitionInfo>> topicMetadata = fetcher.getTopicMetadata(
                new MetadataRequest.Builder(Collections.singletonList(topic), true), timeoutMs);
        return topicMetadata.get(topic);
    } finally {
        release();
    }
}

其中:PartitionInfo 类的源码如下:

package org.apache.kafka.common;

/**
 * This is used to describe per-partition state in the MetadataResponse.
 * <p>
 * Holds the topic, partition id, current leader and the replica sets (all replicas, in-sync replicas
 * and offline replicas) of one partition.
 */
public class PartitionInfo {

    private final String topic;
    private final int partition;
    private final Node leader;
    private final Node[] replicas;        // all assigned replicas (AR)
    private final Node[] inSyncReplicas;  // in-sync replicas (ISR)
    // Replicas that are currently offline. Note: this is NOT the out-of-sync set (OSR = AR - ISR);
    // a lagging replica can be out of sync while still online.
    private final Node[] offlineReplicas;

    // Used only by tests
    public PartitionInfo(String topic, int partition, Node leader, Node[] replicas, Node[] inSyncReplicas) {
        this(topic, partition, leader, replicas, inSyncReplicas, new Node[0]);
    }

    public PartitionInfo(String topic, int partition, Node leader, Node[] replicas, Node[] inSyncReplicas, Node[] offlineReplicas) {
        this.topic = topic;
        this.partition = partition;
        this.leader = leader;
        this.replicas = replicas;
        this.inSyncReplicas = inSyncReplicas;
        this.offlineReplicas = offlineReplicas;
    }

    /**
     * The topic name
     */
    public String topic() {
        return topic;
    }

    /**
     * The partition id
     */
    public int partition() {
        return partition;
    }

    /**
     * The node currently acting as a leader for this partition or null if there is no leader
     */
    public Node leader() {
        return leader;
    }

    /**
     * The complete set of replicas for this partition regardless of whether they are alive or up-to-date
     */
    public Node[] replicas() {
        return replicas;
    }

    /**
     * The subset of the replicas that are in sync, that is caught-up to the leader and ready to take over as leader if
     * the leader should fail
     */
    public Node[] inSyncReplicas() {
        return inSyncReplicas;
    }

    /**
     * The subset of the replicas that are offline
     */
    public Node[] offlineReplicas() {
        return offlineReplicas;
    }

    @Override
    public String toString() {
        return String.format("Partition(topic = %s, partition = %d, leader = %s, replicas = %s, isr = %s, offlineReplicas = %s)",
                             topic,
                             partition,
                             leader == null ? "none" : leader.idString(),
                             formatNodeIds(replicas),
                             formatNodeIds(inSyncReplicas),
                             formatNodeIds(offlineReplicas));
    }

    /*
     * Extract the node ids from each item in the array and format for display, e.g. "[1,2,3]".
     * The constructors accept null arrays, so a null array is rendered as "[]" instead of making
     * toString() throw a NullPointerException.
     */
    private String formatNodeIds(Node[] nodes) {
        StringBuilder b = new StringBuilder("[");
        if (nodes != null) {
            for (int i = 0; i < nodes.length; i++) {
                b.append(nodes[i].idString());
                if (i < nodes.length - 1)
                    b.append(',');
            }
        }
        b.append("]");
        return b.toString();
    }

}

4. 取消订阅

/**
 * Drops every topic subscription, pattern subscription and manually assigned partition.
 * <p>
 * Also calls the coordinator's {@code maybeLeaveGroup()} and stops requesting metadata for all topics.
 *
 * @throws IllegalStateException if the consumer has already been closed
 */
public void unsubscribe() {
    acquireAndEnsureOpen();
    try {
        log.debug("Unsubscribed all topics or patterns and assigned partitions");
        subscriptions.unsubscribe();
        coordinator.maybeLeaveGroup();
        metadata.needMetadataForAllTopics(false);
    } finally {
        release(); // drop the single-thread guard on every path
    }
}

5. 订阅状态

public void subscribe(Collection<String> topics) //订阅状态为:AUTO_TOPICS
public void subscribe(Pattern pattern) //订阅状态为:AUTO_PATTERN
public void assign(Collection<TopicPartition> partitions) //订阅状态为:USER_ASSIGNED

相关文章

网友评论

      本文标题:kafka_07_consumer订阅主题和分区

      本文链接:https://www.haomeiwen.com/subject/kzkebqtx.html