美文网首页
Kafka--04Kafka connect

Kafka--04Kafka connect

作者: viean | 来源:发表于2018-05-06 00:43 被阅读0次

    Kafka Connect用于在Kafka与其他系统间数据传输的工具。Kafka connect可以获取整个数据库或从所有应用程序服务器收集指标到Kafka;也可以从Kafka中topic数据传输至其他存储或者查询系统或者批处理系统进行离线分析。

    Kafka Connect功能
    Kafka能用框架,提供统一的集成API
    支持分布式模式(distributed)及单机模式(standalone)
    REST接口,用于查看和管理Kafka connectors
    自动化offset管理,开发人员不必担心错误处理的影响
    分布式,可扩展
    流/批处理集成

    Kafka connect两个核心组成 Source和Sink。
    Source:负责导入数据到Kafka;
    Sink :负责从Kafka导出数据;
    (如上二者都称为 connector)


    Kafka connect的几个重要的概念包括:connectors、tasks、workers和converters。
    Connectors-通过管理任务来细条数据流的高级抽象;
    Tasks- 数据写入kafka和数据从kafka读出的实现;
    Workers-运行connectors和tasks的进程;
    Converters- kafka connect和其他存储系统直接发送或者接受数据之间转换数据;

    distribute模式启动:
    需要先建三个broker
    bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic connect-configs
    bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 50 --topic connect-offsets
    bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 10 --topic connect-status
    启动
    bin/connect-distributed.sh config/connect-distributed.properties
    

    通过rest api管理connector

    因为kafka connect的意图是以服务的方式去运行,所以它提供了REST API去管理connectors,默认的端口是8083,你也可以在启动kafka connect之前在配置文件中添加rest.port配置。

    GET /connectors – 返回所有正在运行的connector名
    POST /connectors – 新建一个connector; 请求体必须是json格式并且需要包含name字段和config字段,name是connector的名字,config是json格式,必须包含你的connector的配置信息。
    GET /connectors/{name} – 获取指定connetor的信息
    GET /connectors/{name}/config – 获取指定connector的配置信息
    PUT /connectors/{name}/config – 更新指定connector的配置信息
    GET /connectors/{name}/status – 获取指定connector的状态,包括它是否在运行、停止、或者失败,如果发生错误,还会列出错误的具体信息。
    GET /connectors/{name}/tasks – 获取指定connector正在运行的task。
    GET /connectors/{name}/tasks/{taskid}/status – 获取指定connector的task的状态信息
    PUT /connectors/{name}/pause – 暂停connector和它的task,停止数据处理知道它被恢复。
    PUT /connectors/{name}/resume – 恢复一个被暂停的connector
    POST /connectors/{name}/restart – 重启一个connector,尤其是在一个connector运行失败的情况下比较常用
    POST /connectors/{name}/tasks/{taskId}/restart – 重启一个task,一般是因为它运行失败才这样做。
    DELETE /connectors/{name} – 删除一个connector,停止它的所有task并删除配置。

    #standalone模式启动
     bin/connect-standalone.sh config/connect-standalone.properties config/connector1.properties
    

    相关文章

      网友评论

          本文标题:Kafka--04Kafka connect

          本文链接:https://www.haomeiwen.com/subject/vilyrftx.html