美文网首页
Kafka Connect 实现MySQL增量同步

Kafka Connect 实现MySQL增量同步

作者: CNSTT | 来源:发表于2018-12-20 17:12 被阅读0次

    前言

    本文将使用Kafka Connect 实现MySQL增量同步,设计三种模式,分别为incrementing timestamp timestamp+incrementing
    理论续自上文
    当然你也可以使用除了MySQL其他DB,参考官网放置对应的驱动文件即可。

    以下实验请在能正常Kafka生产消费的基础之上进行。

    1、incrementing 自增模式

    准备工作

    创建 A数据库源表person

    CREATE TABLE `person` (
      `pid` int(11) NOT NULL AUTO_INCREMENT,
      `firstname` varchar(255) CHARACTER SET utf8 DEFAULT NULL,
      `age` int(11) DEFAULT NULL,
      PRIMARY KEY (`pid`)
    ) ENGINE=InnoDB DEFAULT CHARSET=latin1;
    

    创建 B数据库目标表kafkaperson

    CREATE TABLE `kafkaperson` (
      `pid` int(11) NOT NULL AUTO_INCREMENT,
      `firstname` varchar(255) CHARACTER SET utf8 DEFAULT NULL,
      `age` int(11) DEFAULT NULL,
      PRIMARY KEY (`pid`)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin;
    

    切换目录 D:\com\kafka_2.11-2.0.1\config
    quickstart-mysql.properties(source)

    name=mysql-a-source-person
    connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
    tasks.max=1
    connection.url=jdbc:mysql://localhost:3306/A?user=***&password=***
    # incrementing  自增
    mode=incrementing
    # 自增字段  pid
    incrementing.column.name=pid
    # 白名单表  person
    table.whitelist=person
    # topic前缀   mysql-kafka-
    topic.prefix=mysql-kafka-
    

    quickstart-mysql-sink.properties(sink)

    name=mysql-a-sink-person
    connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
    tasks.max=1
    #kafka的topic名称
    topics=mysql-kafka-person
    # 配置JDBC链接
    connection.url=jdbc:mysql://localhost:3306/B?user=***&password=***
    # 不自动创建表,如果为true,会自动创建表,表名为topic名称
    auto.create=false
    # upsert model更新和插入
    insert.mode=upsert
    # 下面两个参数配置了以pid为主键更新
    pk.mode = record_value
    pk.fields = pid
    #表名为kafkatable
    table.name.format=kafkaperson
    

    实验一

    创建 Kafka Topic: mysql-kafka-person

    D:\com\kafka_2.11-2.0.1\bin\windows>kafka-run-class.bat kafka.admin.TopicCommand --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic mysql-kafka-person
    

    启动 Kafka Connect

    D:\com\kafka_2.11-2.0.1\bin\windows>connect-standalone.bat D:/com/kafka_2.11-2.0.1/config/connect-standalone.properties D:/com/kafka_2.11-2.0.1/config/quickstart-mysql.properties D:/com/kafka_2.11-2.0.1/config/quickstart-mysql-sink.properties
    

    A库person表插入三条数据


    创建测试数据

    稍后发现B库kafkaperson表中进入了这三条数据


    image.png
    consumer控制台 观察到三条数据
    Topic console

    此时插入1条新数据

    INSERT INTO person(pid, firstname, age) VALUES(4, 'zl', 20);
    

    发现B库表中也多了一条pid为4的数据


    添加新的数据

    实验一结论

    JDBC Sink Connector 官网中指出insert.mode有且仅有两个值
    insert.mode=insert只接收标准的INSERT SQL新增语句
    insert.mode=upsert接收新增和更新,当对主键修改时也可以洞察并且输出。而insert是无法满足此要求的,因此根据实际业务使用的场景选择insert.mode

    INSERT INTO person (pid, firstname, age) VALUES (2, 'ls', 15) ON DUPLICATE KEY UPDATE firstname="world"; 
    

    然而我在实验过程中并没有成功,因此没看出来insert和upsert的区别,希望成功的人可以在留言中指正下!

    2、timestamp 时间戳模式

    准备工作

    创建 A数据库源表comments

    CREATE TABLE `comments` (
      `id` int(11) NOT NULL AUTO_INCREMENT,
      `content` varchar(255) DEFAULT NULL,
      `commenttime` timestamp NOT NULL DEFAULT '0000-00-00 00:00:00' ON UPDATE CURRENT_TIMESTAMP,
      PRIMARY KEY (`id`)
    ) ENGINE=InnoDB AUTO_INCREMENT=5 DEFAULT CHARSET=latin1;
    

    注意timestamp必须指定not null,否则会报错!(无法使用时间戳列进行增量查询因为时间戳字段是可为空的...)
    创建 B数据库源表kafkacomments

    CREATE TABLE `kafkacomments` (
      `id` int(11) NOT NULL AUTO_INCREMENT,
      `content` varchar(255) DEFAULT NULL,
      `commenttime` timestamp NULL DEFAULT NULL,
      PRIMARY KEY (`id`)
    ) ENGINE=InnoDB AUTO_INCREMENT=5 DEFAULT CHARSET=latin1;
    

    切换目录 D:\com\kafka_2.11-2.0.1\config
    timestamp-mysql-source.properties(source)

    name=mysql-b-source-comments
    connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
    tasks.max=1
    connection.url=jdbc:mysql://localhost:3306/A?user=***&password=***
    table.whitelist=comments
    mode=timestamp
    timestamp.column.name=commenttime
    topic.prefix=mysql-kafka-
    

    timestamp-mysql-sink.properties(sink)

    name=mysql-b-sink-comments
    connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
    tasks.max=1
    #kafka的topic名称
    topics=mysql-kafka-comments
    # 配置JDBC链接
    connection.url=jdbc:mysql://localhost:3306/B?user=***&password=***
    # 不自动创建表,如果为true,会自动创建表,表名为topic名称
    auto.create=false
    # upsert model更新和插入
    insert.mode=upsert
    # 下面两个参数配置了以id为主键更新
    pk.mode = record_value
    pk.fields = id
    #表名为kafkatable
    table.name.format=kafkacomments
    

    实验二

    创建 Kafka Topic: mysql-kafka-comments

    D:\com\kafka_2.11-2.0.1\bin\windows>kafka-run-class.bat kafka.admin.TopicCommand --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic mysql-kafka-comments
    

    启动 Kafka Connect

    D:\com\kafka_2.11-2.0.1\bin\windows>connect-standalone.bat D:/com/kafka_2.11-2.0.1/config/connect-standalone.properties D:/com/kafka_2.11-2.0.1/config/timestamp-mysql-source.properties D:/com/kafka_2.11-2.0.1/config/timestamp-mysql-sink.properties
    

    A库comments表插入四条数据


    A.comments表

    此时发现控制台和目标表中有了四条数据


    image.png
    B.kafkacomments表

    此时修改id为2和4的内容content,并修改评论时间commenttime

    update comments set content = "show test data" ,commenttime="2018-12-20 15:55:10" where id in(2,4)
    

    发现源表和目标表中的内容都发生了变化!


    image.png
    image.png
    image.png

    注意:

    1、如果修改的时间戳早于latest time ,则不会洞察到更新。例如MySQL中的now()获取当前时间就是很好的能被获取到的例子。
    2、源表向目标表传输数据,假设有两条(或以上)的数据行拥有同样的时间戳,如果在传输第二条的过程中崩溃,恢复过后第二条将会被丢失,因为latest time已经被记录过了,他只会去找更新的下一次时间。这种方式虽然能获取到update更新,但是不够稳健。而如果使用自增字段加时间戳字段混合的方式,即使崩溃也能记录到更新的最大ID,恢复之后可以被找到不会丢失。因此我们更推荐第三种方式!timestamp+incrementing

    3、timestamp+incrementing 时间戳自增混合模式

    实验过程同方法2不做赘述,唯一变动的是source的config文件

    name=mysql-b-source-comments
    connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
    tasks.max=1
    connection.url=jdbc:mysql://localhost:3306/A?user=***&password=***
    table.whitelist=comments
    mode=timestamp+incrementing
    timestamp.column.name=commenttime
    incrementing.column.name=id
    topic.prefix=mysql-kafka-
    

    至此完成使用Kafka Connect 三种模式实现MySQL增量同步!

    欢迎关注我的个人小程序

    996ICU小程序.png

    谢谢阅读,有帮助的点个❤!

    相关文章

      网友评论

          本文标题:Kafka Connect 实现MySQL增量同步

          本文链接:https://www.haomeiwen.com/subject/tiaekqtx.html