-
Environment preparation
-
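Enable the MariaDB binlog
Edit /etc/my.cnf and add the following settings under the [mysqld] section:
    server-id=999
    log-bin=mysql-bin
    binlog_format=ROW
Note:
The MySQL binlog supports three formats: ROW, STATEMENT, and MIXED (a combination of ROW and STATEMENT). ROW is recommended here because it records every change at row level, which is what real-time change capture needs. Binlog configuration reference: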
https://dev.mysql.com/doc/refman/5.7/en/binary-log-setting.html
Restart MariaDB and confirm that the service is running:
    [root@node01 mariadb]# systemctl restart mysqld
    [root@node01 mariadb]# systemctl status mysqld
    ● mysqld.service - LSB: start and stop MariaDB
       Loaded: loaded (/etc/rc.d/init.d/mysqld; bad; vendor preset: disabled)
       Active: active (running) since Tue 2020-04-28 09:59:03 CST; 1min 11s ago
         Docs: man:systemd-sysv-generator(8)
      Process: 12771 ExecStop=/etc/rc.d/init.d/mysqld stop (code=exited, status=0/SUCCESS)
      Process: 12925 ExecStart=/etc/rc.d/init.d/mysqld start (code=exited, status=0/SUCCESS)
       CGroup: /system.slice/mysqld.service
               ├─12970 /bin/sh /usr/local/mariadb/bin/mysqld_safe --datadir=/usr/local/mariadb/data --pid-file=/usr/local/mariadb/data/node01.pid
               └─13079 /usr/local/mariadb/bin/mysqld --basedir=/usr/local/mariadb --datadir=/usr/local/mariadb/data --plugin-dir=/usr/local/mariadb/lib/plugin --user=mysql --log-error=/usr/l...
    Apr 28 09:59:02 node01 systemd[1]: Starting LSB: start and stop MariaDB...
    Apr 28 09:59:02 node01 mysqld[12925]: Starting MariaDB.200428 09:59:02 mysqld_safe Logging to '/usr/local/mariadb/data/node01.err'.
    Apr 28 09:59:02 node01 mysqld[12925]: 200428 09:59:02 mysqld_safe Starting mysqld daemon with databases from /usr/local/mariadb/data
    Apr 28 09:59:03 node01 mysqld[12925]: [ OK ]
    Apr 28 09:59:03 node01 systemd[1]: Started LSB: start and stop MariaDB.
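Before restarting, it can help to double-check that all three binlog settings really sit under `[mysqld]`. The following is a minimal sketch of such a check. The function names `parseMysqldSection` and `missingBinlogSettings` are hypothetical helpers written for this illustration and are not part of MariaDB or StreamSets. The parser handles only simple `key=value` lines.

```javascript
// Hypothetical helper: collect the key=value settings of the [mysqld] section
// from a my.cnf-style text. Only simple lines are handled (no includes).
function parseMysqldSection(text) {
  const settings = {};
  let inMysqld = false;
  for (const rawLine of text.split(/\r?\n/)) {
    const line = rawLine.trim();
    if (line === "" || line.startsWith("#") || line.startsWith(";")) continue;
    const section = line.match(/^\[(.+)\]$/);
    if (section) {
      inMysqld = section[1].trim() === "mysqld";
      continue;
    }
    if (!inMysqld) continue;
    const eq = line.indexOf("=");
    const key = (eq === -1 ? line : line.slice(0, eq)).trim();
    const value = eq === -1 ? "" : line.slice(eq + 1).trim();
    // Option names accept '-' and '_' interchangeably, so normalize to '_'.
    settings[key.replace(/-/g, "_")] = value;
  }
  return settings;
}

// Hypothetical helper: list which of the three binlog settings are missing.
function missingBinlogSettings(text) {
  const s = parseMysqldSection(text);
  const problems = [];
  if (!/^\d+$/.test(s.server_id || "")) problems.push("server-id");
  if (!("log_bin" in s)) problems.push("log-bin");
  if ((s.binlog_format || "").toUpperCase() !== "ROW") {
    problems.push("binlog_format=ROW");
  }
  return problems;
}

const sample = [
  "[mysqld]",
  "server-id=999",
  "log-bin=mysql-bin",
  "binlog_format=ROW",
].join("\n");

// An empty list means all three settings were found.
console.log(missingBinlogSettings(sample));
```

On a real host the text would come from reading /etc/my.cnf. The sample string here simply mirrors the configuration shown above.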
-
Create the MariaDB replication account
    GRANT ALL ON maxwell.* TO 'maxwell'@'%' IDENTIFIED BY '123456';
    GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE ON *.* TO 'maxwell'@'%';
    FLUSH PRIVILEGES;
-
Install the MySQL driver for StreamSets
Reference:
https://blog.csdn.net/weixin_43215250/article/details/87981707
-
Create the test tables
- Create the test table in MariaDB
    CREATE DATABASE IF NOT EXISTS test;
    CREATE TABLE test.`binlog_test` (
      `id` int(11) NOT NULL AUTO_INCREMENT,
      `name` varchar(255) NOT NULL,
      PRIMARY KEY (`id`) USING BTREE
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
- Create the Kudu table in Hue
    CREATE DATABASE IF NOT EXISTS test;
    CREATE TABLE IF NOT EXISTS test.binlog_test (
      id int,
      name String,
      PRIMARY KEY (id)
    )
    PARTITION BY HASH PARTITIONS 16
    STORED AS KUDU;
-
-
Create the StreamSets pipeline
-
Create a new pipeline
-
Select the Origins category, search for MySQL Binary Log, and drag it onto the canvas
Configure the general properties of MySQL Binary Log
**Configure the MySQL connection**
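**Note:** The original setup uses the same Server ID here as the `server-id` value in MySQL's my.cnf. The StreamSets documentation, however, describes Server ID as the replication server ID of the origin itself, which must be unique from the server ID of the replication master and of any other replication slaves, so a different value is the safer choice.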
![image](https://img.haomeiwen.com/i18545623/f43db30aa5089c27?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
**Configure the MySQL credentials**
![image](https://img.haomeiwen.com/i18545623/33d3abeb44e632e3?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
**Advanced settings: adjust as needed; the defaults are used here**
![image](https://img.haomeiwen.com/i18545623/59232a75d0526241?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
**Reference:**
https://streamsets.com/documentation/datacollector/latest/help/datacollector/UserGuide/Origins/MySQLBinaryLog.html?hl=mysql%2Cbinary%2Clog
-
Add Stream Selector 1 to filter by table
Configure the general properties of the Stream Selector
Configure the routing condition
${record:value("/Table") == "binlog_test"}
-
Add Stream Selector 2 to filter by operation type
Configure the general properties of the Stream Selector
Configure the routing condition
${record:value("/Type") == "DELETE"}
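The combined effect of the two Stream Selectors can be sketched in plain JavaScript. This is only an illustration of the routing logic, not the StreamSets expression language. The function name `route` and the stream labels `"delete"`, `"upsert"`, and `"discard"` are hypothetical. The sketch also assumes that records for other tables are discarded, and that the second selector tests the `Type` field (the one the evaluator scripts read) to separate DELETE events from the rest.

```javascript
// Hypothetical stand-in for the two Stream Selector stages.
function route(record) {
  // Selector 1: keep only records that belong to the table under test.
  // Assumption: everything else falls through to a discarded default stream.
  if (record.Table !== "binlog_test") return "discard";
  // Selector 2: DELETE events get their own stream; INSERT and UPDATE
  // fall through to the default stream, which feeds the upsert branch.
  return record.Type === "DELETE" ? "delete" : "upsert";
}

const events = [
  { Table: "binlog_test", Type: "INSERT" },
  { Table: "binlog_test", Type: "UPDATE" },
  { Table: "binlog_test", Type: "DELETE" },
  { Table: "other_table", Type: "INSERT" },
];

const routed = events.map(route);
console.log(routed); // [ 'upsert', 'upsert', 'delete', 'discard' ]
```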
-
Add JavaScript Evaluators to process the log records
Add a JavaScript Evaluator that parses DELETE-type binary log records
Configure the JavaScript script
    for (var i = 0; i < records.length; i++) {
      try {
        var newRecord = sdcFunctions.createRecord(true);
        // For DELETE events the row values are carried in the OldData map.
        newRecord.value = records[i].value['OldData'];
        newRecord.value.Type = records[i].value['Type'];
        newRecord.value.Database = records[i].value['Database'];
        newRecord.value.Table = records[i].value['Table'];
        log.info(records[i].value['Type']);
        output.write(newRecord);
      } catch (e) {
        // Send record to error
        error.write(records[i], e);
      }
    }
**Add a JavaScript Evaluator that parses INSERT and UPDATE records**
![image](https://img.haomeiwen.com/i18545623/30378a88b8f7f613?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
**Configure the JavaScript script**
```
for (var i = 0; i < records.length; i++) {
  try {
    var newRecord = sdcFunctions.createRecord(true);
    // For INSERT and UPDATE events the new row values are carried in the Data map.
    newRecord.value = records[i].value['Data'];
    newRecord.value.Type = records[i].value['Type'];
    newRecord.value.Database = records[i].value['Database'];
    newRecord.value.Table = records[i].value['Table'];
    log.info(records[i].value['Type']);
    output.write(newRecord);
  } catch (e) {
    // Send record to error
    error.write(records[i], e);
  }
}
```
![image](https://img.haomeiwen.com/i18545623/8492b6202b22945e?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
**Reference:**
https://streamsets.com/documentation/datacollector/latest/help/datacollector/UserGuide/Processors/JavaScript.html?hl=javascript%2Cevaluator
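The evaluator logic can be exercised outside StreamSets with a small mock, which makes it easier to see what happens when the expected map field is absent. The sketch below is an assumption-laden stand-in: `flattenRecords`, `makeSink`, and the `dataField` parameter are hypothetical names, and a plain object replaces the record that `sdcFunctions.createRecord(true)` would return. Which map field actually holds the row for each event type should be confirmed against the origin's output in pipeline preview.

```javascript
// Local mock of the evaluator loop. `dataField` (hypothetical parameter)
// names the map field that holds the row values.
function flattenRecords(records, dataField, output, error) {
  for (var i = 0; i < records.length; i++) {
    try {
      // Inside StreamSets this would be sdcFunctions.createRecord(true).
      var newRecord = { value: null };
      newRecord.value = records[i].value[dataField];
      // Throws a TypeError when the field is missing, like the real script.
      newRecord.value.Type = records[i].value["Type"];
      newRecord.value.Database = records[i].value["Database"];
      newRecord.value.Table = records[i].value["Table"];
      output.write(newRecord);
    } catch (e) {
      error.write(records[i], e);
    }
  }
}

// Minimal stand-in for the `output` and `error` objects of the evaluator.
function makeSink() {
  return {
    items: [],
    write: function (record) {
      this.items.push(record);
    },
  };
}

const output = makeSink();
const error = makeSink();
const batch = [
  { value: { Type: "INSERT", Database: "test", Table: "binlog_test", Data: { id: 1, name: "张三" } } },
  { value: { Type: "INSERT", Database: "test", Table: "binlog_test" } }, // no Data field
];

flattenRecords(batch, "Data", output, error);
console.log(JSON.stringify(output.items[0].value));
console.log(error.items.length); // 1: the record without a Data field
```

The first record comes out flattened, with `Type`, `Database`, and `Table` merged into the row. The second lands in the error sink, which is how a script pointed at the wrong field would behave.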
-
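Add the Kudu destinations
Configure Kudu Delete
Configure the general Kudu properties
Configure the Kudu environment settings
Advanced Kudu settings; the defaults are used here
Configure Kudu Upsert
Configure the general Kudu properties
Configure the Kudu environment settings
Advanced Kudu settings; the defaults are used here
-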
Validate the pipeline configuration
-
Start the pipeline
-
Test the pipeline
-
Insert data
Insert a row into the binlog_test table in MariaDB
    insert into test.binlog_test values(1, '张三');
Check the pipeline status in StreamSets
Query the Kudu table in Hue to verify that the row was inserted
-
Update data
Update the row in the binlog_test table in MariaDB
    update test.binlog_test set name='李四' where id = 1;
Check the pipeline status in StreamSets
Query the Kudu table in Hue to verify that the row was updated
-
Delete data
Delete the row from the binlog_test table in MariaDB
    delete from test.binlog_test where id = 1;
Check the pipeline status in StreamSets
Query the Kudu table in Hue to verify that the row was deleted
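The expected end state of the three tests can be reasoned through with a small simulation. In this sketch a `Map` keyed by primary key stands in for the Kudu table, and `applyEvent` is a hypothetical function that mimics the upsert and delete branches. It makes no calls to Kudu or StreamSets.

```javascript
// A Map keyed by primary key stands in for the Kudu table (illustration only).
function applyEvent(table, event) {
  if (event.Type === "DELETE") {
    // Delete branch: remove the row with this primary key.
    table.delete(event.id);
  } else {
    // Upsert branch: insert when the key is new, overwrite when it exists.
    table.set(event.id, { id: event.id, name: event.name });
  }
  return table;
}

const table = new Map();

applyEvent(table, { Type: "INSERT", id: 1, name: "张三" });
const afterInsert = table.get(1).name;

applyEvent(table, { Type: "UPDATE", id: 1, name: "李四" });
const afterUpdate = table.get(1).name;

applyEvent(table, { Type: "DELETE", id: 1 });

console.log(afterInsert, afterUpdate, table.size); // 张三 李四 0
```

Routing INSERT and UPDATE to one upsert branch means neither event needs to know whether the row already exists in Kudu, which is why the pipeline only separates out DELETE.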