美文网首页大数据
开源组件Kafka Connect推荐

开源组件Kafka Connect推荐

作者: 佛系小懒 | 来源:发表于2020-02-05 20:56 被阅读0次

    Kafka Connect简介

    Kafka Connect是Kafka的开源组件Confluent提供的功能,用于实现Kafka与外部系统的连接。Kafka Connect同时支持分布式模式和单机模式,另外提供了一套完整的REST接口,用于查看和管理Kafka Connectors,还具有offset自动管理,可扩展等优点。

    使用Kafka Connect连接Kafka和Elasticsearch

    1 测试环境准备

    ES服务、Kafka服务、kafka topic:kafka_es_test

    2 Kafka Connect 安装

    下载地址:https://www.confluent.io/download/

    3 Worker配置

    1) 配置参考 ,参考如下 :

    [通用配置]

    [Standalone Woker配置]

    [Distributed Worker配置]

    Kafka Connect默认使用AvroConverter,使用该AvroConverter时必须先启动Schema Registry服务

    2) 实际操作  

    修改./schema-registry/connect-avro-standalone.properties,配置bootstrap.servers=kafka服务地址

    4 Elasticsearch Connector配置

    1) 配置参考  

    [Connectors通用配置]

    [Elasticsearch Configuration Options]

    2) 实际操作  

    对./kafka-connect-elasticsearch/quickstart-elasticsearch.properties做如下修改:

    name=elasticsearch-sink

    connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector

    tasks.max=1

    topics=kafka_es_test

    key.ignore=true

    connection.url=http://ES服务地址

    type.name=kafka-connect

    注意: 其中topics、kafka中topic名称、Elasticsearch的索引名三者通常一致,也可以通过topic.index.map来设置从topic名到Elasticsearch索引名的映射

    5 启动connector

    5.1  启动Schema Registry服务

    该服务需要指定一个zookeeper地址或Kafka地址,以存储schema数据。具体操作如下: 

    1) 启动Zookeeper

    ./bin/zookeeper-server-start-daemon etc/kafka/zookeeper.properties

    2) 启动kafka

    ./bin/kafka-server-start-daemon etc/kafka/server.properties

    3) 启动schema Registry

    ./bin/schema-registry-start-daemon etc/schema-registry/schema-registry.properties

    4) 使用netstat -natpl 查看各服务端口是否正常启动  :zookeeper 、kafka 、schema registry各自的地址

    5.2 启动Connector

    ./bin/connect-standalone -daemon  etc/schema-registry/connect-avro-standalone.properties etc/kafka-connect-elasticsearch/quickstart-elasticsearch.properties

    以上启动各服务均可在logs目录下找到对应日志

    6 启动Kafka Producer

    1) 启动Producer

    ./bin/kafka-avro-console-producer --broker-list XXXX:9092 --topic kafka_es_test --property value.schema='{"type":"record","name":"person","fields":[{"name":"nickname","type":"string"}]}'

    2) 输入如下数据

    {"nickname":"michel"}{"nickname":"mushao"}

    7 Kibana验证结果

    查看索引 ,GET_cat/indices结果可以看到名为kafka_es_test的索引被成功创建

    Confluent CLI

    CLI目前只是适用于开发阶段,不能用于生产环境。  

    它可以一键启动包括zookeeper,kafka,schema registry, kafka rest, connect等在内的多个服务;使用CLI启动默认配置为启动Distributed的Connector,需要通过环境变量来修改配置  

    使用Confluent CLI

    confluent CLI提供了丰富的命令,包括服务启动,服务停止,状态查询,日志查看等

    1) 启动./bin/confluent start

    2) 检查confluent运行状态./bin/confluent status

    3) 问题定位,如果第二步出现问题,可以使用log命令查看,如connect未启动成功则./bin/confluent log connect

    4) 加载Elasticsearch Connector

    a) 查看connector

    ./bin/confluentlistconnectors

    b) 加载Elasticsearch connector

    ./bin/confluentloadelasticsearch-sink

    结果,输出quickstart-elasticsearch.properties配置文件中包含的信息

    {"name":"elasticsearch-sink","config": {"connector.class":"io.confluent.connect.elasticsearch.ElasticsearchSinkConnector","tasks.max":"1","topics":"kafka_es_test","key.ignore":"true","connection.url":"http://192.168.0.8:9200","type.name":"kafka-connect","name":"elasticsearch-sink"    },"tasks": [],"type":null}

    5) 使用producer生产数据,并使用kibana验证是否写入成功

    Kafka Connect Rest API

    Kafka Connect提供了一套完成的管理Connector的接口,详情参考[Kafka Connect REST Interface]。该接口可以实现对Connector的创建,销毁,修改,查询等操作

    1) GET connectors 获取运行中的connector列表

    2) POST connectors 使用指定的名称和配置创建connector

    3) GET connectors/(string:name) 获取connector的详细信息

    4) GET connectors/(string:name)/config 获取connector的配置

    5) PUT connectors/(string:name)/config 设置connector的配置

    6) GET connectors/(string:name)/status 获取connector状态

    7) POST connectors/(stirng:name)/restart 重启connector

    8) PUT connectors/(string:name)/pause 暂停connector

    9) PUT connectors/(string:name)/resume 恢复connector

    10)DELETEconnectors/(string:name)/ 删除connector

    11)GETconnectors/(string:name)/tasks 获取connectors任务列表

    12)GET/connectors/(string:name)/tasks/(int: taskid)/status获取任务状态

    13) POST /connectors/(string:name)/tasks/(int: taskid)/restart 重启任务

    14)GET/connector-plugins/ 获取已安装插件列表

    15) PUT /connector-plugins/(string:name)/config/validate验证配置

    原文链接

    相关文章

      网友评论

        本文标题:开源组件Kafka Connect推荐

        本文链接:https://www.haomeiwen.com/subject/akdexhtx.html