一 :背景
线上kafka消费端因日志异常的解决导致消息堆积。
二 : 日志异常解决导致消息堆积
线上kafka消费端日志异常,频繁打印错误日志,服务器磁盘一天就满了,此时其他服务无法正常工作。报错如下
java.lang.IllegalStateException: Coordinator selected invalid assignment protocol: null
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:217)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:367)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:316)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:295)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1146)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1111)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:699)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.lang.Thread.run(Thread.java:748)
2021-05-06 17:54:38.467 ERROR 4998 --- [teParkcyy-0-C-1] o.s.kafka.listener.LoggingErrorHandler : Error while processing: null
最终查找到原因,是 servlet 生命周期过期,重启服务即可。
三 : 问题套娃
解决kafka消费端日志异常一天后,发现消息堆积,检查kafka消费端日志,正常刷日志,无明显异常信息 (最终发现,自己还是不仔细,日志太多了,夹杂在里面的报错信息被忽略了)
为了确认这个问题,我做了如下的测试来定位问题
1、查看kafka manager 是否有异常
管理界面大部分都看了,未发现异常情况,确认kafka 没问题
2、查看消息是否被消费到
找一个积压的topic ,对消费端的代码进行日志打印,查询消息是否被消费。
发现消息消费正常
3、查看数据库是否有锁表
查询是否因为数据库锁表或其他原因,导致消息消费了,但是没有入库,给我们的错觉是没有消费
发现数据库正常
此时发现一个重要的问题,重启kafka 之后,消息消费正常,并入库,但是到每个时间点入库停止,消费卡主,打印的消费日志停止,不在消费。
4、 再次查看kafka消费端日志
此时在消费端日志打印停止为界,向下查询日志问题,发现
org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [parkBook]
日志显示parkBook topic 被拒绝访问
5、代码分析
parkBook topic 存在消费监听,此parkBook topic 应该是之前被人取消了,而我们kafka消费端对 topic 的加载是在启动时候直接加载到内存中,所以取消了并不会立马影响代码的错误。
在我们解决日志异常重启的时候,重新加载topic,导致此时内存中没有 parkBook topic ,监听失败,导致其他topic 也失败了
6、 问题解决
注释 parkBook topic 监听,重启kafka 消费端。
四 :优化
线上的kafka,是分两个Partition ,部署在一台机器上,随着数据的增加,消费能力不足以快速消费
我这边消费端是使用线程池。两倍了线程池的核心线程数、最大线程数
spring继承的kafka,配置 消费者线程为2 :spring.kafka.listener.concurrency=2
网友评论