美文网首页
读书笔记:Kafka消费者客户端入门

读书笔记:Kafka消费者客户端入门

作者: 东南枝下 | 来源:发表于2020-11-26 17:16 被阅读0次

消费组(Consumer Group)

消费者在进行消费前需要指定其所属消费组的名称,这个可以通过消费者客户端参数group.id来配置,默认值为空字符串。

一个正常的消费逻辑需要具备以下几个步骤:
(1)配置消费者客户端参数及创建相应的消费者实例。
(2)订阅主题。

// 订阅状态:AUTO_TOPICS
consumer.subscribe(Collections.singletonList(TOPIC));
// 通过正则订阅,订阅状态:AUTO_PATTERN
consumer.subscribe(Pattern.compile("topic-.*"));
// 直接订阅某些主题的特定分区,订阅状态:USER_ASSIGNED
consumer.assign(Collections.singletonList(new TopicPartition(TOPIC,0)));
// 订阅状态:AUTO_TOPICS,AUTO_PATTERN,USER_ASSIGNED 互斥,一个消费者只能使用其中一种
// 取消订阅
consumer.unsubscribe(); // 或者订阅空集合

// 查询指定主题的元数据信息
List<PartitionInfo> partitionInfoList=consumer.partitionsFor(TOPIC);

(3)拉取消息并消费。

    // 拉取订阅消息,设置阻塞时间为永久
ConsumerRecords<String,String> records=consumer.poll(Duration.ofMillis(Long.MAX_VALUE));
// 按照 分区/主题 维度进行消费
records.records(partition) 或 records.records(topic)

ConsumerRecords 中的几种方法
1、count()方法用来计算出消息集中的消息个数,返回类型是int
2、isEmpty()方法用来判断消息集是否为空,返回类型是boolean
3、empty()方法用来获取一个空的消息集,返回类型是ConsumerRecord<K,V>

public void seek(TopicPartition partition,long offset)
seek()方法只能重置消费者分配到的分区的消费位置,而分区的分配是在 poll()方法的调用过程中实现的。也就是说,在执行seek()方法之前需要先执行一次poll()方法,等到分配到分区之后才可以重置消费位置。

// 获取消费者所分配到的分区信息
public Set<TopicPartition> assignment()

seekToBeginning()方法和seekToEnd() // 从开头或末尾开始消费

(4)提交消费位移。

    enable.auto.commit=false时,进行手动提交位移
手动提交又可以分为同步提交和异步提交两种方式,commitSync()和commitAsync()
// 提交指定分区的位移
public void commitSync(final Map<TopicPartition,OffsetAndMetadata> offsets)
// 异步提交回调
public  void  commitAsync(OffsetCommitCallback callback)
public void commitAsync(final Map<TopicPartition,OffsetAndMetadata offsets,OffsetCommitCallback callback)

(5)关闭消费者实例。

pause()  // 暂停某些分区在拉取操作时返回数据给客户端

paused()  //返回被暂停的分区合集

resume()  // 恢复某些分区向客户端返回数据

KafkaConsumer的wakeup()方法可以退出poll()的逻辑,并抛出WakeupException 的异常

参数

配置参数常量类:org.apache.kafka.clients.consumer.ConsumerConfig

bootstrap.servers
指 定 连 接 Kafka 集 群 所 需 的 broker 地 址 清 单

group.id
消费者隶属的消费组的名称

key.deserializervalue.deserializer
与生产者客户端 KafkaProducer中的key.serializer和value.serializer参数对应

client.id
设定KafkaConsumer对应的客户端id,默认值也为“”。如果客户端不设置,则KafkaConsumer会自动生成一个非空字符串,即字符串“consumer-”与数字的拼接

enable.auto.commit
消费位移的提交方式, 默认true,自动提交,false为手动提交

auto.commit.interval.ms
消费位移定义提交周期时间,默认5秒

auto.offset.reset
当消费者查找不到所记录的消费位移时,就会根据消费者客户端参数auto.offset.reset的配置来决定从何处开始进行消费,这个参数的默认值为“latest”,表示从分区末尾开始消费消息
如果将auto.offset.reset参数配置为“earliest”,那么消费者会从起始处,也就是0开始消费
配置为“none”,配置为此值就意味着出现查到不到消费位移的时候,既不从最新的消息位置处开始消费,也不从最早的消息位置处开始消费,此时会报出NoOffsetForPartitionException异常

fetch.min.bytes
该参数用来配置Consumer在一次拉取请求(调用poll()方法)中能从Kafka中拉取的最小数据量,默认值为1(B)

fetch.max.bytes
用来配置Consumer在一次拉取请求中从Kafka中拉取的最大数据量,默认值为52428800(B),也就是 50MB。

fetch.max.wait.ms
如果Kafka仅仅参考fetch.min.bytes参数的要求,那么有可能会一直阻塞等待而无法发送响应给

Consumer
用于指定Kafka的等待时间,默认值为500(ms)

max.partition.fetch.bytes
这个参数用来配置从每个分区里返回给Consumer的最大数据量,默认值为1048576(B),即1MB。这个参数与fetch.max.bytes 参数相似

max.poll.records
用来配置Consumer在一次拉取请求中拉取的最大消息数,默认值为500(条)

connections.max.idle.ms
这个参数用来指定在多久之后关闭限制的连接,默认值是540000(ms),即9分钟。

exclude.internal.topics
Kafka中有两个内部的主题:__consumer_offsets和__transaction_state
用来指定Kafka中的内部主题是否可以向消费者公开,默认值为true

receive.buffer.bytes
这个参数用来设置Socket接收消息缓冲区(SO_RECBUF)的大小,默认值为65536(B),即64KB。如果设置为-1,则使用操作系统的默认值。

request.timeout.ms
这个参数用来配置Consumer等待请求响应的最长时间,默认值为30000(ms)。

metadata.max.age.ms
用来配置元数据的过期时间,默认值为300000(ms),即5分钟

reconnect.backoff.ms
用来配置尝试重新连接指定主机之前的等待时间(也称为退避时间),避免频繁地连接主机,默认值为50(ms)

retry.backoff.ms
这个参数用来配置尝试重新发送失败的请求到指定的主题分区之前的等待(退避)时间,避免在某些故障情况下频繁地重复发送,默认值为100(ms)。

isolation.level
用来配置消费者的事务隔离级别。字符串类型,有效值为“read_uncommitted”和“read_committed”,表示消费者所消费到的位置,如果设置为“read_committed”,那么消费者就会忽略事务未提交的消息,即只能消费到LSO(LastStableOffset)的位置,默认情况下为“read_uncommitted”,即可以消费到HW(High Watermark)处的位置

再均衡

分区的所属权从一个消费者转移到另一消费者的行为

消费者拦截器

消费者拦截器主要在消费到消息或在提交消费位移时进行一些定制化的操作
消费者拦截器需要自定义实现org.apache.kafka.clients.consumer.ConsumerInterceptor接口
KafkaConsumer会在poll()方法返回之前调用拦截器的onConsume()方法
KafkaConsumer会在提交完消费位移之后调用拦截器的onCommit()方法

多线程实现

KafkaConsumer却是非线程安全的
KafkaConsumer中定义了一个 acquire()方法,用来检测当前是否只有一个线程在操作,若有其他线程正在操作则会抛出ConcurrentModifcationException异常
acquire()方法和release()方法成对出现,表示相应的加锁和解锁操作

相关文章

网友评论

      本文标题:读书笔记:Kafka消费者客户端入门

      本文链接:https://www.haomeiwen.com/subject/umkziktx.html