一、概念入门
1.消费者和消费组
Kafka消费者是消费组的一部分,当多个消费者形成一个消费组来消费主题时,每个消费者会收到不同分区的消息。假设有一个T1主题,该主题有4个分区;同时我们有一个消费组G1,这个消费组只有一个消费者C1。那么消费者C1将会收到这4个分区的消息,如下所示:
Kafka一个很重要的特性就是,只需写入一次消息,可以支持任意多的应用读取这个消息。换句话说,每个应用都可以读到全量的消息。为了使得每个应用都能读到全量消息,应用需要有不同的消费组。对于上面的例子,假如我们新增了一个新的消费组G2,而这个消费组有两个消费者,那么会是这样的:
二、消息接收
见代码库:com.heima.kafka.chapter3.KafkaConsumerAnalysis
1.必要参数设置
KafkaConsumer
实例中参数众多,后续会深入讲解
public static Properties initConfig() {
Properties props = new Properties();
// 与KafkaProducer中设置保持一致
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
// 必填参数,该参数和KafkaProducer中的相同,制定连接Kafka集群所需的broker地址清单,可以设置一个或者多个
props.put("bootstrap.servers", brokerList);
// 消费者隶属于的消费组,默认为空,如果设置为空,则会抛出异常,这个参数要设置成具有一定业务含义的名称
props.put("group.id", groupId);
// 指定KafkaConsumer对应的客户端ID,默认为空,如果不设置KafkaConsumer会自动生成一个非空字符串
props.put("client.id", "consumer.client.id.demo");
return props;
}
2.订阅主题和分区
创建完消费者后我们便可以订阅主题了,只需要通过调用subscribe()方法即可,这个方法接收一个主题列表
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList(topic));
另外,我们也可以使用正则表达式来匹配多个主题,而且订阅之后如果又有匹配的新主题,那么这个消费组会立即对其进行消费。正则表达式在连接Kafka与其他系统时非常有用。比如订阅所有的测试主题:
consumer.subscribe(Pattern.compile("heima*"));
指定订阅的分区
// 指定订阅的分区
consumer.assign(Arrays.asList(new TopicPartition("topic0701", 0)));
3.反序列化
// 与KafkaProducer中设置保持一致
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
4.位移提交
对于Kafka中的分区而言,它的每条消息都有唯一的offset,用来表示消息在分区中的位置。
当我们调用poll()时,该方法会返回我们没有消费的消息。当消息从broker返回消费者时,broker并不跟踪这些消息是否被消费者接收到;Kafka让消费者自身来管理消费的位移,并向消费者提供更新位移的接口,这种更新位移方式称为提交(commit
)。
重复消费
消息丢失
自动提交
这种方式让消费者来管理位移,应用本身不需要显式操作。当我们将enable.auto.commit
设置为true,那么消费者会在poll方法调用后每隔5秒(由auto.commit.interval.ms
指定)提交一次位移。和很多其他操作一样,自动提交也是由poll()方法来驱动的;在调用poll()时,消费者判断是否到达提交时间,如果是则提交上一次poll返回的最大位移。
需要注意到,这种方式可能会导致消息重复消费。假如,某个消费者poll消息后,应用正在处理消息,在3秒后Kafka进行了重平衡,那么由于没有更新位移导致重平衡后这部分消息重复消费。
同步提交
见代码库:com.heima.kafka.chapter3.CheckOffsetAndCommit
public static Properties initConfig() {
Properties props = new Properties();
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
// 手动提交开启
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
return props;
}
while (true) {
ConsumerRecords<String, String> records = consumer.poll(1000);
if (records.isEmpty()) {
break;
}
List<ConsumerRecord<String, String>> partitionRecords = records.records(tp);
lastConsumedOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
consumer.commitSync();
//同步提交消费位移
}
异步提交
手动提交有一个缺点,那就是当发起提交调用时应用会阻塞。当然我们可以减少手动提交的频率,但这个会增加消息重复的概率(和自动提交一样)。另外一个解决办法是,使用异步提交的API。
见代码:com.heima.kafka.chapter3.OffsetCommitAsyncCallback
但是异步提交也有个缺点,那就是如果服务器返回提交失败,异步提交不会进行重试。相比较起来,同步提交会进行重试直到成功或者最后抛出异常给应用。异步提交没有实现重试是因为,如果同时存在多个异步提交,进行重试可能会导致位移覆盖。举个例子,假如我们发起了一个异步提交commitA,此时的提交位移为2000,随后又发起了一个异步提交commitB且位移为3000;commitA提交失败但commitB提交成功,此时commitA进行重试并成功的话,会将实际上将已经提交的位移从3000回滚到2000,导致消息重复消费。
异步回调
try {
while (running.get()) {
ConsumerRecords<String, String> records = consumer.poll(1000);
for (ConsumerRecord<String, String> record : records) {
//do some logical processing.
}
// 异步回调
consumer.commitAsync(new OffsetCommitCallback() {
@Override
public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets,Exception exception) {
if (exception == null) {
System.out.println(offsets);
} else {
log.error("fail to commit offsets {}", offsets, exception);
}
}
});
}
} finally {
consumer.close();
}
5.指定位移消费
到目前为止,我们知道消息的拉取是根据poll()方法中的逻辑来处理的,但是这个方法对于普通开发人员来说就是个黑盒处理,无法精确掌握其消费的起始位置。
seek()方法正好提供了这个功能,让我们得以追踪以前的消费或者回溯消费。
见代码库:com.heima.kafka.chapter3.SeekDemo
/**
* 指定位移消费
*/
public class SeekDemo extends ConsumerClientConfig {
public static void main(String[] args) {
Properties props = initConfig();
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList(topic));
// timeout参数设置多少合适?太短会使分区分配失败,太长又有可能造成一些不必要的等待
consumer.poll(Duration.ofMillis(2000));
// 获取消费者所分配到的分区
Set<TopicPartition> assignment = consumer.assignment();
System.out.println(assignment);
for (TopicPartition tp : assignment) {
// 参数partition表示分区,offset表示指定从分区的哪个位置开始消费
consumer.seek(tp, 10);
}
consumer.seek(new TopicPartition(topic,0),10);
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
//consume the record.
for (ConsumerRecord<String, String> record : records) {
System.out.println(record.offset() + ":" + record.value());
}
}
}
增加判断是否分配到了分区,见代码库:com.heima.kafka.chapter3.SeekDemoAssignment
public static void main(String[] args) {
Properties props = initConfig();
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList(topic));
long start = System.currentTimeMillis();
Set<TopicPartition> assignment = new HashSet<>();
while (assignment.size() == 0) {
consumer.poll(Duration.ofMillis(100));
assignment = consumer.assignment();
}
long end = System.currentTimeMillis();
System.out.println(end - start);
System.out.println(assignment);
for (TopicPartition tp : assignment) {
consumer.seek(tp, 10);
}
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
//consume the record.
for (ConsumerRecord<String, String> record : records) {
System.out.println(record.offset() + ":" + record.value());
}
}
}
- 指定从分区末尾开始消费 ,见代码库:
com.heima.kafka.chapter3.SeekToEnd
// 指定从分区末尾开始消费
Map<TopicPartition, Long> offsets = consumer.endOffsets(assignment);
for (TopicPartition tp : assignment) {
consumer.seek(tp, offsets.get(tp));
}
- 演示位移越界操作,修改代码如下:
for (TopicPartition tp : assignment) {
//consumer.seek(tp, offsets.get(tp));
consumer.seek(tp, offsets.get(tp) + 1);
}
会通过auto.offset.reset参数的默认值将位置重置,效果如下:
INFO [Consumer clientId=consumer-1, groupId=group.heima] Fetch offset 1 is out of range for partition heima-0, resetting offset (org.apache.kafka.clients.consumer.internals.Fetcher:967)
INFO [Consumer clientId=consumer-1, groupId=group.heima] Fetch offset 10 is out of range for partition heima-1, resetting offset (org.apache.kafka.clients.consumer.internals.Fetcher:967)
INFO [Consumer clientId=consumer-1, groupId=group.heima] Resetting offset for partition heima-0 to offset 0. (org.apache.kafka.clients.consumer.internals.Fetcher:583)
INFO [Consumer clientId=consumer-1, groupId=group.heima] Resetting offset for partition heima-1 to offset 9. (org.apache.kafka.clients.consumer.internals.Fetcher:583)
6.再均衡监听器
再均衡是指分区的所属从一个消费者转移到另外一个消费者的行为,它为消费组具备了高可用性和伸缩性提供了保障,使得我们既方便又安全地删除消费组内的消费者或者往消费组内添加消费者。不过再均衡发生期间,消费者是无法拉取消息的。
见代码库:com.heima.kafka.chapter3.CommitSyncInRebalance
public static void main(String[] args) {
Properties props = initConfig();
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();
consumer.subscribe(Arrays.asList(topic), new ConsumerRebalanceListener() {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
// 劲量避免重复消费
consumer.commitSync(currentOffsets);
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
//do nothing.
}
});
try {while (isRunning.get()) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
System.out.println(record.offset() + ":" + record.value());
// 异步提交消费位移,在发生再均衡动作之前可以通过再均衡监听器的 onPartitionsRevoked回调执行commitSync方法同步提交位移。
currentOffsets.put(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset() + 1));
}
consumer.commitAsync(currentOffsets, null);
}
} finally {
consumer.close();
}
}
7.消费者拦截器
之前章节讲了生产者拦截器,对应的消费者也有相应的拦截器概念,消费者拦截器主要是在消费到消息或者在提交消费位移时进行的一些定制化的操作。
使用场景
对消费消息设置一个有效期的属性,如果某条消息在既定的时间窗口内无法到达,那就视为无效,不需要再被处理。
见代码库:com.heima.kafka.chapter3.ConsumerInterceptorTTL
public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {
System.out.println("before:" + records);
long now = System.currentTimeMillis();
Map<TopicPartition, List<ConsumerRecord<String, String>>> newRecords = new HashMap<>();
for (TopicPartition tp : records.partitions()) {
List<ConsumerRecord<String, String>> tpRecords = records.records(tp);
List<ConsumerRecord<String, String>> newTpRecords = new ArrayList<> ();
for (ConsumerRecord<String, String> record : tpRecords) {
if (now - record.timestamp() < EXPIRE_INTERVAL) {
newTpRecords.add(record);
}
}
if (!newTpRecords.isEmpty()) {
newRecords.put(tp, newTpRecords);
}
}
return new ConsumerRecords<>(newRecords);
}
实现自定义拦截器之后,需要在KafkaConsumer
中配置指定这个拦截器,如下
// 指定消费者拦截器
props.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,ConsumerInterceptorTTL.class .getName());
效果演示
发送端同时发送两条消息,其中一条修改timestamp的值来使其变得超时,如下:
com.heima.kafka.chapter3.ProducerFastStart
ProducerRecord<String, String> record = new ProducerRecord<>(topic, "Kafka-demo- 001", "hello, Kafka!");
ProducerRecord<String, String> record2 = new ProducerRecord<>(topic, 0, System.currentTimeMillis() - 10 * 1000, "Kafka-demo-001", "hello, Kafka!->超时");
启动消费端运行如下,只收到了未超时的消息:
8.消费者参数补充
fetch.min.bytes
这个参数允许消费者指定从broker读取消息时最小的数据量。当消费者从broker读取消息时,如果数据量小于这个阈值,broker会等待直到有足够的数据,然后才返回给消费者。对于写入量不高的主题来说,这个参数可以减少broker和消费者的压力,因为减少了往返的时间。而对于有大量消费者的主题来说,则可以明显减轻broker压力。
fetch.max.wait.ms
上面的fetch.min.bytes参数指定了消费者读取的最小数据量,而这个参数则指定了消费者读取时最长等待时间,从而避免长时间阻塞。这个参数默认为500ms。
max.partition.fetch.bytes
这个参数指定了每个分区返回的最多字节数,默认为1M。也就是说,KafkaConsumer.poll()
返回记录列表时,每个分区的记录字节数最多为1M。如果一个主题有20个分区,同时有5个消费者,那么每个消费者需要4M的空间来处理消息。实际情况中,我们需要设置更多的空间,这样当存在消费者宕机时,其他消费者可以承担更多的分区。
max.poll.records
这个参数控制一个poll()调用返回的记录数,这个可以用来控制应用在拉取循环中的处理数据量。
更多参数见官网: http://kafka.apache.org/documentation/#consumerconfigs
总结
本章主要讲解了消费者和消费组的概念,以及如何正确的使用KafkaConsumer
,其中重点讲解了参数的配置,订阅、反序列化、位移提交、再均衡、拦截器等知识点。
参考资料:《Kafka技术手册》
免费获取方式:私信【资料】
免费获取
还有更多Java PDF学习资料等你来拿!!!
网友评论