美文网首页
kafka connector分布式部署

kafka connector分布式部署

作者: 草丛螳螂 | 来源:发表于2017-07-29 16:19 被阅读439次

    配置

    [root@book2 schema-registry]# vi schema-registry.properties

    listeners=http://book2:8081

    kafkastore.connection.url=book2:2181/kafka

    kafkastore.topic=_schemas

    debug=false

    [root@book2 schema-registry]# vi connect-avro-distributed.properties

    bootstrap.servers=book2:9092,book3:9092

    group.id=connect-cluster

    key.converter=io.confluent.connect.avro.AvroConverter

    key.converter.schema.registry.url=http://book2:8081

    value.converter=io.confluent.connect.avro.AvroConverter

    value.converter.schema.registry.url=http://book2:8081

    internal.key.converter=org.apache.kafka.connect.json.JsonConverter

    internal.value.converter=org.apache.kafka.connect.json.JsonConverter

    internal.key.converter.schemas.enable=false

    internal.value.converter.schemas.enable=false

    config.storage.topic=connect-configs

    offset.storage.topic=connect-offsets

    status.storage.topic=connect-status

    group.id (默认connect-cluster) - Connect cluster group使用唯一的名称;注意这不能和consumer group ID(消费者组)冲突。

    config.storage.topic (默认connect-configs) - topic用于存储connector和任务配置;注意,这应该是一个单个的partition,多副本的topic。你需要手动创建这个topic,以 确保是单个partition(自动创建的可能会有多个partition)。

    offset.storage.topic (默认 connect-offsets) - topic用于存储offsets;这个topic应该配置多个partition和副本。

    status.storage.topic (默认 connect-status) - topic 用于存储状态;这个topic 可以有多个partitions和副本

    启动

    /home/confluent-3.2.2/bin/schema-registry-start /home/confluent-3.2.2/etc/schema-registry/schema-registry.properties &

    /home/confluent-3.2.2/bin/connect-distributed /home/confluent-3.2.2/etc/schema-registry/connect-avro-distributed.properties &

    在分布式模式中,connector(连接器)配置不能使用命令行。要使用下面介绍的REST API来创建,修改和销毁connector。 配置连接器(connector)

    Connector的配置是简单的key-value映射。对于独立模式,这些都是在属性文件中定义,并通过在命令行上的Connect处理。在分布式模 式,JSON负载connector的创建(或修改)请求。大多数配置都是依赖的connector,有几个常见的选项:

    name - 连接器唯一的名称,不能重复。

    connector.class - 连接器的Java类,比如连接器是io.svectors.hbase.sink.HBaseSinkConnector。

    tasks.max - 连接器创建任务的最大数。

    topics - 作为连接器的输入的topic列表。

    由于Kafka Connect的目的是作为一个服务运行,提供了一个用于管理connector的REST API。默认情况下,此服务的端口是8083。以下是当前支持的终端入口:

    GET /connectors - 返回活跃的connector列表

    POST /connectors - 创建一个新的connector;请求的主体是一个包含字符串name字段和对象config字段(connector的配置参数)的JSON对象。

    GET /connectors/{name} - 获取指定connector的信息

    GET /connectors/{name}/config - 获取指定connector的配置参数

    PUT /connectors/{name}/config - 更新指定connector的配置参数

    GET /connectors/{name}/status - 获取connector的当前状态,包括它是否正在运行,失败,暂停等。

    GET /connectors/{name}/tasks - 获取当前正在运行的connector的任务列表。

    GET /connectors/{name}/tasks/{taskid}/status - 获取任务的当前状态,包括是否是运行中的,失败的,暂停的等,

    PUT /connectors/{name}/pause - 暂停连接器和它的任务,停止消息处理,直到connector恢复。

    PUT /connectors/{name}/resume - 恢复暂停的connector(如果connector没有暂停,则什么都不做)

    POST /connectors/{name}/restart - 重启connector(connector已故障)

    参考链接:

    github.com/mravi/kafka-connect-hbase

    orchome.com/345

    blog.csdn.net/chuanzhongdu1/article/details/51365535

    blog.csdn.net/ludonqin/article/details/52387769

    相关文章

      网友评论

          本文标题:kafka connector分布式部署

          本文链接:https://www.haomeiwen.com/subject/njcclxtx.html