美文网首页
Kafka线上消息堆积问题

Kafka线上消息堆积问题

作者: 不是明天 | 来源:发表于2019-03-24 17:53 被阅读0次
    虽然项目中很早使用到了Kafka,但是由于我接手之后业务没有变化,所以这还是我第一次在生产环境接触Kafka,可以说是毫无经验,凭着自己对RocketMQ的理解(毕竟RocketMQ也借鉴了Kafka的设计经验),进行这次问题的排查。因此记录一下。  
    

    一、已知

    公司Kafka的Broker是由平台组维护,用户中心是消费方,这里简称uc, 单点登录是生产方,这里简称SSO。
    该业务是在SSO更新昵称时,通过Kafka发布消息,用户中心订阅该topic,获取用户id及用户名,更新用户相关记录。
    
    上周我被告知我负责的项目出现的消息堆积,需要我排查并处理,并且前几天Broker出现故障重启过。
    

    二、猜测

    首先定位到代码如下:

       @PostConstruct
        public void updateNickname() {
            ConcurrentUtil.execute(new Runnable() {
                @Override
                public void run() {
                    ConcurrentUtil.sleepQuiet(10000);
                    String group = "g_xxx";
                    String topic = "t_xxx";
                    Properties props = new Properties();
                    props.put("enable.auto.commit", "true");
                    props.put("auto.commit.interval.ms", "1000");
                    props.put("session.timeout.ms", "30000");
                    //省略部分代码...
                    consumer = new KafkaConsumer<>(props);
                    consumer.subscribe(CollectionUtil.wrapAsList(topic));
                    
    
                    while (flag) {
                        try {
                            ConsumerRecords<String, String> records = consumer.poll(0);
                            if (records != null && !records.isEmpty()) {
                                for (ConsumerRecord<String, String> record : records) {
                                    String username = record.key();
                                    String nickname = record.value();
                                    if (StringUtil.isNotEmpty(username, nickname)) {
                                        userNickNameService.saveOrUpdateNickname(nickname, username);
                                        //省略部分代码...
                                    }
                                }
                            } else {
                                ConcurrentUtil.sleepQuiet(500L);
                            }
                        } catch (Throwable e) {
                            e.printStackTrace();
                        }
                    }
                }
            });
        }
    

    1.首先排查是否代码有更新
    看git提交记录,发现代码最近一次更新是在2016年。所以不可能是更新代码导致的。

    2.怀疑是重启过后,消费端没有重新连接
    (按照RocketMQ的经验)于是提出查看Kafka控制台,看看该消费端是否存在UC中定义的Group “g_xxx”,经过查看确实存在该Group。并且堆积的消息数已达7K左右(由于这是一个低频操作,量相对来说比较多了)。于是又陷入讨论和推测。
    回到代码,代码中没有打印具体日志,不知道目前消费的情况以及是否有消费。一位同事Y提出如果没有正常消费,消费端应该会有异常日志,抱着试一试的心态去查看线上日志。

    3.查看消费线上日志
    通过查看线上消费日志,惊喜发现该类有大量异常日志,是一个数据库异常,因为昵称是唯一索引,因此在更新昵称的时候报错了。
    并且线上还在持续的输出该日志。于是有如下两个疑问:

    •  为什么会出现重复的用户?(SSO是不允许重复昵称的)
      
    •  是否因为该异常,导致Kafka循环消费该错误的消息
      

      但是第二点就很奇怪,我们是捕获了异常的,Kafka怎么会获取到该异常并重试?

      于是从以上两点入手:

    •  查询为什么会有两个一样的昵称同步过来
      
    •  通过代码绕过该用户,先恢复线上正常消费(后期可以人工处理)
      

    三.异常用户处理

    接下来是修改代码,绕过该用户,以为可以暂时解决问题。

      System.out.println(String.format("Kafka-用户昵称消费:【%s】【%s】", username, nickname));
                                        if (!Objects.equals("nickname_xxx", nickname)) {
                                            userNickNameService.saveOrUpdateNickname(nickname, username);
                                        }
    

    四.异常依然存在

    第二天发布之后,对于上面两个问题有了结果

    • 根据日志,发现确实有不同Id的用户更新为同一昵称(这条线就交给他们了,意外发现SSO的一个bug)
    • 平台组依然告知还是堆积...


      image.png

    这个时候又不知道什么情况了。又开始看日志,查看控制台。发现:

    1.其他订阅的Group均能够正常消费,只有UC出现堆积
    2.日志中发现,UC的两台服务器都在消费
    3.日志中还发现,搜索同一用户,有很多的消费日志


    image.png

    根据以上信息推测:

    1。因为一个group只能有一台消费,两台出现消费是否因为出现连接出现问题,重新负载均衡了。

    1. 一个用户被循环消费,应该时因为本地提交消费位置失败,才回出现。

    五.查看资料

    根据以上信息,需要知道Kafka什么时候进行提交消费进度以及心跳保活的方式很关键。这时候Y已经快我一步,找到了问题所在:

    https://stackoverflow.com/questions/46546489/how-does-kafka-consumer-auto-commit-work
    offset自动提交时每次在poll的时候校验的,老版本kafka客户端的心跳貌似也是通过poll实现的,所以怀疑是处理的消息太多超时了,所以consumer被broker认为宕机了,导致一直在uc01和uc02中更换consumer并且offset提交一直失败”

    新版本貌似是单独一个线程维持心跳了
    备注:SSO版本0.10.2.0,UC版本0.10.2.1

    image.png

    我也通过官网文档发现(Kafka官方文档还挺全),该版本默认一批消费的liang量是500条:


    image.png

    六.问题修复

                    props.put("session.timeout.ms", "30000");
    

    30000/500=60ms,60ms处理一次数据库更新操作显然有点难,因此考虑修改消费批次大小:

                    props.put("max.poll.records", "50");
                    
    

    第二天发布上线,消息不再堆积。


    image.png

    七.总结

    之前没有出现问题是因为这是一个低频操作,每一批只有一条或几条数据更新。
    这次问题出现的原因为Broker宕机,导致堆积的消息过多,每一批达到500条消息,导致poll之后消费的时间过长,session超时。服务注册中心zookeeper以为客户端失效进行Rebalance,因此连接到另外一台消费服务器,然而另外一台服务器也出现超时,又进行Rebalance...如此循环,才出现了两台服务器都进行消费,并且一直重复消费。

    找到问题的所在就比较好解决了,加大超时时间、减少拉取条数或者异步进行业务处理,如果消息量比较大的话还可以增加消费线程等。

    起初认为Kafka对于poll的实现也太坑了,为什么心跳机制会隐藏在poll()方法中实现,好在新的版本有新的实现方式。不过后来这也是因为我们在使用Api的过程中不熟悉其实现原理导致的。所以在以后的开发中,一定要知其然,还要只其所以然。

    在解决这个问题的过程中和之后,了解了很多Kafka的实现原理,以及与RocketMQ的异同。消息队列一直是我比较感兴趣的一个方面,后面有机会一定再进行深入研究和对比。

    相关文章

      网友评论

          本文标题:Kafka线上消息堆积问题

          本文链接:https://www.haomeiwen.com/subject/sanuvqtx.html