美文网首页大数据玩转大数据大数据
Kafka Manager实现原理与填坑

Kafka Manager实现原理与填坑

作者: 扫帚的影子 | 来源:发表于2017-02-28 13:51 被阅读4327次
    Kafka Manager 简介
    • Kafka Manager 可能是现在能找到的最好的可视化的Kafka管理工具, 感谢Yahoo-我人生中打开的一个网站-的开源;
    • 使用Kafka Manager, 基本上之前需要运行Kafka相应命令行工具的工作现在都可以可视化的完成:
      1. 创建Topic, 调整消息保存时长, Partition数量等等配置;
      2. 管理Topic, 包括Reassign Partitions, Preferred Replica Election等等;
      3. 消费情况查看, 支持offset保存到zk和broker两种方式, 列出所有消费的group, 消费每个partition的详情;
    1. 集群的简单健康状态查看,包括partition分布是否均衡, leader分布是否均衡等;
    2. 通过JMX查看各种指标, 比如各个broker的网络流量和消息进出数据, 每个Topic消息的读写速度等;
    • 下面我们会先简单介绍下Kafka Manager的实现和在使用中遇到的几种坑;
    Kafka Manager实现
    • 实现语言: Scala
    • 用到的框架和第三方库:
    1. Play framework: Kafka-Mananger本质上是个Web应用, 因此使用play framework的MVC架构实现;
    2. AKKA: 用于构建高并发、分布式和容错的应用. Kafka Manager中的所有请求都使用akka来异步处理;
    3. Apache Curator Framework: 用于访问zookeeper;
    4. Kafka Sdk: 用于获取各Topic的last offset, 使用Admin接口实现各种管理功能;
    • 编译:
      整个工程使用 sbt 构建, 具体编译流程可以在githut上找到. sbt在build过程中会加载很多第三方依赖, 这个在国内有时会很慢, 各种同学各显神通吧.
    • 实现:
      其实kafka manager的代码还是很清晰易阅读的, 如果熟悉scala和play的话应该没有难度. 不同本人也是现学现用, 好惭愧~~~. 咱们这里捡重点的说吧, 不分析具体代码实现,只讲下实现的方法:
    1. 获取集群中所有Topic
      使用Curator访问zk获取,并监听zk相关节点 /brokers/topics 的变化;
    2. 获取Topic的partiton, leader, replicas信息
      也是从zk获取, /brokers/topics/[topic]/partitions;
    3. 获取Topic的各partition的last offset
      使用kafka sdk发送OffsetRequest到kafka集群来获得, 这个获取的动作会被封装成Future[PartitionOffsetsCapture], 每个topic一个Future, 使用Google的LoadingCache来存储这些future, 利用LoadingCache的超时淘汰机制来周期性的创建新的Future来间隔地发送OffsetRequest获取当前最新的last offset;
    4. 获取Kafka本身管理的group的消费情况
      使用kafka sdk不断地消费"__consumer_offsets"这个topic, 来获取所有group的消费情况,关于__consumer_offsets参考 Committing and fetching consumer offsets in Kafka
    5. 获取zookeeper管理的group的消费情况
      肯定是从zk上读取, /consumers

    上面的这些实现都在 KafkaStateActor.scala 这个文件里.

    • 各种Acotr的关系简图,仅供参考
    kafka-manager.png
    Kafka Manager遇到的坑
    • 多个kafka manager来管理同一个kafka集群:
      你会发现在kafka manager里无法看到所有offset使用kafka本身管理的group.
      前面我们讲过使用kafka sdk不断地消费"__consumer_offsets", 看看这段代码(在KafkaStateActor.scala中):
        props.put("group.id", "KafkaManagerOffsetCache")
        props.put("bootstrap.servers", bootstrapBrokerList.list.map(bi => s"${bi.host}:${bi.port}").mkString(","))
        props.put("exclude.internal.topics", "false")
        props.put("enable.auto.commit", "false")
        props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")
        props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")
        props.put("auto.offset.reset", "latest")
    

    props.put("group.id", "KafkaManagerOffsetCache")这句说明不管启动了几个kafka manager, 消费"__consumer_offsets"都使用同一个group.
    解决方案: group.id从配置文件中读取,每个kafka manager使用不同的group id;

    • 客户端使用某些sdk(比如librdkafka)消费topic, 客户端crash后, 在kafka manager上查看其group的消费情况, 仍然一直能看到"Consumer Instance Owner"
      原因在于处理从broker返回的GroupMetadata response时没有处理异常情况:
                  case GroupMetadataKey(version, key) =>
                        val value: GroupMetadata = readGroupMessageValue(key, ByteBuffer.wrap(record.value()))
                        value.allMemberMetadata.foreach {
                          mm =>
                            mm.assignment.foreach {
                              case (topic, part) =>
                                groupTopicPartitionMemberMap += (key, topic, part) -> mm
                            }
                        }
                    }
    

    这里的record.value可能为空, 此时应作清理工作:

                      if (null != record &&                                                                                                   
                          null != record.value()) {                                                                                           
                            val value: GroupMetadata = readGroupMessageValue(key, ByteBuffer.wrap(record.value()))                            
                            value.allMemberMetadata.foreach {                                                                                               
                              mm =>                                                                                                                         
                                mm.assignment.foreach {                                                                                                     
                                  case (topic, part) =>                                                                                                     
                                    groupTopicPartitionMemberMap += (key, topic, part) -> mm                                                                
                                }
                            }                                                                                                                               
                            } else {                                                                                                                          
                              groupTopicPartitionMemberMap.foreach {                                                                                          
                                case ((group, topic, part), mmd) =>                                                                                           
                                  if (group == key) {                                                                                                         
                                    var tmp = mmd                                                                                                             
                                    tmp.memberId = ""                                                                                                         
                                    tmp.clientHost = ""                                                                                                       
                                    groupTopicPartitionMemberMap += (key, topic, part) -> tmp                                                                 
                                  }                                                                                                                           
                              }                                                                                                                               
                            }          
    
    • Yikes! Ask timed out on [ActorSelection[Anchor(akka://kafka-manager-system/), Path(/user/kafka-manager)]] after [5000 ms]
      访问kafka manager时出现上面的超时提示, 遇到这个问题,好学不服输的你肯定会上网各种搜, 然后你会去改kafka manager的各种配置, 调大各种thread pool的容量, 增大queue size, 甚至开大jvm的使用内存, 然而问题并没有解决, 看来只剩下定时重启这一招儿了.

    这里提供一种解决方案: 这个超时是Actor在执行异步请求时一直等不到返回结果造成的, 主要是前面讲过的"获取Topic的各partition的last offset的Future"没有返回结果,这些Future是通过Await.ready来阻塞拿到result的, 然而在kafka manager中这个Await.ready没有给timeout, 是一直等待, 那咱们就给个timeout好了, 代码在ActorModel.scala中, 有好几处Await.ready的调用.

    找到根源: 再也不用定时重启, 提了一个pull request到官方:Use a separate thread to get the topic offsets to fixed bug 'Yikes! Ask timed out...', 主要就是不再使用 Future[PartitionOffsetCapture]来获取topic offset, 因为这个会产生大量的Future, 进而会产生大量的task提交到ThreadExcutor, 其实只需要启动一个单独的线程来作这件事就好了.

    • Consumer offset的详情不完整
      通过上面的源码分析我们知道km是通过消费"__consumer_offsets"来获取某一个组的消费情况的,消费这个topic,和消费用户自己的topic没什么两样, km里使用"props.put("auto.offset.reset", "latest")"默认offset无效时从最新位置来拉取, 如果一个group用户已经有段时间没有提交offset(但还没有完全过期), 则此时在km上看不到相应的gorup信息, 可以简单改为"props.put("auto.offset.reset", "earliest")"

    • 同名group消费不同topic后,其中一个group的消费进程结束后, 仍可以看到其消费详情
      这个问题是最近被发现,之前应该是一直存在着,没能引起重视.
      这里提供一种简单的,hack的解决方案:

    case GroupMetadataKey(version, key) =>
                       if (null != record &&                                                                                                   
                         null != record.value()) {                                                                                           
                           val value: GroupMetadata = readGroupMessageValue(key, ByteBuffer.wrap(record.value()))                            
                           var topicSet:Set[String] = Set()
    
                           value.allMemberMetadata.foreach {                                                                                               
                             mm =>                                                                                                                         
                               mm.assignment.foreach {                                                                                                     
                                 case (topic, part) =>                                                                                                     
                                   topicSet += topic
                                   groupTopicPartitionMemberMap += (key, topic, part) -> mm                                                                
                               }
                             }
    
                             groupTopicPartitionMemberMap.foreach {                                                                                          
                               case ((group, topic, part), mmd) =>                                                                                           
                                 if (group == key &&
                                   !topicSet.contains(topic)) {                                                                                                         
                                   var tmp = mmd                                                                                                             
                                   tmp.memberId = ""                                                                                                         
                                   tmp.clientHost = ""                                                                                                       
                                   groupTopicPartitionMemberMap += (key, topic, part) -> tmp                                                                 
                                 }                                                                                                                           
                             }                                                                                                                               
                           
                           } else {                                                                                                                          
                             groupTopicPartitionMemberMap.foreach {                                                                                          
                               case ((group, topic, part), mmd) =>                                                                                           
                                 if (group == key) {                                                                                                         
                                   var tmp = mmd                                                                                                             
                                   tmp.memberId = ""                                                                                                         
                                   tmp.clientHost = ""                                                                                                       
                                   groupTopicPartitionMemberMap += (key, topic, part) -> tmp                                                                 
                                 }                                                                                                                           
                             }                                                                                                                               
                           }                                                                                                                                 
                   }
    
    
    今天就写这么多, 其他坑以后遇到再补充.

    之前一直在写kafka的源码解析,大家有兴趣也可以指正一下 源码解析

    相关文章

      网友评论

      • 0cbf866bbcea:谢谢分享,最近在做lag监控,受益颇多。
        扫帚的影子:@tony86yan 不客气,共同提高.这个githu上官方最近有不少的更新.
      • 稻草人_Sdy:我的storm程序消费kafka topic,有消费数据,但是在 kafka Manager 上找不到我的消费组,用名字在zookeeper 上也找不到。 kafka 版本 0.10.1
        稻草人_Sdy:我的offset 是自己保存的,所有KafkaManager 上找不多的。多谢!
        扫帚的影子:@稻草人_5cb1 先确认consumer里的配置 offset是保存到哪里了
      • 83bc6203ccd7:Yikes! Ask timed out on [ActorSelection[Anchor(akka://kafka-manager-system/), Path(/user/kafka-manager)]] after [5000 ms]
        关于这个错误,研究了好久源码,不说代码原理,直接给一个暴力无脑的解决方法:
        首先,把所有timeout都强行改成10s,相关的文件--ActorModel.scala, KafkaManager.scala。
        然后,把LongRunningPoolActor这个class的longRunningExecutor的初始化LinkedBlockingQueue的大小改成你的kafka集群的所有topic的所有partition数量的总和(大一点的集群有几千)

        当然硬件配置要跟上。
        saup007:@扫帚的影子 很赞,以前要经常重启。尝试更新版本,应该可以解决了。
        扫帚的影子:最近提交了一个path, 自己用了一段时间一直没再出现这个问题,但是还没有被官方merge, 可以参考:https://github.com/yahoo/kafka-manager/pull/456
      • 83bc6203ccd7:这几天要根据kafka manager 源码写个kafka consumer metadata的api,您这篇文章帮大忙了。:pray:
        扫帚的影子: @PaperAndChill 互相学习 一起研究
      • 疯狂的轻骑兵:超时那个问题可以把application.conf中的kafka-manager.api-timeout-millis调大,也能搞定。
        扫帚的影子: @疯狂的轻骑兵 定时采样的 这个正常
        疯狂的轻骑兵: @疯狂的轻骑兵 那个也是俺。今天还发现offset的值有时候跟实际的差比较大,在20%多
        扫帚的影子: @疯狂的轻骑兵 github上那个是你?除了这个地方最近又发现还有会卡的地方 还没时间跟
      • 2c7bb236b838:则此时在km上看不到相应的gorup信息, 可以简单改为"props.put("auto.offset.reset", "earliest")"

        你这个地方写的不对,如果这样设置的话,找不到offset的时候会从头消费的。
        扫帚的影子:@2c7bb236b838 找不到offset时确实入你所说会从头消费__consumer_offsets, 一直会消费到group最后提交的offset信息,所以也是没有问题的.
      • 759ec3f0e1bb:我想问下这个目前支持0.10.x 以上的版本吗
        扫帚的影子: @梓墨林轩 最新版支持 github上已经写了 但我目前用的还是0.9

      本文标题:Kafka Manager实现原理与填坑

      本文链接:https://www.haomeiwen.com/subject/itjxgttx.html