配置
[root@book2 schema-registry]# vi schema-registry.properties
listeners=http://book2:8081
kafkastore.connection.url=book2:2181/kafka
kafkastore.topic=_schemas
debug=false
[root@book2 schema-registry]# vi connect-avro-distributed.properties
bootstrap.servers=book2:9092,book3:9092
group.id=connect-cluster
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://book2:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://book2:8081
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
config.storage.topic=connect-configs
offset.storage.topic=connect-offsets
status.storage.topic=connect-status
group.id (默认connect-cluster) - Connect cluster group使用唯一的名称;注意这不能和consumer group ID(消费者组)冲突。
config.storage.topic (默认connect-configs) - topic用于存储connector和任务配置;注意,这应该是一个单个的partition,多副本的topic。你需要手动创建这个topic,以 确保是单个partition(自动创建的可能会有多个partition)。
offset.storage.topic (默认 connect-offsets) - topic用于存储offsets;这个topic应该配置多个partition和副本。
status.storage.topic (默认 connect-status) - topic 用于存储状态;这个topic 可以有多个partitions和副本
启动
/home/confluent-3.2.2/bin/schema-registry-start /home/confluent-3.2.2/etc/schema-registry/schema-registry.properties &
/home/confluent-3.2.2/bin/connect-distributed /home/confluent-3.2.2/etc/schema-registry/connect-avro-distributed.properties &
在分布式模式中,connector(连接器)配置不能使用命令行。要使用下面介绍的REST API来创建,修改和销毁connector。 配置连接器(connector)
Connector的配置是简单的key-value映射。对于独立模式,这些都是在属性文件中定义,并通过在命令行上的Connect处理。在分布式模 式,JSON负载connector的创建(或修改)请求。大多数配置都是依赖的connector,有几个常见的选项:
name - 连接器唯一的名称,不能重复。
connector.class - 连接器的Java类,比如连接器是io.svectors.hbase.sink.HBaseSinkConnector。
tasks.max - 连接器创建任务的最大数。
topics - 作为连接器的输入的topic列表。
由于Kafka Connect的目的是作为一个服务运行,提供了一个用于管理connector的REST API。默认情况下,此服务的端口是8083。以下是当前支持的终端入口:
GET /connectors - 返回活跃的connector列表
POST /connectors - 创建一个新的connector;请求的主体是一个包含字符串name字段和对象config字段(connector的配置参数)的JSON对象。
GET /connectors/{name} - 获取指定connector的信息
GET /connectors/{name}/config - 获取指定connector的配置参数
PUT /connectors/{name}/config - 更新指定connector的配置参数
GET /connectors/{name}/status - 获取connector的当前状态,包括它是否正在运行,失败,暂停等。
GET /connectors/{name}/tasks - 获取当前正在运行的connector的任务列表。
GET /connectors/{name}/tasks/{taskid}/status - 获取任务的当前状态,包括是否是运行中的,失败的,暂停的等,
PUT /connectors/{name}/pause - 暂停连接器和它的任务,停止消息处理,直到connector恢复。
PUT /connectors/{name}/resume - 恢复暂停的connector(如果connector没有暂停,则什么都不做)
POST /connectors/{name}/restart - 重启connector(connector已故障)
参考链接:
github.com/mravi/kafka-connect-hbase
网友评论