美文网首页
Kafka Indexing Service

Kafka Indexing Service

作者: zfylin | 来源:发表于2019-08-08 18:21 被阅读0次

Kafka Indexing Service 是 Druid 推出的利用 Druid 的 Indexing Service 服务实时消费 Kafka 数据的插件。该插件会在 Overlord 中启动一个 supervisor,supervisor 启动之后会在 Middlemanager 中启动一些 indexing task,这些 task 会连接到 Kafka 集群消费 topic 数据,并完成索引创建。您需要做的,就是准备一个数据消费格式文件,之后通过 REST API 手动启动 supervisor。

Kafka Indexing Service 相对 Tranquility 来说

  1. 不受时间窗口限制,不存在新老task交替和过期数据的问题
  2. 数据保证exactly-once,druid保存了kafka每个topic每个分片消费的offset的情况,只有segment handoff成功后,offset配置才会更新,所以不会造成数据丢失和重复
  3. 数据模板及时生效,只要重新提交模板,及时生效
  4. 任务更健壮,重启middleManager或者shutdownr task,都不会造成数据丢失

配置启动

准备

公共配置添加 druid-kafka-indexing-service插件。( Including Extensions)

druid.extensions.loadList=[……,"druid-kafka-indexing-service"]

配置

Kafka Indexing Service Configuration

{
  "type": "kafka",
  "dataSchema": {
    "dataSource": "metrics-kafka",
    "parser": {
      "type": "string",
      "parseSpec": {
        "format": "json",
        "timestampSpec": {
          "column": "timestamp",
          "format": "auto"
        },
        "dimensionsSpec": {
          "dimensions": [],
          "dimensionExclusions": [
            "timestamp",
            "value"
          ]
        }
      }
    },
    "metricsSpec": [
      {
        "name": "count",
        "type": "count"
      },
      {
        "name": "value_sum",
        "fieldName": "value",
        "type": "doubleSum"
      },
      {
        "name": "value_min",
        "fieldName": "value",
        "type": "doubleMin"
      },
      {
        "name": "value_max",
        "fieldName": "value",
        "type": "doubleMax"
      }
    ],
    "granularitySpec": {
      "type": "uniform",
      "segmentGranularity": "HOUR",
      "queryGranularity": "NONE"
    }
  },
  "tuningConfig": {
    "type": "kafka",
    "maxRowsPerSegment": 5000000
  },
  "ioConfig": {
    "topic": "metrics-druid-test",
    "consumerProperties": {
      "bootstrap.servers": "KAFKA_HOST:KAFKA_PORT"
    },
    "taskCount": 1,
    "replicas": 1,
    "taskDuration": "PT1H"
  }
}

提交Supervisor

curl -X POST -H 'Content-Type:application/json' -d @superviosr.json http://overlord:8090/druid/indexer/v1/supervisor

提交成功返回supervisor id

{
    "id": "metrics-kafka"
}

可以在overlord后台查看supervisor相关状态

image.png

Task创建过程

image.png

Task 终止过程

image.png

相关文章

网友评论

      本文标题:Kafka Indexing Service

      本文链接:https://www.haomeiwen.com/subject/hlrfjctx.html