再看 Kafka Lag

作者: Java_老男孩 | 来源:发表于2019-05-22 16:26 被阅读4次

在《Kafka的Lag计算误区及正确实现》一文中提及了kafka.admin.ConsumerGroupCommand.PartitionAssignmentState无法被外部访问,故要将PartitionAssignmentState前的protected修饰符去掉

可以直接将describeGroup返回的结果转换成JSON然后传至监控页面(supported by YANGliiN oba)。代码如下:

String[] agrs = {"--describe", "--bootstrap-server", brokers, "--group", groupId};
ConsumerGroupCommand.ConsumerGroupCommandOptions options =
        new ConsumerGroupCommand.ConsumerGroupCommandOptions(agrs);
ConsumerGroupCommand.KafkaConsumerGroupService kafkaConsumerGroupService =
        new ConsumerGroupCommand.KafkaConsumerGroupService(options);
ObjectMapper mapper = new ObjectMapper();
//1. 使用jackson-module-scala_2.12
mapper.registerModule(new DefaultScalaModule());
//2. 反序列化时忽略对象不存在的属性
mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
//3. 将Scala对象序列化成JSON字符串
String source = mapper.writeValueAsString(kafkaConsumerGroupService.describeGroup()._2.get());

这里需要采用的是jackson-module-scala的包实现,如果直接用普通的JSON序列化方式那么会达不到想要的效果,jackson以及jackson-module-scala对应的Maven库如下:

<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-core</artifactId>
    <version>2.9.4</version>
</dependency>
<dependency>
    <groupId>com.fasterxml.jackson.module</groupId>
    <artifactId>jackson-module-scala_2.12</artifactId>
    <version>2.9.5</version>
</dependency>

注意如果本地安装的Scala版本与所配置的jackson-module-scala版本不一致的话会报出一些异常。发散一下思维:既然可以序列化为JSON,那么完全可以通过JSON再反序列化会对象,只不过通过JSON作为中间媒介,将访问受限的Scala对象转变为Java对象,上面剩余代码如下:

//4. 将JSON字符串反序列化成Java对象
List<PartitionAssignmentState> target = mapper.readValue(source,
        getCollectionType(mapper,List.class,PartitionAssignmentState.class));
//5. 排序
target.sort((o1, o2) -> o1.getPartition() - o2.getPartition());
//6. 打印
printPasList(target);

如此就可以达到与前面几篇文章中关于获取消费者详情功能同样的效果。这里有两个注意要点:

  1. PartitionAssignmentState中的coordinator是Node类型,这个类型需要自定义,Kafka原生的会报错。
  2. 反序列化时Node会有一个empty的属性不识别,解决方案参考代码中的步骤2.

代码更多细节请参考:代码

通过JSON的序列化和反序列化操作实现了原本不能为之的事情,那么思维再发散一下,也可以序列化成字节流,比如通过ByteBuffer进行转换,只不过编程逻辑变得复杂了。

上面这段陈述有可能会让人觉得Scala与Java之间的互操作起来不容易,其实不然,上面这段陈述只是用来补充一下如何获取消费者详情的另一种方法,Scala与Java之间的互操作还是比较简单的,一般情况下都可以直接使用对方的类。对于集合而言,Scala中还有用于Scala与Java集合的互转的scala.collection.JavaConverters(scala2.8.1开始引入),与此雷同的scala.collection.JavaConversions已被标注为@Deprecated(since 2.12.0)。在scala代码中如果需要集合转换,首先引入scala.collection.JavaConverters._,进而显示调用asJava或者asScala方法完成转型。关于Scala与Java集合互转的介绍会在下一篇文章中呈现。

相关文章

  • 再看 Kafka Lag

    在《Kafka的Lag计算误区及正确实现》一文中提及了kafka.admin.ConsumerGroupComma...

  • Kafka 消费Lag监控

    需求描述:lag(滞后)是kafka消费队列性能监控的重要指标,lag的值越大,表示kafka的堆积越严重。本篇文...

  • 聊聊kafka consumer offset lag incr

    序 本文主要解析一下遇到的一个kafka consumer offset lag不断增大的异常。 查看consum...

  • Kafka的消费积压监控-Burrow

    使用kafka, 消费生产的数据是必不可少的, 为不影响业务的正常处理, 对消费过程的积压lag的监控和报警就显得...

  • kafka-python 获取topic lag值

    说真,这个问题看上去很简单,但“得益”与kafka-python神奇的文档,真的不算简单,反正我是搜了半天还看了半...

  • 再看kafka——spring boot集成kafka

    之前自己写过一篇入门文章kafka简单入门及与spring boot整合,主要是结合kafka官方的文档入门,学习...

  • HIVE:常用分析函数

    1、lag() over() lag(pay_succ_time, 1, '1990-01-01 00:00:00...

  • 窗口函数示例2-lag:

    有关开窗函数的基本语法参照 参考地址 1.LAG & LEAD LAG(col,n,DEFAULT) 用于统计窗口...

  • Kafka的Lag计算误区及正确实现

    前言 消息堆积是消息中间件的一大特色,消息中间件的流量削峰、冗余存储等功能正是得益于消息中间件的消息堆积能力。然而...

  • Kafka 的 Lag 计算误区及正确实现

    前言 消息堆积是消息中间件的一大特色,消息中间件的流量削峰、冗余存储等功能正是得益于消息中间件的消息堆积能力。然而...

网友评论

    本文标题:再看 Kafka Lag

    本文链接:https://www.haomeiwen.com/subject/qvdjzqtx.html