本文将使用StreamSets实时同步Mysql数据到Impala
1.创建Mysql数据表
CREATE TABLE IF NOT EXISTS `test.mysqltest`(
`id` INT UNSIGNED AUTO_INCREMENT,
`name` VARCHAR(100) NOT NULL,
`age` INT NOT NULL,
PRIMARY KEY ( `id` )
)ENGINE=InnoDB DEFAULT CHARSET=utf8;
2.1 开启Mysql行级日志.并创建maxwell账户(略过)
2.2 Impala创建对应数据表
CREATE TABLE IF NOT EXISTS `mysqltest`(
`id` INT ,
`name` string ,
`age` INT ,
PRIMARY KEY ( `id` )
)
PARTITION BY HASH (`id`)
PARTITIONS 16 STORED AS KUDU
TBLPROPERTIES ('kudu.table_name'='test.mysqltest')
3.配置StreamSets流.
3.1 Mysql数据源配置
image.png
image.png
3.2 过滤数据库
image.png
3.3 格式化数据
image.png
代码如下:
for record in records:
newRecord = sdcFunctions.createRecord(record.sourceId + ':newRecordId')
try:
if record.value['Type'] == 'DELETE':
newRecord.attributes['sdc.operation.type']='2'
newRecord.value = record.value['OldData']
else:
newRecord.attributes['sdc.operation.type']='4';
newRecord.value = record.value['Data'];
# Write record to processor output
#newRecord.value['Type'] = record.value['Type']
newRecord.value['Table'] = record.value['Table']
output.write(newRecord)
except Exception as e:
# Send record to error
error.write(newRecord, str(e))
3.4 Mysql字段转小写
image.png
代码如下
for record in records:
try:
for key in record.value.keys():
record.value[key.lower()] = record.value[key]
#del record.value[key]
output.write(record)
except Exception as e:
# Send record to error
error.write(record, str(e))
3.5 时间格式转换
image.png
3.6 Kudu写入配置
image.png
4.最终效果图(该效果图为生产环境效果):
image.png
网友评论