阅读以下内容你将了解到:
1.消费者和消费组的概念
2.消费者组的作用?
3.Kafka中的消费模式?
4.消费者的位移提交的概念(包括自动提交、带来的问题、手动提交策略、指定位移消费)
5.什么是再均衡?(包括发生的时机、造成的问题)
6.消费者中的拦截器
7.消费者中的多线程实现
1.消费者和消费组的概念
消费组是一个逻辑上的概念,它将旗下的消费者归位一类,可通过参数制定消费者所属的组名称。
2.消费者组的作用?
两个消费者组互不影响,在同一个消费者组内,一个分区只能对应一个消费者,而消费者可以对应多个分区,如果消费者大于分区数,多余的消费者会空闲。
对于不同组,可以有不同的负载均衡。_举个例子:消费组A里有五个消费者,消费者组B里有两个消费者,这两个消费者组同时订阅了一个Topic,这个Topic下有有四个分区,此时消费组A里面四个消费者消费不同的分区,剩余一个空闲。
消费组B里面一个消费者消费两个不同的分区。
3.Kafka中的消费模式?
Kafka中为pull(拉)模式,消费者根据自己需要去主动向服务器请求来拉取信息。
4.消费者的位移提交
对于分区来说,每条消息都有自己的offset,消费者也是一样,也有一个offset的概念,需要记录到自己消费到哪里了,并且这个消费位移必须做持久化保存。(重启或者再分配后需要拿到之前的消费位移。)
位移提交可能造成的重复消费与消息丢失的情况:
消息丢失:如果拉取到消息就进行了位移提交:当前一次拉取到[x+2,x+7]的消息集,此时处理到x+5时遇到了异常,在异常恢复后重新拉取消息是从x+8开始的,那么[x+5,x+7]这一段就会消息丢失。
重复消费:如果等消费完成之后再进行位移提交:还是上文的情形,当处理到x+5的时候遇到异常,在异常恢复后,又拉了一遍[x+2,x+7]的消息集,那么[x+2,x+4]的消息就被重复消费了。自动位移提交带来的问题:Kfka中默认的消费提交方式是每隔五秒提交一次消费位移(也就是延时提交)
假设刚拉取完一次消息,消费者崩溃或者再均衡了,那么又得从上一次位移提交的地方重新开始消费。造成重复消费。
假设拉取完一次消息,消费者中的处理线程出现了异常,导致有一部分没有处理,此时进行了位移提交,那么这部分消息就会丢失。
显然自动位移的提交并不能满足我们的需求,我们应当对有需要的业务设置手动提交。首先要明确一点:对于高可靠要求来说,宁愿重复消费也不应该因为消费异常导致消费丢失。我们可以在其他方面(比如幂等)来抵消因为重复消费带来的影响。
手动提交策略:
同步提交:
将拉取到的消息存入缓存buffer中,等积累到足够多的时候再做相应的批量提交处理。(会导致重复消费,但是不会消息丢失。)
每消费一条消息就提交一次位移。(在较小的程度上重复消费,但是会耗费性能)异步提交:
即在提交消费位移结果还未返回前就开始了新的拉取操作。(可以提高性能)
异步带来的问题:一次位移提交为x失败了,下一次又异步提交x+y成功了,此时重试提交x,会不会导致位移又变成x了呢? 此时我们可以判断如果位移已经大于要提交的值,就不需要重试此次的提交了。Kafka还提供了控制或者关闭消费的方法,有些场景下我们可能需要暂停某些分区的消费而先消费其他分区,当一定条件后再恢复。
Kafka还提供了指定位移来进行消费的功能
有这么一个场景:当新的消费者组建立的时候,根本没有可以查找的消费位移,或者因为其他情况找不到消费位移( _consumer_offsets主题中信息过期被删除了等),需要通过配置参数auto.offset.reset,指定起始处、分区末尾来进行消费。注意:如果启动时能找到消费位移,这个参数是不会奏效的。
或者我们可以通过seek()函数在业务逻辑里面重置分配到的分区的消费位置,可以通过方法拿到分区末尾、24小时前的消费位移等来进行消费。用这个方法我们还可以把消费位移存到任意存储介质中,比如数据库、文件系统等,通过这个方法来向前跳过或者回溯消费。更精准。强无敌。
5.什么是再均衡?
再均衡是指分区所属权从一个消费者转移到另一个的过程。
什么时候会发生再均衡?
1.有新的消费者加入消费组。
2.有消费者宕机下线。(长时间未发送心跳包即可认为下线)
3.消费者主动退出消费组。
4.消费组内所订阅的任一主题或者主题的分区数量发生变化
5.消费组对应的GroupCoorinator节点发生了变更。(该节点是来管理消费组的)再均衡会导致什么问题?
1.再均衡发生期间,消费组内的消费者是无法读取消息的
2.当某个消费者消费完某个分区尚未来的及提交位移,此时发生了再均衡,然后这个分区又被分配给了消费组内的另一个消费者。此时被消费完的那部分会又被重新消费一遍。就发生了重复消费。
6.消费者中的拦截器
与生产者中的拦截器类似,可以对消息进行拦截处理,比如消息的时间戳与当前时间戳超过10s就认定为过期,直接丢弃。
7.消费者中的多线程实现
与Producer的线程安全不同,Consumer是非线程安全的
一般可以认为有两种方案
1.开启多个消费线程。(此方法与多进程区别不大,但是每个线程要维护一个独立的TCP链接)
2.参考Reactor模型中,一个线程来监听TCP链接,多个线程(线程池)来处理业务。这个模型减少了TCP连接时的通信消耗,也增强了横向扩展能力。
我们把眼光放到第二种方案的位移提交的实现上:
这里提到的只是一种解决思路:滑动窗口。
image.png
每当startOffset指向的方格中的消息被消费完成,就可以提交这部分位移,与此同时窗口向前移动一格。
整个滑动窗口中的消息是我们加载到内存中的消息,一格方格对应一个线程。此时我们可以通过窗口的大小来控制重复消费的范围。
注意:如果有方格内消息无法被标记为消费成功,此时应当设置一个阈值,超过一定时间后进行重试,如果重试失败就转入重试队列(过段时间再重试),还不奏效就放到死信队列(在此队列中的消息可以用于后续分析当时遇到的异常情况,进而改善和优化系统)
网友评论