美文网首页
Kafka监控-JMX自定义监控以及常用监控工具比较

Kafka监控-JMX自定义监控以及常用监控工具比较

作者: Mr_Qifei | 来源:发表于2019-03-29 15:49 被阅读0次

    目录:
    一、通过JMX自定义监控
    1.jconsole
    2.Java监控代码:
    二、Kafka三款监控工具比较(转载)
    1.Kafka Web Conslole
    2.Kafka Manager
    3.KafkaOffsetMonitor

    一、通过JMX自定义监控

    通过JMX监控可以看到的数据有:
    broker数据指标
    topic数据指标
    每个partition的数据指标
    consumer消费滞后情况等。

    1、jconsole

    利用jconsole 工具:(可通过jconsole,找到Mbean对应的指标,鼠标悬浮指标上方就能找到代码查询所需的ObjectName。)
    本地直接连接kafka进程
    通过远程连接进程:service:jmx:rmi:///jndi/rmi://127.0.0.1:9999/jmxrmi(启动kafka时需开通JMX端口)
    这里讨论的kafka版本是0.8.1.x和0.8.2.x,这两者在使用jmx监控时会有差异,差异体现在ObjectName之中 。
    所以在本程序中通过Boolean类型的newKafkaVersion来区别对待。
    为确定使用者的objectName,可以利用jconsole工具,找到Mbean对应的指标,鼠标悬浮指标上方就能找到代码查询所需的objectName。

    2、Java代码

    (推荐代码原创《深入理解Kafka:核心设计与实践原理》和《RabbitMQ实战指南》,同时欢迎关注作者的微信公众号:朱小厮的博客。)
    启动kafka时,需启动jmx端口:JMX_PORT=9999 bin/kafka-server-start.sh config/server.properties &

    JmxMgr.class
    package monitor;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    
    /**
    * Created by hidden on 2016/12/8.
    */
    public class JmxMgr {
    private static Logger log = LoggerFactory.getLogger(JmxMgr.class);
    private static List<JmxConnection> conns = new ArrayList<>();
    
    public static boolean init(List<String> ipPortList, boolean newKafkaVersion){
    for(String ipPort:ipPortList){
    log.info("init jmxConnection [{}]",ipPort);
    JmxConnection conn = new JmxConnection(newKafkaVersion, ipPort);
    boolean bRet = conn.init();
    if(!bRet){
    log.error("init jmxConnection error");
    return false;
    }
    conns.add(conn);
    }
    return true;
    }
    
    public static long getMsgInCountPerSec(String topicName){
    long val = 0;
    for(JmxConnection conn:conns){
    long temp = conn.getMsgInCountPerSec(topicName);
    val += temp;
    }
    return val;
    }
    
    public static double getMsgInTpsPerSec(String topicName){
    double val = 0;
    for(JmxConnection conn:conns){
    double temp = conn.getMsgInTpsPerSec(topicName);
    val += temp;
    }
    return val;
    }
    
    public static Map<Integer, Long> getEndOffset(String topicName){
    Map<Integer,Long> map = new HashMap<>();
    for(JmxConnection conn:conns){
    Map<Integer,Long> tmp = conn.getTopicEndOffset(topicName);
    if(tmp == null){
    log.warn("get topic endoffset return null, topic {}", topicName);
    continue;
    }
    for(Integer parId:tmp.keySet()){//change if bigger
    if(!map.containsKey(parId) || (map.containsKey(parId) && (tmp.get(parId)>map.get(parId))) ){
    map.put(parId, tmp.get(parId));
    }
    }
    }
    return map;
    }
    
    public static void main(String[] args) {
    List<String> ipPortList = new ArrayList<>();
    ipPortList.add("localhost:9999");
    //ipPortList.add("xx.101.130.2:9999");
    JmxMgr.init(ipPortList,true);
    
    String topicName = "demo2";
    System.out.println(getMsgInCountPerSec(topicName));
    System.out.println(getMsgInTpsPerSec(topicName));
    System.out.println(getEndOffset(topicName));
    }
    }
    JmxConnection.class
    package monitor;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import javax.management.*;
    import javax.management.remote.JMXConnector;
    import javax.management.remote.JMXConnectorFactory;
    import javax.management.remote.JMXServiceURL;
    import java.io.IOException;
    import java.net.MalformedURLException;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.Set;
    
    /**
    * Created by hidden on 2016/12/8.
    */
    public class JmxConnection {
    private static Logger log = LoggerFactory.getLogger(JmxConnection.class);
    
    private MBeanServerConnection conn;
    private String jmxURL;
    private String ipAndPort = "localhost:9999";
    private int port = 9999;
    private boolean newKafkaVersion = false;
    
    public JmxConnection(Boolean newKafkaVersion, String ipAndPort){
    this.newKafkaVersion = newKafkaVersion;
    this.ipAndPort = ipAndPort;
    }
    
    public boolean init(){
    jmxURL = "service:jmx:rmi:///jndi/rmi://" +ipAndPort+ "/jmxrmi";
    log.info("init jmx, jmxUrl: {}, and begin to connect it",jmxURL);
    try {
    JMXServiceURL serviceURL = new JMXServiceURL(jmxURL);
    JMXConnector connector = JMXConnectorFactory.connect(serviceURL,null);
    conn = connector.getMBeanServerConnection();
    if(conn == null){
    log.error("get connection return null!");
    return false;
    }
    } catch (MalformedURLException e) {
    e.printStackTrace();
    return false;
    } catch (IOException e) {
    e.printStackTrace();
    return false;
    }
    return true;
    }
    
    public String getTopicName(String topicName){
    String s;
    if (newKafkaVersion) {
    s = "kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec,topic=" + topicName;
    } else {
    s = "\"kafka.server\":type=\"BrokerTopicMetrics\",name=\"" + topicName + "-MessagesInPerSec\"";
    }
    return s;
    }
    
    /**
    * @param topicName: topic name, default_channel_kafka_zzh_demo
    * @return 获取发送量(单个broker的,要计算某个topic的总的发送量就要计算集群中每一个broker之和)
    */
    public long getMsgInCountPerSec(String topicName){
    String objectName = getTopicName(topicName);
    Object val = getAttribute(objectName,"Count");
    String debugInfo = "jmxUrl:"+jmxURL+",objectName="+objectName;
    if(val !=null){
    log.info("{}, Count:{}",debugInfo,(long)val);
    return (long)val;
    }
    return 0;
    }
    
    /**
    * @param topicName: topic name, default_channel_kafka_zzh_demo
    * @return 获取发送的tps,和发送量一样如果要计算某个topic的发送量就需要计算集群中每一个broker中此topic的tps之和。
    */
    public double getMsgInTpsPerSec(String topicName){
    String objectName = getTopicName(topicName);
    Object val = getAttribute(objectName,"OneMinuteRate");
    if(val !=null){
    double dVal = ((Double)val).doubleValue();
    return dVal;
    }
    return 0;
    }
    
    private Object getAttribute(String objName, String objAttr)
    {
    ObjectName objectName =null;
    try {
    objectName = new ObjectName(objName);
    } catch (MalformedObjectNameException e) {
    e.printStackTrace();
    return null;
    }
    return getAttribute(objectName,objAttr);
    }
    
    private Object getAttribute(ObjectName objName, String objAttr){
    if(conn== null)
    {
    log.error("jmx connection is null");
    return null;
    }
    
    try {
    return conn.getAttribute(objName,objAttr);
    } catch (MBeanException e) {
    e.printStackTrace();
    return null;
    } catch (AttributeNotFoundException e) {
    e.printStackTrace();
    return null;
    } catch (InstanceNotFoundException e) {
    e.printStackTrace();
    return null;
    } catch (ReflectionException e) {
    e.printStackTrace();
    return null;
    } catch (IOException e) {
    e.printStackTrace();
    return null;
    }
    }
    
    /**
    * @param topicName
    * @return 获取topicName中每个partition所对应的logSize(即offset)
    */
    public Map<Integer,Long> getTopicEndOffset(String topicName){
    Set<ObjectName> objs = getEndOffsetObjects(topicName);
    if(objs == null){
    return null;
    }
    Map<Integer, Long> map = new HashMap<>();
    for(ObjectName objName:objs){
    int partId = getParId(objName);
    Object val = getAttribute(objName,"Value");
    if(val !=null){
    map.put(partId,(Long)val);
    }
    }
    return map;
    }
    
    private int getParId(ObjectName objName){
    if(newKafkaVersion){
    String s = objName.getKeyProperty("partition");
    return Integer.parseInt(s);
    }else {
    String s = objName.getKeyProperty("name");
    
    int to = s.lastIndexOf("-LogEndOffset");
    String s1 = s.substring(0, to);
    int from = s1.lastIndexOf("-") + 1;
    
    String ss = s.substring(from, to);
    return Integer.parseInt(ss);
    }
    }
    
    private Set<ObjectName> getEndOffsetObjects(String topicName){
    String objectName;
    if (newKafkaVersion) {
    objectName = "kafka.log:type=Log,name=LogEndOffset,topic="+topicName+",partition=*";
    }else{
    objectName = "\"kafka.log\":type=\"Log\",name=\"" + topicName + "-*-LogEndOffset\"";
    }
    ObjectName objName = null;
    Set<ObjectName> objectNames = null;
    try {
    objName = new ObjectName(objectName);
    objectNames = conn.queryNames(objName,null);
    } catch (MalformedObjectNameException e) {
    e.printStackTrace();
    return null;
    } catch (IOException e) {
    e.printStackTrace();
    return null;
    }
    
    return objectNames;
    }
    }
    

    二、Kafka三款监控工具比较

    (转载自蓝色天堂博客,本文链接地址:http://hadoop1989.com/2015/09/22/Kafka-Monitor_Compare)
    之前的博客中,介绍了Kafka Web Console这个监控工具,在生产环境中使用,运行一段时间后,发现该工具会和Kafka生产者、消费者、ZooKeeper建立大量连接,从而导致网络阻塞。并且这个Bug也在其他使用者中出现过,看来使用开源工具要慎重!该Bug暂未得到修复,不得已,只能研究下其他同类的Kafka监控软件。
    通过研究,发现主流的三种kafka监控程序分别为:
    Kafka Web Conslole
    Kafka Manager
    KafkaOffsetMonitor
    现在依次介绍以上三种工具:

    1、Kafka Web Conslole

    使用Kafka Web Console,可以监控:
    Brokers列表、Kafka 集群中 Topic列表,及对应的Partition、LogSize等信息
    点击Topic,可以浏览对应的Consumer Groups、Offset、Lag等信息
    生产和消费流量图、消息预览…

    程序运行后,会定时去读取kafka集群分区的日志长度,读取完毕后,连接没有正常释放,一段时间后产生大量的socket连接,导致网络堵塞。

    2、Kafka Manager

    雅虎开源的Kafka集群管理工具:
    管理几个不同的集群
    监控集群的状态(topics, brokers, 副本分布, 分区分布)
    产生分区分配(Generate partition assignments)基于集群的当前状态
    重新分配分区

    3、KafkaOffsetMonitor

    KafkaOffsetMonitor可以实时监控:
    Kafka集群状态
    Topic、Consumer Group列表
    图形化展示topic和consumer之间的关系
    图形化展示consumer的Offset、Lag等信息

    总结:

    通过使用,个人总结以上三种监控程序的优缺点:
    Kafka Web Console:监控功能较为全面,可以预览消息,监控Offset、Lag等信息,但存在bug,不建议在生产环境中使用。
    Kafka Manager:偏向Kafka集群管理,若操作不当,容易导致集群出现故障。对Kafka实时生产和消费消息是通过JMX实现的。没有记录Offset、Lag等信息。
    KafkaOffsetMonitor:程序一个jar包的形式运行,部署较为方便。只有监控功能,使用起来也较为安全。

    若只需要监控功能,推荐使用KafkaOffsetMonito,若偏重Kafka集群管理,推荐使用Kafka Manager。
    因为都是开源程序,稳定性欠缺。故需先了解清楚目前已存在哪些Bug,多测试一下,避免出现类似于Kafka Web Console的问题。

    重度引用:
    如何使用JMX监控Kafka:
    https://blog.csdn.net/u013256816/article/details/53524884https://blog.csdn.net/u013256816/article/details/53524884
    Kafka三款监控工具比较:
    https://www.jianshu.com/p/97ad87a933e1

    相关文章

      网友评论

          本文标题:Kafka监控-JMX自定义监控以及常用监控工具比较

          本文链接:https://www.haomeiwen.com/subject/xholbqtx.html