KafkaOffsetMonitor简述
KafkaOffsetMonitor(下文简称KOM)是有由Kafka开源社区提供的一款Web管理界面,这个应用程序用来实时监控Kafka服务的Consumer以及它们所在的Partition中的Offset,可以浏览当前的消费者组,查看每个Topic的所有Partition的当前消费情况,浏览查阅Topic的历史消费信息等
topic的所有partiton消费情况列表一个topic的历史消费情况
KafkaOffsetMonitor 数据采集展现
数据采集源
Kafka源码中有定义对象ZkUtils(kafka-master\core\src\main\scala\kafka\utils):
ZkUtils而KOM本质上就是对ZkUtils中的这些属性的读取操作。
web实现
KOM是使用jetty作为web容器的,通过angular.js来实现类似MVC功能的。
getGroups具体流程分析:
KOM中一些流程主要体现在app.js和controller.js中。
- 首先需要定义app.js文件,在KOM中的app.js文件为:
var app = angular.module('offsetapp',
["offsetapp.controllers", "offsetapp.directives", "ngRoute"],
function($routeProvider) {
$routeProvider
.when("/", {
templateUrl: "views/grouplist.html",
controller: "GroupListCtrl"
})
.when("/group/:group", {
templateUrl: "views/group.html",
controller: "GroupCtrl"
})
.when("/group/:group/:topic", {
templateUrl: "views/topic.html",
controller: "TopicCtrl"
})
......
;;
});
angular.module("offsetapp.services", ["ngResource"])
.factory("offsetinfo", ["$resource", "$http", function($resource, $http) {
function groupPartitions(cb) {
return function(data) {
var groups = _(data.offsets).groupBy(function(p) {
var t = p.timestamp;
if(!t) t = 0;
return p.group+p.topic+t.toString();
});
groups = groups.values().map(function(partitions) {
return {
group: partitions[0].group,
topic: partitions[0].topic,
partitions: partitions,
logSize: _(partitions).pluck("logSize").reduce(function(sum, num) {
return sum + num;
}),
offset: _(partitions).pluck("offset").reduce(function(sum, num) {
return sum + num;
}),
timestamp: partitions[0].timestamp
};
}).value();
data.offsets = groups;
cb(data);
};
}
return {
getGroup: function(group, cb) {
return $resource("./group/:group").get({group:group}, groupPartitions(cb));
},
......
};
}]);
- 下面是controller.js文件:
angular.module('offsetapp.controllers',["offsetapp.services"])
.controller("GroupCtrl", ["$scope", "$interval", "$routeParams", "offsetinfo",
function($scope, $interval, $routeParams, offsetinfo) {
offsetinfo.getGroup($routeParams.group, function(d) {
$scope.info = d;
$scope.loading=false;
});
$scope.loading=true;
$scope.group = $routeParams.group;
}])
.controller("GroupListCtrl", ["$scope", "offsetinfo",
function($scope, offsetinfo) {
$scope.loading = true;
offsetinfo.listGroup().success(function(d) {
$scope.loading=false;
$scope.groups = d;
});
}])
......
;
- index.html部分代码块
<!-- Collect the nav links, forms, and other content for toggling -->
<div class="collapse navbar-collapse" id="bs-example-navbar-collapse-1">
<ul class="nav navbar-nav">
<li><a href="#">Consumer Groups</a></li>
<li><a href="/#/topics">Topic List</a></li>
<li class="dropdown">
<a href="javascript:void(0)" class="dropdown-toggle" data-toggle="dropdown">Visualizations <b class="caret"></b></a>
<ul class="dropdown-menu">
<li><a href="/#/activetopicsviz">Active Topic Consumers</a></li>
<li><a href="/#/clusterviz">Cluster Overview</a></li>
</ul>
</li>
</ul>
</div><!-- /.navbar-collapse -->
其中"#"表示访问项目根目录:对比app.js文件的
.when("/", {templateUrl: "views/grouplist.html",controller: "GroupListCtrl"})
表示当访问项目根目录时使用的模板文件是grouplist.html,
<div class="page-header">
<h1>Please select the group you would like to monitor</h1>
</div>
<div class="alert alert-info" ng-show="loading">
Loading ...
</div>
<ul class="list-group">
<li ng-repeat="g in groups" class="list-group-item"><a href="./#/group/{{g}}">{{g}}</a></li>
</ul>
使用的controller是GroupListCtrl,继续看controller.js中的GroupListCtrl定义:
.controller("GroupListCtrl", ["$scope", "offsetinfo", function($scope, offsetinfo) {
$scope.loading = true;
offsetinfo.listGroup().success(function(d) {
$scope.loading=false;
$scope.groups = d;
});
}])
会调用offsetinfo.listGroup()方法,再到app.js文件中查看listGroup方法定义:
listGroup: function() {return $http.get("./group");}
这个时候会使用http模块映射到group这个path上,到这里就要看scala的代码了,进到OffsetGetterWeb.scala中,该类继承了UnfilteredWebApp类,在UnfilteredWebApp中定义了启动方法.
继续看group这个path的定义:
case GET(Path(Seg("group" :: Nil))) =>
JsonContent ~> ResponseString(write(getGroups(args)))
调用getGroups方法首先会初始化zkClient和使用zkClient构造OffsetGetter类,接着调用OffsetGetter的getGroups方法:
def getGroups: Seq[String] = {
try {
ZkUtils.getChildren(zkClient, ZkUtils.ConsumersPath)
} catch {
case NonFatal(t) =>
error(s"could not get groups because of ${t.getMessage}", t)
Seq()
}
}
也就是说getGroups就是读取zookeeper中的/consumers目录的数据,读取完成之后通过$scope.groups = d;代码将结果赋给$scope.groups,这样grouplist.html中就可以通过遍历groups来得到每个group了:
<li ng-repeat="g in groups" class="list-group-item"><a href="./#/group/{{g}}">{{g}}</a></li>
<font size=2>注:ng-repeat 指令用于循环输出指定次数的 HTML 元素
得到所有的groups之后,通过./#/group/链接可以访问每个group的具体信息。
数据采集周期
kafka监控的采集周期,也就是刷新时间refresh,还有保留时间retain,是在启动时指定的,默认是10s刷新一次,数据保留2天
kafka监控的采集周期注:KOM的运行需要通过sbt assembly进行编译打包
OffsetGetterWeb中定时任务方法:
def schedule(args: OWArgs) {
def retryTask[T](fn: => T) {
try {
retry(3) {
fn
}
} catch {
case NonFatal(e) =>
error("Failed to run scheduled task", e)
}
}
timer.scheduleAtFixedRate(new TimerTask() {
override def run() {
retryTask(writeToDb(args))
}
}, 0, args.refresh.toMillis)
timer.scheduleAtFixedRate(new TimerTask() {
override def run() {
retryTask(args.db.emptyOld(System.currentTimeMillis - args.retain.toMillis))
}
}, args.retain.toMillis, args.retain.toMillis)
}
def writeToDb(args: OWArgs) {
val groups = getGroups(args)
groups.foreach {
g =>
val inf = getInfo(g, args).offsets.toIndexedSeq
info(s"inserting ${inf.size}")
args.db.insertAll(inf)
}
}
DB写操作:每执行一次(采集)刷新,就会执行一次写操作(insertAll),一次清除旧数据的操作(emptyOld).
DB读操作:监控的读操作只有在查询历史信息(offsetHistory)时才查询DB,其他的数据都是实时的数据。
def offsetHistory(group: String, topic: String): OffsetHistory = database.withSession {
implicit s =>
val o = offsets
.where(off => off.group === group && off.topic === topic)
.sortBy(_.timestamp)
.map(_.forHistory)
.list()
OffsetHistory(group, topic, o)
}
数据库存储
数据库sqlite
KOM的数据库采用sqlite
val database = Database.forURL(s"jdbc:sqlite:$dbfile.db",
driver = "org.sqlite.JDBC")
默认数据库文件位置:.../KafkaOffsetMonitor-master/offsetapp.db
数据库字段
表名:OFFSETS
存储字段:
字段名 | 字段类型 | 说明 | 是否可空 |
---|---|---|---|
id | INTEGER | PRIMARY KEY、AUTOINCREMEN | NOT NULL |
group | VARCHAR(254) | 分组 | NOT NULL |
topic | VARCHAR(254) | 话题 | NOT NULL |
partition | INTEGER | 分区编号 | NOT NULL |
offset | BIGINT | 偏移量 | NOT NULL |
log_size | BIGINT | 分区内已接收消息总量 | NOT NULL |
owner | VARCHAR(254) | 所属者 | 可为null |
timestamp | BIGINT | 时间戳 | NOT NULL |
creation | BIGINT | 创建时间 | NOT NULL |
modified | BIGINT | 最新更新时间 | NOT NULL |
数据库索引:
def idx = index("idx_search", (group, topic))
def tidx = index("idx_time", (timestamp))
def uidx = index("idx_unique", (group, topic, partition, timestamp), unique = true)
存储性能及改造分析
因为KOM可以配置sqlite数据保留时间,定期清除过期数据,具体的存储性能跟存储时间和存储量有关,需要根据需求测试评估。
但基于sqlite本身特性:主打轻便,基于文件
因此对于大规模存储(>100W)性能欠佳,会导致页面加载较慢,且不支持分布式,没有用户管理。
Mysql的功能完全能覆盖Sqlite,若改造则需要将源码中的OffsetDB.scala文件中对数据库操作的函数(insert,insertAll,emptyOld,offsetHistory,maybeCreate)进行改写。
难点分析:要求熟悉scala语言,熟悉scala对mysql的操作。
网友评论