前言
本文将使用Kafka Connect 实现MySQL增量同步,设计三种模式,分别为incrementing
timestamp
timestamp+incrementing
理论续自上文
当然你也可以使用除了MySQL其他DB,参考官网放置对应的驱动文件即可。
以下实验请在能正常Kafka生产消费的基础之上进行。
1、incrementing 自增模式
准备工作
创建 A数据库源表person
CREATE TABLE `person` (
`pid` int(11) NOT NULL AUTO_INCREMENT,
`firstname` varchar(255) CHARACTER SET utf8 DEFAULT NULL,
`age` int(11) DEFAULT NULL,
PRIMARY KEY (`pid`)
) ENGINE=InnoDB DEFAULT CHARSET=latin1;
创建 B数据库目标表kafkaperson
CREATE TABLE `kafkaperson` (
`pid` int(11) NOT NULL AUTO_INCREMENT,
`firstname` varchar(255) CHARACTER SET utf8 DEFAULT NULL,
`age` int(11) DEFAULT NULL,
PRIMARY KEY (`pid`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin;
切换目录 D:\com\kafka_2.11-2.0.1\config
quickstart-mysql.properties(source)
name=mysql-a-source-person
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1
connection.url=jdbc:mysql://localhost:3306/A?user=***&password=***
# incrementing 自增
mode=incrementing
# 自增字段 pid
incrementing.column.name=pid
# 白名单表 person
table.whitelist=person
# topic前缀 mysql-kafka-
topic.prefix=mysql-kafka-
quickstart-mysql-sink.properties(sink)
name=mysql-a-sink-person
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=1
#kafka的topic名称
topics=mysql-kafka-person
# 配置JDBC链接
connection.url=jdbc:mysql://localhost:3306/B?user=***&password=***
# 不自动创建表,如果为true,会自动创建表,表名为topic名称
auto.create=false
# upsert model更新和插入
insert.mode=upsert
# 下面两个参数配置了以pid为主键更新
pk.mode = record_value
pk.fields = pid
#表名为kafkatable
table.name.format=kafkaperson
实验一
创建 Kafka Topic: mysql-kafka-person
D:\com\kafka_2.11-2.0.1\bin\windows>kafka-run-class.bat kafka.admin.TopicCommand --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic mysql-kafka-person
启动 Kafka Connect
D:\com\kafka_2.11-2.0.1\bin\windows>connect-standalone.bat D:/com/kafka_2.11-2.0.1/config/connect-standalone.properties D:/com/kafka_2.11-2.0.1/config/quickstart-mysql.properties D:/com/kafka_2.11-2.0.1/config/quickstart-mysql-sink.properties
A库person表插入三条数据
创建测试数据
稍后发现B库kafkaperson表中进入了这三条数据
image.png
consumer控制台 观察到三条数据
Topic console
此时插入1条新数据
INSERT INTO person(pid, firstname, age) VALUES(4, 'zl', 20);
发现B库表中也多了一条pid为4的数据
添加新的数据
实验一结论
在 JDBC Sink Connector 官网中指出insert.mode
有且仅有两个值
insert.mode=insert
只接收标准的INSERT SQL新增语句
insert.mode=upsert
接收新增和更新,当对主键修改时也可以洞察并且输出。而insert
是无法满足此要求的,因此根据实际业务使用的场景选择insert.mode
。
INSERT INTO person (pid, firstname, age) VALUES (2, 'ls', 15) ON DUPLICATE KEY UPDATE firstname="world";
然而我在实验过程中并没有成功,因此没看出来insert和upsert的区别,希望成功的人可以在留言中指正下!
2、timestamp 时间戳模式
准备工作
创建 A数据库源表comments
CREATE TABLE `comments` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`content` varchar(255) DEFAULT NULL,
`commenttime` timestamp NOT NULL DEFAULT '0000-00-00 00:00:00' ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=5 DEFAULT CHARSET=latin1;
注意timestamp必须指定not null,否则会报错!(无法使用时间戳列进行增量查询因为时间戳字段是可为空的...)
创建 B数据库源表kafkacomments
CREATE TABLE `kafkacomments` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`content` varchar(255) DEFAULT NULL,
`commenttime` timestamp NULL DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=5 DEFAULT CHARSET=latin1;
切换目录 D:\com\kafka_2.11-2.0.1\config
timestamp-mysql-source.properties(source)
name=mysql-b-source-comments
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1
connection.url=jdbc:mysql://localhost:3306/A?user=***&password=***
table.whitelist=comments
mode=timestamp
timestamp.column.name=commenttime
topic.prefix=mysql-kafka-
timestamp-mysql-sink.properties(sink)
name=mysql-b-sink-comments
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=1
#kafka的topic名称
topics=mysql-kafka-comments
# 配置JDBC链接
connection.url=jdbc:mysql://localhost:3306/B?user=***&password=***
# 不自动创建表,如果为true,会自动创建表,表名为topic名称
auto.create=false
# upsert model更新和插入
insert.mode=upsert
# 下面两个参数配置了以id为主键更新
pk.mode = record_value
pk.fields = id
#表名为kafkatable
table.name.format=kafkacomments
实验二
创建 Kafka Topic: mysql-kafka-comments
D:\com\kafka_2.11-2.0.1\bin\windows>kafka-run-class.bat kafka.admin.TopicCommand --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic mysql-kafka-comments
启动 Kafka Connect
D:\com\kafka_2.11-2.0.1\bin\windows>connect-standalone.bat D:/com/kafka_2.11-2.0.1/config/connect-standalone.properties D:/com/kafka_2.11-2.0.1/config/timestamp-mysql-source.properties D:/com/kafka_2.11-2.0.1/config/timestamp-mysql-sink.properties
A库comments表插入四条数据
A.comments表
此时发现控制台和目标表中有了四条数据
image.png
B.kafkacomments表
此时修改id为2和4的内容content
,并修改评论时间commenttime
update comments set content = "show test data" ,commenttime="2018-12-20 15:55:10" where id in(2,4)
发现源表和目标表中的内容都发生了变化!
image.png
image.png
image.png
注意:
1、如果修改的时间戳早于latest time
,则不会洞察到更新。例如MySQL中的now()
获取当前时间就是很好的能被获取到的例子。
2、源表向目标表传输数据,假设有两条(或以上)的数据行拥有同样的时间戳,如果在传输第二条的过程中崩溃,恢复过后第二条将会被丢失,因为latest time
已经被记录过了,他只会去找更新的下一次时间。这种方式虽然能获取到update
更新,但是不够稳健。而如果使用自增字段加时间戳字段混合的方式,即使崩溃也能记录到更新的最大ID
,恢复之后可以被找到不会丢失。因此我们更推荐第三种方式!timestamp+incrementing
3、timestamp+incrementing 时间戳自增混合模式
实验过程同方法2不做赘述,唯一变动的是source的config文件
name=mysql-b-source-comments
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1
connection.url=jdbc:mysql://localhost:3306/A?user=***&password=***
table.whitelist=comments
mode=timestamp+incrementing
timestamp.column.name=commenttime
incrementing.column.name=id
topic.prefix=mysql-kafka-
网友评论