概述
Kafka Connect 是一款可扩展并且可靠地在 Apache Kafka 和其他系统之间进行数据传输的工具。 可以很简单的快速定义 connectors 将大量数据从 Kafka 移入和移出. Kafka Connect 可以摄取数据库数据或者收集应用程序的 metrics 存储到 Kafka topics,使得数据可以用于低延迟的流处理。 一个导出的 job 可以将来自 Kafka topic 的数据传输到二级存储,用于系统查询或者批量进行离线分析。
Kafka Connect 功能包括:
- Kafka connectors 通用框架: - Kafka Connect 将其他数据系统和Kafka集成标准化,简化了 connector 的开发,部署和管理
- 分布式和单机模式 - 可以扩展成一个集中式的管理服务,也可以单机方便的开发,测试和生产环境小型的部署。
- REST 接口 - 通过易于使用的REST API提交和管理connectors到您的Kafka Connect集群
- offset 自动管理 - 只需要connectors 的一些信息,Kafka Connect 可以自动管理offset 提交的过程,因此开发人员无需担心开发中offset提交出错的这部分。
- 分布式的并且可扩展 - Kafka Connect 构建在现有的 group 管理协议上。Kafka Connect 集群可以扩展添加更多的workers。
- 整合流处理/批处理 - 利用 Kafka 已有的功能,Kafka Connect 是一个桥接stream 和批处理系统理想的方式。
搭建kafka connect分布式集群
Kafka Connect 当前支持两种执行方式: 单机 (单个进程) 和 分布式.
分布式模式下会自动进行负载均衡,允许动态的扩缩容,并提供对 active task,以及这个任务对应的配置和offset提交记录的容错。
#分布式
bin/connect-distributed.sh config/connect-distributed.properties
connect-distributed.properties文件配置参数可以查看官方文档
配置connector
Connector 配置是简单的key-value 映射的格式。在分布式模式中,它们将被包含在创建(或修改)connector 的请求的JSON格式串中。
rest api
由于Kafka Connect 旨在作为服务运行,它还提供了一个用于管理 connectors 的REST API。默认情况下,此服务在端口8083上运行。以下是当前支持的功能:
- GET /connectors - 返回一个活动的连接器的列表
- POST /connectors - 创建一个新的连接器; 请求主体应为JSON对象,其中包含name字段和带有连接器配置参数的对象config字段
- GET /connectors/{name} - 获取有关特定连接器的信息
- GET /connectors/{name}/config
- PUT /connectors/{name}/config - 更新连接器的参数
- GET /connectors/{name}/status - 获取连接器的当前状态,包括连接器是否正在运行,发生故障,已暂停等,将其分配给哪个工作器,如果连接器发生故障,则显示错误信息,以及其所有任务的状态
- GET /connectors/{name}/tasks
- GET /connectors/{name}/tasks/{taskid}/status
- PUT /connectors/{name}/pause - 暂停连接器及其任务,这将停止消息处理,直到恢复连接器为止
- PUT /connectors/{name}/resume - 恢复已暂停的连接器(如果连接器未暂停,则不执行任何操作)
- POST /connectors/{name}/restart - 重新启动连接器(通常是因为它失败了)
- POST /connectors/{name}/tasks/{taskId}/restart
- DELETE /connectors/{name} - 删除连接器,暂停所有任务并删除其配置
Kafka Connect还提供用于获取有关 connector plugin sss信息的REST API:
- GET /connector-plugins- 返回安装在Kafka Connect集群中的连接器插件的列表。
- PUT /connector-plugins/{connector-type}/config/validate - 根据配置定义验证提供的配置值。
自定义connector
要在Kafka和另一个系统之间复制数据,用户会为想要 pull 数据或者 push 数据的系统创建一个connector。 connector 有两类:SourceConnectors 从其他系统导入数据(e.g.JDBCSourceConnector 会将关系型数据库导入到Kafka中)和SinkConnectors导出数据(e.g. HDFSSinkConnector会将Kafka topic 的内容导出到 HDFS 文件)
Connectors 自身不执行任何数据复制:Connector的配置描述要复制的数据,并且Connector 负责负责将 job 分解为可分发给 worker 的一组 Tasks。这些Tasks也分为两类: SourceTask 和 SinkTask。
通过分配,每个Task 必须将数据的一部分子集复制到Kafka或者从Kafka复制。在 Kafka Connect中,应该始终可以将这些分配的数据框架化为一组输入和输出流,这些流由具有一致结构的记录组成。
开发一个 connector 只需要实现两个接口, Connector 和 Task接口. 一个简单的例子的源码在Kafkafile package中。 connector 用于单机模式,并拥有 SourceConnector 和SourceTask实现来读取一个文件的每行记录,并将其作为记录发发送,SinkConnector的SinkTask将记录写入到文件。
网友评论