美文网首页大数据HadoopHadoop系
Kafka Connect的安装和配置

Kafka Connect的安装和配置

作者: 阿猫阿狗Hakuna | 来源:发表于2019-01-15 14:55 被阅读1次

    安装计划

           在使用Kafka Connect时,需要注意一些事项,以帮助你构建适应长期需求的datapipeline。本章旨在提供有关的一些上下文。

    预备条件

           要开始使用Kafka Connect,只有一个硬性的先决条件:一个Kafka的broker集群。然而,随着集群增长,有几个问题需要提前考虑:

    • 内部Topic创建
      正如我们将要详细讨论的,使用高复制因子、压缩清理策略以及正确的分区数量提前创建Kafka Connect所需的内部topic非常重要。这有助于避免以后重新校准这些主题。
    • Schema Registry
      虽然Schema Registry不是Kafka Connect所必须的服务,但它能使您轻松地将Avro用作所有connector的公共数据格式。这最小化了编写定制代码的需求,并以灵活的格式标准化数据。此外,还可以从schema烟花的强制兼容性规则中获益。

    Standalone vs Distributed

           在开始之前,确定哪种模式最适合您的环境非常有用。对于适合单个代理的环境(例如从web服务器向Kafka发送日志),standalone模式非常适合。在单个source或sink可能需要大量数据的用例中(例如,将数据从Kafka发送到HDFS),分布式模式在可伸缩性方面更加灵活,并提供了高可用性服务,从而最小化停机时间。

    Plugins安装

           Kafka Connect插件是一组jar文件,Kafka Connect可以在其中找到一个或多个connector、transform、以及converter的实现。Kafka Connect将每个插件彼此隔离,这样一个插件中的库就不会受到其他插件库的影响,这点非常重要。

    Kafka Connect plugin是:
    (1)在一个uber jar文件中包含插件及所有第三方依赖;或
    (2)一个包含jar包和第三方依赖的目录。

           Kafka Connect使用plugin path找到插件,这是Kafka Connect在worker配置文件中定义的一个以逗号分隔的目录列表。要安装插件,请将目录或uber jar放在plugin path路径中列出的目录中。

           举个例子,我们在每台机器上创建一个/usr/local/share/kafka/plugins目录,然后将我们所有的插件jar或插件目录放入其中。然后在worker的配置文件中加入如下配置项:

    plugin.path=/usr/local/share/kafka/plugins
    

           现在,当我们启动worker时,Kafka Connect可以发现这些插件中定义的所有connector、transform以及converter。Kafka Connect显式地避免了其他插件中的库,并防止了冲突。

    运行Workers

    standalone模式

           如果要在同一个机器上运行多个standalone实例,有一些参数需要是独一无二的:
    (1)offset.storage.file.filename:connector偏移量的存储。
    (2)rest.port:用于监听http请求的rest接口所占用的端口。

    Distributed模式

           connector和task的配置,offsets和状态会存储在Kafka的内部主题中,Kafka Connect会自动创建这些主题,且所有topic都使用了压缩清理策略。
           如果要手动创建这些topic,推荐使用如下命令:

      bin/kafka-topics --create --zookeeper localhost:2181 --topic connect-status --replication-factor 3 --partitions 10 --config cleanup.policy=compact
    

    Worker配置

    这里只列出一些有疑问的。

    • key.converter(重要)
      用于key的converter类。它控制source connector写入Kafka的数据格式或sink connector从Kafka读取的数据格式。常用的有Avro和Json

    • value.converter(同上)

    • internal.key.converter(不重要)
      用于偏移量和配置等数据的转换。

    distributed Worker模式

           配置了group.id的worker会自动发现彼此并形成集群。一个集群中的所有worker必须使用相同的三个Kafka topic来共享配置、偏移量以及状态,所有worker必须配置相同的config.storage.topic、offset.storage.topic以及status.storage.topic。

    converter配置

           每个converter实现类都有自己的相关配置需求。下面的例子展示了一个worker属性文件,其中使用的AvroConverter需要将Schema Registry的url作为属性进行传递。

    key.converter=io.confluent.connect.avro.AvroConverter
    key.converter.schema.registry.url=http://localhost:8081
    

    注意:除了其配置覆盖这些配置的connector,worker上运行的所有connector都使用这些converter。

    相关文章

      网友评论

        本文标题:Kafka Connect的安装和配置

        本文链接:https://www.haomeiwen.com/subject/dtqydqtx.html