Kafka Connect JDBC

作者: 阿猫阿狗Hakuna | 来源:发表于2019-01-17 10:26 被阅读1次

           数据通过定期执行SQL查询并为结果集中的每行创建输出记录来进行加载。默认情况下,数据库中的所有表都被复制,每个表都有自己的输出topic。connector会对数据库进行监视,对新表和删除的表自动进行调整。

    特性

           connector可以获取到数据表中的新增数据和变化数据,通过以下的增量查询查询模式。

    增量查询模式

    • Incremental column
      跟踪一个自增的column,只能检测到新记录,已有数据的更新不能被检测到,所以只能用于不变数据。比如流式的事实表。
    • Timestamp column
      跟踪一个时间戳列。因为时间戳不是独一无二的,这种模式不能保证所有更新的数据都被交付。
    • Incremental and timestamp column
      最具鲁棒性,将递增列与时间戳列结合在一起,每个元组(id, 时间戳)唯一的标识行更新。即使更新在部分完成后失败,未处理的更新依然会在系统恢复时被正确的监测。
    • Custom query
      可以使用custom query而不是拷贝整张表。

    Schema进化

           当启用了Avro converter时,JDBC connector支持schema进化。如果数据库表的schema发生了变化,JDBC connector可以检测到这个变化,并创建一个新的Kafka Connect schema并尝试在Schema Registry中注册新的Avro schema。能否成功注册schema取决于Schema Registry的兼容性级别,默认情况下是backward。
           例如,如果从表中删除一列,更改是向后兼容的,相应的Avro模式可以在模式注册表中成功注册。如果您修改数据库表模式以更改列类型或添加列,那么当将Avro模式注册到模式注册中心时,它将被拒绝,因为更改不向后兼容。
           可以修改Schema Registry的兼容级别,有两种方法:
    (1)使用PUT /config/(string: subject)设置主题兼容性级别。主题包含topic-key和topic-value,topic由topic.prefix和表名组合而成。
    (2)可以在Schema Registry的avro.compatibility.level中设置兼容性级别。这是一个全局设定。
           然而,由于JDBC API的限制,一些兼容的schema更改可能被视为不兼容的更改。例如,添加具有默认值的列是向后兼容的更改。然而,JDBC API的限制使得将其映射到Kafka连接模式中正确类型的默认值非常困难,因此默认值目前被忽略了。其含义是,即使数据库表模式的某些更改是向后兼容的,在模式注册中心注册的模式也不是向后兼容的,因为它不包含默认值。
           如果JDBC连接器和HDFS连接器一起使用,那么模式兼容性也会受到一些限制。启用Hive集成时,模式兼容性要求backforward、forward和full,以确保Hive模式能够查询主题下的全部数据。由于某些兼容的模式更改将被视为不兼容的模式更改,因此这些更改将不起作用,因为生成的Hive模式将无法查询某个主题的整个数据。

    相关文章

      网友评论

        本文标题:Kafka Connect JDBC

        本文链接:https://www.haomeiwen.com/subject/tmdvdqtx.html