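Kafka Indexing Service is a Druid extension that uses Druid's Indexing Service to consume data from Kafka in real time. The extension starts a supervisor in the Overlord. The supervisor then launches indexing tasks on the MiddleManagers, and these tasks connect to the Kafka cluster, consume the topic's data and build the indexes. All you need to do is prepare an ingestion spec file that describes how the data is consumed, then start the supervisor manually through the REST API.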
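Compared with Tranquility, Kafka Indexing Service has the following advantages:
- It is not limited by a time window, so there is no hand-over between old and new tasks and late (out-of-window) data is not dropped.
- It guarantees exactly-once ingestion. Druid records the consumed offset of every partition of every Kafka topic and updates these offsets only after the corresponding segments have been successfully published (offsets and segment metadata are committed together in the metadata store), so data is neither lost nor duplicated.
- Spec changes take effect promptly: simply resubmit the spec.
- Tasks are more robust: restarting a MiddleManager or shutting down a task does not lose data.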
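The exactly-once argument above can be illustrated with a toy Python model. This is not Druid's code: `MetadataStore` and `run_task` are hypothetical names, and a single in-memory object stands in for the metadata store. The point it shows is that offsets move only together with published segments. A task that dies before publishing therefore leaves the offsets untouched, its replacement rereads exactly the same rows, and rows that were already published are never read again.

```python
from dataclasses import dataclass, field


@dataclass
class MetadataStore:
    """Toy stand-in for Druid's metadata store (hypothetical, illustration only)."""
    segments: list = field(default_factory=list)
    offsets: dict = field(default_factory=dict)  # partition -> next offset to read

    def publish(self, new_segments, end_offsets):
        # Segments and offsets are committed together: both change or neither does.
        staged_segments = self.segments + list(new_segments)
        staged_offsets = dict(self.offsets)
        staged_offsets.update(end_offsets)
        self.segments, self.offsets = staged_segments, staged_offsets


def run_task(store, partition_log, partition, crash_before_publish=False):
    """Read one partition from the committed offset, build a 'segment', then publish."""
    start = store.offsets.get(partition, 0)
    rows = partition_log[start:]
    if not rows:
        return  # nothing new to ingest
    if crash_before_publish:
        # The task dies: rows read so far are discarded and offsets stay unchanged.
        return
    store.publish([(partition, start, tuple(rows))], {partition: start + len(rows)})
```

Running a crashed task followed by a healthy one leaves each row in exactly one segment. A MiddleManager restart is just a special case of the crash branch.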
Configuration and startup
Preparation
Add the druid-kafka-indexing-service extension to the common configuration (see Including Extensions):
druid.extensions.loadList=[……,"druid-kafka-indexing-service"]
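If you manage `common.runtime.properties` with scripts, a hypothetical helper like the one below can add the extension idempotently. It is a sketch that assumes `druid.extensions.loadList` sits on a single `key=value` line whose value is a JSON array, as in the line above. Other Java-properties syntaxes, such as `:` separators or continuation lines, are not handled.

```python
import json


def ensure_extension(properties_text, extension="druid-kafka-indexing-service"):
    """Return properties text whose druid.extensions.loadList includes `extension`.

    Hypothetical helper: assumes loadList is one `key=value` line holding a JSON array.
    """
    key = "druid.extensions.loadList"
    out_lines, found = [], False
    for line in properties_text.splitlines():
        name, sep, value = line.partition("=")
        if sep and name.strip() == key:  # commented-out lines ("#druid...") do not match
            found = True
            extensions = json.loads(value.strip())
            if extension not in extensions:
                extensions.append(extension)
            line = f"{key}={json.dumps(extensions)}"
        out_lines.append(line)
    if not found:
        # No loadList yet: add one that loads only this extension.
        out_lines.append(f"{key}={json.dumps([extension])}")
    return "\n".join(out_lines) + "\n"
```

Running it twice gives the same text, so it is safe to call from a deployment script on every rollout.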
Configuration
Kafka Indexing Service Configuration
{
  "type": "kafka",
  "dataSchema": {
    "dataSource": "metrics-kafka",
    "parser": {
      "type": "string",
      "parseSpec": {
        "format": "json",
        "timestampSpec": {
          "column": "timestamp",
          "format": "auto"
        },
        "dimensionsSpec": {
          "dimensions": [],
          "dimensionExclusions": [
            "timestamp",
            "value"
          ]
        }
      }
    },
    "metricsSpec": [
      {
        "name": "count",
        "type": "count"
      },
      {
        "name": "value_sum",
        "fieldName": "value",
        "type": "doubleSum"
      },
      {
        "name": "value_min",
        "fieldName": "value",
        "type": "doubleMin"
      },
      {
        "name": "value_max",
        "fieldName": "value",
        "type": "doubleMax"
      }
    ],
    "granularitySpec": {
      "type": "uniform",
      "segmentGranularity": "HOUR",
      "queryGranularity": "NONE"
    }
  },
  "tuningConfig": {
    "type": "kafka",
    "maxRowsPerSegment": 5000000
  },
  "ioConfig": {
    "topic": "metrics-druid-test",
    "consumerProperties": {
      "bootstrap.servers": "KAFKA_HOST:KAFKA_PORT"
    },
    "taskCount": 1,
    "replicas": 1,
    "taskDuration": "PT1H"
  }
}
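`taskCount` and `replicas` in `ioConfig` decide how many reading tasks the supervisor runs. As we read the Druid documentation, `taskCount` is the maximum number of reading task groups and each group runs `replicas` times. Fewer groups run when the topic has fewer partitions than `taskCount`. The sketch below also assumes partitions are spread across groups as `partition % taskCount`. Treat that as an assumption about the supervisor's internals. The function names and the partition counts in the usage are hypothetical, since the spec does not say how many partitions `metrics-druid-test` has.

```python
def assign_partitions(num_partitions, task_count):
    """Map each reading task group to the Kafka partitions it consumes (partition % taskCount)."""
    if task_count < 1:
        raise ValueError("taskCount must be at least 1")
    groups = {}
    for partition in range(num_partitions):
        groups.setdefault(partition % task_count, []).append(partition)
    return groups


def reading_task_count(num_partitions, task_count, replicas):
    """Reading tasks = non-empty task groups x replicas (publishing tasks are not counted)."""
    return len(assign_partitions(num_partitions, task_count)) * replicas


# With taskCount 1 (as in the spec above), one task group reads every partition.
print(assign_partitions(4, 1))
# A hypothetical 5-partition topic with taskCount 2 splits into two groups.
print(assign_partitions(5, 2))
```

With the spec above (`taskCount` 1, `replicas` 1), a single task reads the whole topic. `taskDuration` `PT1H` means each task reads for one hour, then stops reading and publishes its segments while a new task takes over.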
Submitting the supervisor
curl -X POST -H 'Content-Type:application/json' -d @supervisor.json http://overlord:8090/druid/indexer/v1/supervisor
On success, the response contains the supervisor ID:
{
  "id": "metrics-kafka"
}
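The curl command above can also be issued from Python with only the standard library. This is a minimal sketch: `build_submit_request` and `submit_supervisor` are hypothetical names, and the Overlord address is copied from the curl example. The request is built separately from the network call so it can be inspected without a running cluster.

```python
import json
import urllib.request

SUPERVISOR_URL = "http://overlord:8090/druid/indexer/v1/supervisor"


def build_submit_request(spec, url=SUPERVISOR_URL):
    """Build the same POST the curl command sends: a JSON body with an application/json type."""
    body = json.dumps(spec).encode("utf-8")
    return urllib.request.Request(
        url, data=body, headers={"Content-Type": "application/json"}, method="POST"
    )


def submit_supervisor(spec_path, url=SUPERVISOR_URL, timeout=30):
    """POST the spec file to the Overlord and return the supervisor id from the reply."""
    with open(spec_path, encoding="utf-8") as f:
        spec = json.load(f)
    with urllib.request.urlopen(build_submit_request(spec, url), timeout=timeout) as resp:
        # The reply looks like {"id": "metrics-kafka"}; the id is the dataSource name.
        return json.loads(resp.read().decode("utf-8"))["id"]
```

Parsing the file with `json.load` before sending it rejects a malformed spec locally instead of at the Overlord.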
You can then check the supervisor's status in the Overlord console.
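The same status is available over HTTP at `GET /druid/indexer/v1/supervisor/<id>/status`. The minimal sketch below (hypothetical function names, same Overlord address as the curl example) fetches that report and keeps a few fields. The field names `payload.state`, `payload.aggregateLag` and `payload.activeTasks` are assumptions that depend on your Druid version, so missing fields are returned as `None` (or 0 tasks) rather than raising an error.

```python
import json
import urllib.parse
import urllib.request

OVERLORD = "http://overlord:8090"


def status_url(supervisor_id, base=OVERLORD):
    """URL of the supervisor status report for one supervisor id."""
    quoted = urllib.parse.quote(supervisor_id, safe="")
    return f"{base}/druid/indexer/v1/supervisor/{quoted}/status"


def summarize_status(report):
    """Pick a few fields from a status report; missing fields come back as None."""
    payload = report.get("payload", {})
    return {
        "id": report.get("id"),
        "state": payload.get("state"),
        "aggregateLag": payload.get("aggregateLag"),
        "activeTasks": len(payload.get("activeTasks") or []),
    }


def fetch_status(supervisor_id, base=OVERLORD, timeout=30):
    """GET the status report from the Overlord and summarize it."""
    with urllib.request.urlopen(status_url(supervisor_id, base), timeout=timeout) as resp:
        return summarize_status(json.loads(resp.read().decode("utf-8")))
```

For example, `fetch_status("metrics-kafka")` checks the supervisor submitted above. Polling it in a loop gives a rough consumer-lag monitor without opening the console.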