虽然项目中很早使用到了Kafka,但是由于我接手之后业务没有变化,所以这还是我第一次在生产环境接触Kafka,可以说是毫无经验,凭着自己对RocketMQ的理解(毕竟RocketMQ也借鉴了Kafka的设计经验),进行这次问题的排查。因此记录一下。
一、已知
公司Kafka的Broker是由平台组维护,用户中心是消费方,这里简称uc, 单点登录是生产方,这里简称SSO。
该业务是在SSO更新昵称时,通过Kafka发布消息,用户中心订阅该topic,获取用户id及用户名,更新用户相关记录。
上周我被告知我负责的项目出现的消息堆积,需要我排查并处理,并且前几天Broker出现故障重启过。
二、猜测
首先定位到代码如下:
@PostConstruct
public void updateNickname() {
ConcurrentUtil.execute(new Runnable() {
@Override
public void run() {
ConcurrentUtil.sleepQuiet(10000);
String group = "g_xxx";
String topic = "t_xxx";
Properties props = new Properties();
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
//省略部分代码...
consumer = new KafkaConsumer<>(props);
consumer.subscribe(CollectionUtil.wrapAsList(topic));
while (flag) {
try {
ConsumerRecords<String, String> records = consumer.poll(0);
if (records != null && !records.isEmpty()) {
for (ConsumerRecord<String, String> record : records) {
String username = record.key();
String nickname = record.value();
if (StringUtil.isNotEmpty(username, nickname)) {
userNickNameService.saveOrUpdateNickname(nickname, username);
//省略部分代码...
}
}
} else {
ConcurrentUtil.sleepQuiet(500L);
}
} catch (Throwable e) {
e.printStackTrace();
}
}
}
});
}
1.首先排查是否代码有更新
看git提交记录,发现代码最近一次更新是在2016年。所以不可能是更新代码导致的。
2.怀疑是重启过后,消费端没有重新连接
(按照RocketMQ的经验)于是提出查看Kafka控制台,看看该消费端是否存在UC中定义的Group “g_xxx”,经过查看确实存在该Group。并且堆积的消息数已达7K左右(由于这是一个低频操作,量相对来说比较多了)。于是又陷入讨论和推测。
回到代码,代码中没有打印具体日志,不知道目前消费的情况以及是否有消费。一位同事Y提出如果没有正常消费,消费端应该会有异常日志,抱着试一试的心态去查看线上日志。
3.查看消费线上日志
通过查看线上消费日志,惊喜发现该类有大量异常日志,是一个数据库异常,因为昵称是唯一索引,因此在更新昵称的时候报错了。
并且线上还在持续的输出该日志。于是有如下两个疑问:
为什么会出现重复的用户?(SSO是不允许重复昵称的)
-
是否因为该异常,导致Kafka循环消费该错误的消息
但是第二点就很奇怪,我们是捕获了异常的,Kafka怎么会获取到该异常并重试?
于是从以上两点入手:
查询为什么会有两个一样的昵称同步过来
通过代码绕过该用户,先恢复线上正常消费(后期可以人工处理)
三.异常用户处理
接下来是修改代码,绕过该用户,以为可以暂时解决问题。
System.out.println(String.format("Kafka-用户昵称消费:【%s】【%s】", username, nickname));
if (!Objects.equals("nickname_xxx", nickname)) {
userNickNameService.saveOrUpdateNickname(nickname, username);
}
四.异常依然存在
第二天发布之后,对于上面两个问题有了结果
- 根据日志,发现确实有不同Id的用户更新为同一昵称(这条线就交给他们了,意外发现SSO的一个bug)
-
平台组依然告知还是堆积...
image.png
这个时候又不知道什么情况了。又开始看日志,查看控制台。发现:
1.其他订阅的Group均能够正常消费,只有UC出现堆积
2.日志中发现,UC的两台服务器都在消费
3.日志中还发现,搜索同一用户,有很多的消费日志
image.png
根据以上信息推测:
1。因为一个group只能有一台消费,两台出现消费是否因为出现连接出现问题,重新负载均衡了。
- 一个用户被循环消费,应该时因为本地提交消费位置失败,才回出现。
五.查看资料
根据以上信息,需要知道Kafka什么时候进行提交消费进度以及心跳保活的方式很关键。这时候Y已经快我一步,找到了问题所在:
“ https://stackoverflow.com/questions/46546489/how-does-kafka-consumer-auto-commit-work
offset自动提交时每次在poll的时候校验的,老版本kafka客户端的心跳貌似也是通过poll实现的,所以怀疑是处理的消息太多超时了,所以consumer被broker认为宕机了,导致一直在uc01和uc02中更换consumer并且offset提交一直失败”
image.png新版本貌似是单独一个线程维持心跳了
备注:SSO版本0.10.2.0,UC版本0.10.2.1
我也通过官网文档发现(Kafka官方文档还挺全),该版本默认一批消费的liang量是500条:
image.png
六.问题修复
props.put("session.timeout.ms", "30000");
30000/500=60ms,60ms处理一次数据库更新操作显然有点难,因此考虑修改消费批次大小:
props.put("max.poll.records", "50");
第二天发布上线,消息不再堆积。
image.png
七.总结
之前没有出现问题是因为这是一个低频操作,每一批只有一条或几条数据更新。
这次问题出现的原因为Broker宕机,导致堆积的消息过多,每一批达到500条消息,导致poll之后消费的时间过长,session超时。服务注册中心zookeeper以为客户端失效进行Rebalance,因此连接到另外一台消费服务器,然而另外一台服务器也出现超时,又进行Rebalance...如此循环,才出现了两台服务器都进行消费,并且一直重复消费。
找到问题的所在就比较好解决了,加大超时时间、减少拉取条数或者异步进行业务处理,如果消息量比较大的话还可以增加消费线程等。
起初认为Kafka对于poll的实现也太坑了,为什么心跳机制会隐藏在poll()方法中实现,好在新的版本有新的实现方式。不过后来这也是因为我们在使用Api的过程中不熟悉其实现原理导致的。所以在以后的开发中,一定要知其然,还要只其所以然。
在解决这个问题的过程中和之后,了解了很多Kafka的实现原理,以及与RocketMQ的异同。消息队列一直是我比较感兴趣的一个方面,后面有机会一定再进行深入研究和对比。
网友评论