目录
- 什么是消费者和消费者群组
- 什么是再均衡
- 群组协调器的broker如何确定消费者是否关闭或者崩溃
- 消费者如何提交偏移量
- 提交偏移方式有哪些
- 同步异步结合方式提交
- kafka消费者一直在轮询,那么怎么退出
- 消费者的配置
kafka消费者
什么是消费者和消费者群组
kafka消费者群群组是一个群体里的消费者订阅的都是同一个主题,每个消费者接收主题一部分分区的信息。当生产者的写入速度大于目前群组的读出速度的时候,可以通过增加消费者群组里面的消费者数量来接收消息。当消费者数量超过主题的分区数量的时候,有一部分消费者就会闲置。 不同消费者群组消费进度互不干涉。
什么是再均衡
分区的所有权从一个消费者转移到另一个消费者,这样的行为被称为再均衡。再均衡非常重要,它为消费者带来了高可用性和伸缩性。在在均衡期间,消费者无法读取消息,造成整个群组一小段时间的不可用。消费者启动关闭时会造成再均衡。
群组协调器的broker如何确定消费者是否关闭或者崩溃?
通过心跳机制。发送心跳来维持它们和群组的从属关系。群组协调器会等待几秒,确认死亡才回触发再均衡。
消费者如何提交偏移量?
更新分区当前位置的操作叫做提交。消费者往一个叫做——consumer_offset的特殊主体发送消息,消息里包含每个分区的偏移量。如果消费者一直处于运行状态,那么偏移量就没有什么作用。不过如果消费者发生崩溃或者有新的消费者加入群组,就会触发再均衡。完成再均衡之后,每个消费者可能分配到新的分区,而不是之前处理的那个。为了能继续之前的工作,消费者需要读取每个分区最后一次提交的偏移量,然后从偏移量指定的位置处理。
如果提交的偏移量小于客户端处理的最后一个消息的偏移量,那么处于两个偏移量之间的消息就会被重复处理。如果偏移量大于客户端处理的最后一个消息的偏移量,那么处于两个偏移量之间的消息就会消失。
提交偏移方式有哪些?
- 自动提交,设置enable.auto.commit为true,那么消费者会自动从Poll()方法接受到的最大偏移量提交上去。风险在于如果提交之后第3s发生了再均衡,在均衡会后处理的消息会重复。所以在此调用之前最好确保素有当前调用返回的消息都以及处理完毕(在调用close()方法之前也会进行自动提交)
- 提交当前偏移量。把auto.commit.offset设为false,让应用程序决定何时提交偏移量。使用commitSync()提交偏移量最简单也最可靠。这个api会由poll()方法返回的最新偏移量,提交成功马上返回,如果提交视频,跑出异常。commitSync()将会提交由poll()返回的最新偏移量,所以在处理完所有记录后要确保调用commitSync()否则还是会由丢失消息的方希。如果发生再均衡,会出现重复处理。
- 异步提交,手动提交由一个不足之处,在broker对提交请求做出回应之前,应用程序会一直阻塞,这样会限制应用程序的吞吐量。可以使用异步提交API,我们只管发送提交请求,无需等待broker的响应。
- 同步与异步组合提交,在消费者关闭之前一般会组合使commitAsync()和commitSync()
- 提交特定的偏移量
同步异步结合方式提交
- 提交方式用同步和异步的方式 一般来着异步提交,即使失败也没事,因为后续提交总有成功的,但是如果遇到消费者关闭或者
再均衡时就必须提交成功。 消费者关闭可在finally块中同步提交,再均衡需要实现在均衡监听器,实现ConsumeRebalanceListener
kafka消费者一直在轮询,那么怎么退出呢?
如果确定退出循环,需要通过另外一个程序调用consumer.wakeup(), addShutDownHook里面调用consume.wakeup,这时候while不断循环的线程会抛出WakeUpException,不用管,只是抛出异常用
,finally时调用consume.close
消费者的配置
- fetch.min.bytes:指定了最小的字节数
- fetch.amx.wait.ms:用于指定broker的等待时间,默认为500ms
- max.partiton.fetch.bytes:属性指定了服务器从每个分区里返回给消费者的最大字数。
- session.timeout.ms:指定了消费者在被认定为死亡之前可以与服务器断开连接的时间,默认为3S。若在3S内被认为已经死亡,协调器就会发生再均衡。发送心跳的时间间隔一般为session.timeout.ms的三分之一。
- auto.offset.reset:此属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该如何处理。它的默认值为latest(意思是在偏移量无效的情况下,消费者将从最新的记录开始读取数据)。另一个值是earliest,意思是说,在偏移量无效的情况下,消费者将从起始位置读取分区的记录。
- enacle.auto.commit:此属性指定了消费者是否自动提交偏移量,默认值为true。为了尽量避免出现重复数据和数据丢失,可以把它设置为false,由自己控制何时提交偏移量。
- partition.assignment.strategy:有两种分配策略,Range以及RoundRobin。该策略会把主题的若干个连续的分区分配给消费者。RoundRobin该策略把主题的所有分区诸葛分配给消费者。例如:消费者C1以及消费者C2同时订阅了主题T1和主题T2,且每个主题有3个分区。若使用Range策略,消费者C1可能会分配到T1以及T2的分区0以及分区1,C2可能会分配到T1以及T2的分区2。RoundRobin的策略会把主题的所有分区逐个进行分配给消费者,消费者C1可能会分配到T1的分区0和分区2以及T2的分区1;消费者C2可能会分配到T2的分区0和分区2以及T1的分区1。
- client.id:用来标识从客户端发送来的消息
- max.poll.records:用于控制单次调用call()方法能够返回的记录数量。
- receive.buffer.bytes和send.buffer.bytes,socket在读写数据时用到的TCP缓冲区设置大小
网友评论