美文网首页Apache Kafka
基于Splunk Connector for Kafka 实现S

基于Splunk Connector for Kafka 实现S

作者: 半夜菊花茶 | 来源:发表于2019-09-30 08:24 被阅读0次

Kafka是Apache的基于订阅发布模式的分布式消息队列,主要应用于数据管道和服务之间的流式数据,很多公司和组织都在利用Apache Kafka提供的能力构建下一代流式应用。本文介绍如何使用开源的Splunk Connector for Kafka消费Kafka数据并转发至Splunk,同时可以自由指定写入数据的sourcetype,便于后续数据解析处理。
Splunk Connect for Kafka是一种侵入式的客户端,基于Kafka Connect框架开发,用于将数据从Kafka的topic取出,并通过Splunk HTTP Event Collector(HEC)发送到Splunk

0x01 准备工作

0x02 配置Splunk

HTTP Event Collector(HEC)

配置Splunk接收从Kafka发送过来的消息需要配置HEC,通常有两种方式配置接收,即通过HEC接收后由Heavy Forwarder转发或者直接发送到Indexer节点,本文采用第二种方式,将消息直接发送到Indexer。
配置HEC,登录Splunk Web 页面,Setting > Data Inputs > HTTP Event Collector 点击Global Settings,确认All Tokens 为enabled状态,然后点击Save。这里要注意下面的HTTP Port Number,在后面的配置中会用到它。


image.png

然后点击New Token来创建一个新的HEC 令牌。
有关Splunk Acknowledgement 先不在本文讨论,感兴趣的同学请自行google


image.png

下一步会提示设置sourcetype,context和index等内容,按需配置即可。
对于大规模分布式的Splunk环境,生成token的方式请参考官方文档

Connector安装和配置

首先解压从GitHub上下载的Connector包,并将它复制到所有Kafka Connect节点上(默认情况下就是Kafka的安装节点)
修改配置文件,路径默认在Kafka安装目录下的config目录config/connect-distributed.properties

#These settings may already be configured if you have deployed a connector in your Kafka Connect Environment
bootstrap.servers=<BOOTSTRAP_SERVERS>
plugin.path=<PLUGIN_PATH>
#Required
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
offset.flush.interval.ms=10000
#Recommended
group.id=kafka-connect-splunk-hec-sink

注意:

  • bootstrap.servers 填写所有Kafka Broker的IP:端口,用逗号隔开
  • plugin.path 填写Splunk connector for kafka JAR包位置,必须保证位置可达
    修改完成后保存文档,并确保所有Kafka Connect节点都已近完成上述修改,然后在每个节点都执行以下命令开启Kafka Connect
./bin/connect-distributed.sh config/connect-distributed.properties

执行以下命令验证插件是否启动

curl http://<KAFKA_CONNECT_HOST>:8083/connector-plugins

返回信息应该会是Connector的名称com.splunk.kafka.connect.SplunkSinkConnector

创建任务

通过上面的步骤我们启动了一个Splunk Connector for Kafka的服务,但是并没有指定要订阅哪个主题以及发送的目的端,接下来我们需要通过RESTful API创建任务。
在Kafka Connect节点上执行:

curl localhost:8083/connectors -X POST -H "Content-Type: application/json" -d'{
  "name": "splunk-prod-financial",
    "config": {
     "connector.class": "com.splunk.kafka.connect.SplunkSinkConnector",
     "tasks.max": "3",
     "topics": "NIP_SYSLOG_1515UDP2",
     "splunk.indexes": "main",
     "splunk.hec.uri":"https://192.168.134.56:8088",
     "splunk.hec.token": "4b6c844b-125c-4133-b23f-2f4de21b634a",
     "splunk.hec.ack.enabled" : "true",
     "splunk.hec.raw" : "false",
     "splunk.hec.json.event.enrichment" : "org=fin,bu=south-east-us",
     "splunk.hec.ssl.validate.certs": "false",
     "splunk.hec.track.data" : "true"
    }
}'

这里需要注意:

  • name: Connector的名字,这个名字也将作为Kafka消费者的组名
  • topics: Splunk消费的目标主题名称,用逗号隔开
  • Splunk.hec.uri: Splunk HEC URI,此处填写开启HEC的节点的URI,包含IP和端口,这里的端口是上面在配置HEC的时候设置的,比如8088
  • Splunk.hec.token: Splunk HTTP Event Collector token
  • Splunk.hec.ack.enabled: 可以设置为true或者false,设置为true时,Splunk Kafka Connector在查询Kafka offsets之前会先通过POST发送ACK请求,这个机制用来保障数据不丢失
  • Splunk.hec.json.event.enrichment: 只对event HEC endpoint有效,主要是用来在元数据中增加字段
  • splunk.hec.ssl.validate.certs: 开启或关闭SSL证书校验,如果你的Splunk使用自签发证书,这里可以考虑关闭SSL证书校验,否则会报证书错误
    完整的参数列表可以参考GitHub的说明:
    https://github.com/splunk/kafka-connect-splunk

验证

要验证配置是否正确,需要往Kafka指定的主题里面写入数据,然后在Splunk上查看是否可以收到对应数据内容。写入数据有很多种方式,可以使用Splunk提供的数据生成工具Kafka data-gen-app或者kafka-console-producer,再或者使用kafka自带的客户端直接写消息进去,比如:
在kafka节点上执行:

bin/kafka-console-producer.sh --broker-list 192.168.134.57:9092 --topic NIP_SYSLOG_1515UDP2
image.png

然后在Splunk 搜索:


image.png

其他常用API

通过调用任务创建API,Splunk Connector开始订阅指定主题,此时我们可以通过另外几个API查询任务状态

描述 命令
列出有效connectors curl http://localhost:8083/connectors
获取指定Connector信息 curl http://localhost:8083/connectors/kafka-connect-splunk
获取指定Connector配置 curl http://localhost:8083/connectors/kafka-connect-splunk/config
删除指定Connector curl http://localhost:8083/connectors/kafka-connect-splunk -X DELETE
获取指定connector任务信息 curl http://localhost:8083/connectors/kafka-connect-splunk/tasks

参考链接

Install and Use Splunk Connect for Kafka
Splunk Connect for Kafka – Connecting Apache Kafka with Splunk
Github Kafa-connect-splunk

相关文章

网友评论

    本文标题:基于Splunk Connector for Kafka 实现S

    本文链接:https://www.haomeiwen.com/subject/sfzpuctx.html