MySQL Real-Time Sync to Impala

Author: Annigo | Published 2020-12-01 15:02

This article uses StreamSets to synchronize MySQL data in real time to a Kudu-backed Impala table.

1. Create the MySQL table

CREATE TABLE IF NOT EXISTS `test`.`mysqltest` (
   `id` INT UNSIGNED AUTO_INCREMENT,
   `name` VARCHAR(100) NOT NULL,
   `age` INT NOT NULL,
   PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

2.1 Enable MySQL row-based binary logging and create the maxwell account (steps omitted)
2.2 Create the corresponding table in Impala
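
Step 2.1 is skipped in the original. As a hedged illustration, the sketch below checks the server variables that row-based binlog capture generally depends on. The helper name `check_binlog_settings` is hypothetical, and the expected values are assumptions to verify against your MySQL version and the StreamSets documentation: `log_bin=ON` and `binlog_format=ROW` are needed for row events, and `binlog_row_image=FULL` is assumed because the 3.3 script writes the whole `Data` map as an upsert, which only works if every column is present.

```python
"""Check MySQL server variables needed for row-based binlog capture.

Hypothetical helper: pass a dict built from SHOW VARIABLES output.
The required values below are assumptions; verify them for your setup.
"""

# Variable name -> value this pipeline is assumed to need
REQUIRED = {
    "log_bin": "ON",             # binary logging enabled
    "binlog_format": "ROW",      # row-level events, not SQL statements
    "binlog_row_image": "FULL",  # every column in each row image
}


def check_binlog_settings(variables):
    """Return a list of human-readable problems (empty if all is well)."""
    problems = []
    for name, expected in REQUIRED.items():
        actual = variables.get(name)
        if actual is None:
            problems.append("%s is not set" % name)
        elif str(actual).upper() != expected:
            problems.append("%s=%s, expected %s" % (name, actual, expected))
    return problems


if __name__ == "__main__":
    # Example values as they might be read from SHOW VARIABLES
    sample = {"log_bin": "ON", "binlog_format": "MIXED", "binlog_row_image": "FULL"}
    for problem in check_binlog_settings(sample):
        print(problem)
```

The input values can be read with `SHOW VARIABLES WHERE Variable_name IN ('log_bin', 'binlog_format', 'binlog_row_image');`.
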

CREATE TABLE IF NOT EXISTS `mysqltest` (
   `id` INT,
   `name` STRING,
   `age` INT,
   PRIMARY KEY (`id`)
)
PARTITION BY HASH (`id`) PARTITIONS 16
STORED AS KUDU
TBLPROPERTIES ('kudu.table_name' = 'test.mysqltest');
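
The two DDL statements have to agree column by column. Impala and Kudu have no unsigned integer types, so MySQL `INT UNSIGNED` values above 2147483647 would not fit the Impala `INT` column used for `id`; `BIGINT` covers the full unsigned 32-bit range. A minimal sketch of such a mapping follows. `to_impala_type` is a hypothetical helper, and its table covers only the types in this example plus a few common ones.

```python
"""Map MySQL column types to Impala (Kudu) column types (sketch)."""
import re

# Base MySQL type -> Impala type for signed columns
SIGNED = {
    "tinyint": "TINYINT",
    "smallint": "SMALLINT",
    "int": "INT",
    "bigint": "BIGINT",
    "varchar": "STRING",
    "char": "STRING",
    "text": "STRING",
    "datetime": "TIMESTAMP",
}

# Unsigned values need the next wider signed type because Impala has no
# unsigned integers; BIGINT UNSIGNED has no lossless Impala match.
UNSIGNED = {
    "tinyint": "SMALLINT",
    "smallint": "INT",
    "int": "BIGINT",
}


def to_impala_type(mysql_type):
    """Translate e.g. 'INT UNSIGNED' or 'VARCHAR(100)' to an Impala type."""
    text = mysql_type.strip().lower()
    # Base name is the leading word, e.g. 'varchar' in 'varchar(100)'
    base = re.match(r"[a-z]+", text).group(0)
    if "unsigned" in text.split():
        if base not in UNSIGNED:
            raise ValueError("no lossless Impala type for " + mysql_type)
        return UNSIGNED[base]
    if base not in SIGNED:
        raise ValueError("unmapped MySQL type: " + mysql_type)
    return SIGNED[base]


if __name__ == "__main__":
    columns = [("id", "INT UNSIGNED"), ("name", "VARCHAR(100)"), ("age", "INT")]
    for column, mysql_type in columns:
        print(column, to_impala_type(mysql_type))
```

For the table in step 1 this suggests `BIGINT` for `id`; keeping `INT` is workable only as long as the auto-increment counter stays below 2^31.
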

3. Configure the StreamSets pipeline
3.1 MySQL source configuration

3.2 Filter by database

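The screenshot for this step is not available. Assuming the filter keeps only events from the `test` database created in step 1 (and here, more narrowly, only the `mysqltest` table), the predicate can be sketched in plain Python 3 over dicts shaped like the origin's records. The `Table` field comes from the 3.3 script; the `Database` field name and the `WANTED` allow-list are assumptions. In StreamSets, a comparable condition could be written in a Stream Selector as `${record:value('/Database') == 'test'}`.

```python
"""Keep only binlog events for the tables being synchronized (sketch).

Records are modelled as plain dicts; the field names 'Database' and
'Table' are assumed to match the MySQL binlog origin's output.
"""

# Hypothetical allow-list of (database, table) pairs to replicate
WANTED = {("test", "mysqltest")}


def keep_event(event):
    """True if the event belongs to one of the WANTED tables."""
    return (event.get("Database"), event.get("Table")) in WANTED


def filter_events(events):
    """Drop every event outside the allow-list."""
    return [event for event in events if keep_event(event)]


if __name__ == "__main__":
    events = [
        {"Database": "test", "Table": "mysqltest", "Type": "INSERT"},
        {"Database": "mysql", "Table": "user", "Type": "UPDATE"},
    ]
    print(len(filter_events(events)))
```
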

3.3 Format the data

Script (Jython Evaluator):

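# Turn each binlog event into a record that carries only the row data
# plus a CRUD header for the Kudu destination.
# sdc.operation.type codes: 1=INSERT, 2=DELETE, 3=UPDATE, 4=UPSERT
for record in records:
  newRecord = sdcFunctions.createRecord(record.sourceId + ':newRecordId')
  try:
    if record.value['Type'] == 'DELETE':
      # A delete carries the removed row in OldData
      newRecord.attributes['sdc.operation.type'] = '2'
      newRecord.value = record.value['OldData']
    else:
      # INSERT and UPDATE are both written as UPSERT
      newRecord.attributes['sdc.operation.type'] = '4'
      newRecord.value = record.value['Data']
    # Carry the source table name along with the row
    newRecord.value['Table'] = record.value['Table']
    # Write record to processor output
    output.write(newRecord)
  except Exception as e:
    # Send the original event to error for inspection
    error.write(record, str(e))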
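
To test this mapping outside StreamSets, the same logic can be written as a plain Python 3 function over dicts. This is a sketch: `format_event` is a hypothetical name, records are dicts rather than SDC records, and the operation code is returned next to the row instead of being stored in a header attribute.

```python
"""Plain-Python version of the 3.3 formatting step, for testing."""

# StreamSets CRUD codes written to the sdc.operation.type header
OP_DELETE = "2"
OP_UPSERT = "4"


def format_event(event):
    """Return (operation_code, row) for one binlog event dict."""
    if event["Type"] == "DELETE":
        # A delete carries the removed row in OldData
        op, row = OP_DELETE, dict(event["OldData"])
    else:
        # INSERT and UPDATE both become UPSERT
        op, row = OP_UPSERT, dict(event["Data"])
    # Carry the source table name along with the row
    row["Table"] = event["Table"]
    return op, row


if __name__ == "__main__":
    event = {
        "Type": "UPDATE",
        "Table": "mysqltest",
        "Data": {"id": 1, "name": "Tom", "age": 21},
        "OldData": {"id": 1, "name": "Tom", "age": 20},
    }
    print(format_event(event))
```

Writing UPDATE as UPSERT (code 4) rather than UPDATE (code 3) means an event replayed after a pipeline restart, or an update to a row the target never received, still succeeds instead of failing on a missing or duplicate primary key.
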

3.4 Convert MySQL field names to lowercase

Script (Jython Evaluator):

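# Lower-case every top-level field name so the fields match the
# lower-case column names of the Impala/Kudu table.
for record in records:
  try:
    # Iterate over a copy of the keys because the loop adds and removes keys
    for key in list(record.value.keys()):
      lowerKey = key.lower()
      if lowerKey != key:
        record.value[lowerKey] = record.value[key]
        # Drop the mixed-case original so only the lowercase field remains
        del record.value[key]
    output.write(record)
  except Exception as e:
    # Send record to error
    error.write(record, str(e))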
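
The renaming can be checked outside StreamSets on a plain dict. `lowercase_keys` is a hypothetical helper; building a new dict avoids changing a mapping while iterating over it. The Impala DDL in 2.2 declares its columns in lowercase (`id`, `name`, `age`), and the `Table` field added in 3.3 becomes `table`.

```python
"""Plain-Python version of the 3.4 step: lower-case top-level field names."""


def lowercase_keys(row):
    """Return a new dict whose keys are lower-cased.

    If two keys differ only in case (e.g. 'Name' and 'name'), one value
    silently overwrites the other; here the later key in iteration order wins.
    """
    return {key.lower(): value for key, value in row.items()}


if __name__ == "__main__":
    print(lowercase_keys({"ID": 1, "Name": "Tom", "Table": "mysqltest"}))
```
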

3.5 Time format conversion

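The screenshot for this step is not available, and the sample table has no date or time columns, so exactly what was converted is not known. One common need when writing to Kudu is turning a MySQL `DATETIME` string into the microseconds-since-epoch value that Kudu's `UNIXTIME_MICROS` type stores. The sketch below assumes the input format `%Y-%m-%d %H:%M:%S` and that the MySQL values are in UTC; both are assumptions, and `datetime_to_micros` is a hypothetical helper.

```python
"""Convert a MySQL DATETIME string to Kudu UNIXTIME_MICROS (sketch)."""
from datetime import datetime, timezone

# Assumed format of DATETIME values as they arrive in the record
MYSQL_FORMAT = "%Y-%m-%d %H:%M:%S"
EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def datetime_to_micros(text):
    """Parse text as a UTC wall-clock time; return microseconds since epoch."""
    parsed = datetime.strptime(text, MYSQL_FORMAT).replace(tzinfo=timezone.utc)
    # Integer arithmetic on the timedelta avoids float rounding
    delta = parsed - EPOCH
    return (delta.days * 86400 + delta.seconds) * 1_000_000 + delta.microseconds


if __name__ == "__main__":
    print(datetime_to_micros("2020-12-01 15:02:00"))
```

If the source values are stored in a local time zone instead, replace `timezone.utc` with that zone before converting.
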

3.6 Kudu destination configuration

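The Kudu destination can apply each record according to its `sdc.operation.type` header, which is why 3.3 sets that attribute. The sketch below is a simplified model, not the destination's implementation: `apply_op` is hypothetical, the table is a dict keyed by the primary key `id`, and an upsert is modelled as replacing the whole row because the 3.3 script always sends complete rows. It shows why writing INSERT and UPDATE as UPSERT tolerates replayed events.

```python
"""Model of how CRUD codes act on a table keyed by primary key (sketch)."""

INSERT, DELETE, UPDATE, UPSERT = "1", "2", "3", "4"


def apply_op(table, op, row, key="id"):
    """Apply one operation to `table` (a dict of primary key -> row)."""
    pk = row[key]
    if op == INSERT:
        if pk in table:
            raise KeyError("duplicate primary key %r" % pk)
        table[pk] = dict(row)
    elif op == UPDATE:
        if pk not in table:
            raise KeyError("row %r not found" % pk)
        table[pk].update(row)
    elif op == UPSERT:
        # Insert if absent, otherwise replace (rows are assumed complete)
        table[pk] = dict(row)
    elif op == DELETE:
        if pk not in table:
            raise KeyError("row %r not found" % pk)
        del table[pk]
    else:
        raise ValueError("unsupported operation " + op)


if __name__ == "__main__":
    table = {}
    row = {"id": 1, "name": "Tom", "age": 20}
    apply_op(table, UPSERT, row)
    apply_op(table, UPSERT, row)  # replayed event: no error
    print(table)
```

Replaying the same UPSERT leaves the table unchanged, whereas replaying an INSERT would fail on the duplicate key.
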

4. Final result (the screenshot shows the production environment):

Article link: https://www.haomeiwen.com/subject/bdgzoctx.html