Real-Time Sync from MySQL to Impala


Author: Annigo | Published 2020-12-01 15:02

    This article uses StreamSets to synchronize MySQL data in real time to an Impala table stored in Kudu.

    1. Create the MySQL table

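    -- Quote the database and table names separately; `test.mysqltest` inside one
    -- pair of backticks would be read as a single table name containing a dot.
    CREATE TABLE IF NOT EXISTS `test`.`mysqltest`(
       `id` INT UNSIGNED AUTO_INCREMENT,
       `name` VARCHAR(100) NOT NULL,
       `age` INT NOT NULL,
       PRIMARY KEY ( `id` )
    )ENGINE=InnoDB DEFAULT CHARSET=utf8;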
    

    2.1 Enable row-level (ROW format) binary logging in MySQL and create the maxwell account (skipped here)
    2.2 Create the corresponding table in Impala

    CREATE TABLE IF NOT EXISTS `mysqltest`(
       `id` INT,
       `name` STRING,
       `age` INT,
       PRIMARY KEY ( `id` )
    )
    PARTITION BY HASH (`id`)
    PARTITIONS 16 STORED AS KUDU
    TBLPROPERTIES ('kudu.table_name'='test.mysqltest')
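One detail worth checking when mirroring the schema: the MySQL `id` column is INT UNSIGNED, whose maximum value (4294967295) is larger than the maximum of Impala's signed 32-bit INT (2147483647). The sketch below illustrates that range check. `fits_impala_int` is a hypothetical helper name used only for illustration. If ids are expected to grow past the signed limit, declaring the Impala column as BIGINT would likely be the safer choice.

```python
# Illustrative range check; fits_impala_int is a hypothetical helper,
# not part of StreamSets, Impala, or Kudu.
MYSQL_INT_UNSIGNED_MAX = 2**32 - 1   # largest value of MySQL INT UNSIGNED
IMPALA_INT_MAX = 2**31 - 1           # largest value of Impala INT (signed 32-bit)
IMPALA_INT_MIN = -2**31              # smallest value of Impala INT


def fits_impala_int(value):
    """Return True if value can be stored in an Impala INT column."""
    return IMPALA_INT_MIN <= value <= IMPALA_INT_MAX


print(fits_impala_int(100))
print(fits_impala_int(MYSQL_INT_UNSIGNED_MAX))
```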
    

    3. Configure the StreamSets pipeline.
    3.1 MySQL data source configuration


    [Screenshot: image.png]
    [Screenshot: image.png]

    3.2 Filter by database


    [Screenshot: image.png]

    3.3 Format the data


    [Screenshot: image.png]

    The code is as follows:

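    for record in records:
      # Build a new record that carries only the row data
      newRecord = sdcFunctions.createRecord(record.sourceId + ':newRecordId')
      try:
        if record.value['Type'] == 'DELETE':
          # sdc.operation.type 2 = DELETE; use the row image from before the delete
          newRecord.attributes['sdc.operation.type'] = '2'
          newRecord.value = record.value['OldData']
        else:
          # sdc.operation.type 4 = UPSERT; covers both INSERT and UPDATE events
          newRecord.attributes['sdc.operation.type'] = '4'
          newRecord.value = record.value['Data']
        #newRecord.value['Type'] = record.value['Type']
        # Carry the source table name along with the row
        newRecord.value['Table'] = record.value['Table']

        # Write record to processor output
        output.write(newRecord)
      except Exception as e:
        # Send the original record to error so the failing event can be inspected
        error.write(record, str(e))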
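The formatting script above only runs inside a StreamSets Jython Evaluator, which makes it awkward to test. As a rough sketch, the same branching can be reproduced on plain dictionaries. `to_kudu_record` is a hypothetical helper name and the sample events are made up. The field names (`Type`, `Table`, `Data`, `OldData`) are the ones the script reads.

```python
# Standalone sketch of the step 3.3 logic on plain dicts, so it can be run
# outside StreamSets. to_kudu_record is a hypothetical helper name.
DELETE_OP = '2'  # value the script sets for DELETE events
UPSERT_OP = '4'  # value the script sets for every other event


def to_kudu_record(event):
    """Map one binlog-style event dict to (operation_code, row_dict)."""
    if event['Type'] == 'DELETE':
        # Deletes carry the row image in OldData
        op, row = DELETE_OP, dict(event['OldData'])
    else:
        # Inserts and updates carry the new row image in Data
        op, row = UPSERT_OP, dict(event['Data'])
    # Keep the source table name next to the row values
    row['Table'] = event['Table']
    return op, row


# Made-up sample events for illustration only
insert_event = {'Type': 'INSERT', 'Table': 'mysqltest',
                'Data': {'id': 1, 'name': 'tom', 'age': 20}}
delete_event = {'Type': 'DELETE', 'Table': 'mysqltest',
                'OldData': {'id': 1, 'name': 'tom', 'age': 20}}

print(to_kudu_record(insert_event))
print(to_kudu_record(delete_event))
```

Because every non-DELETE event is written as an upsert, replaying the same INSERT or UPDATE event twice leaves the target row unchanged.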
    

    3.4 Convert MySQL field names to lowercase


    [Screenshot: image.png]

    The code is as follows:

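    for record in records:
      try:
        # Copy the key list first so the map can be modified while looping
        for key in list(record.value.keys()):
          lowerKey = key.lower()
          if lowerKey != key:
            # Add a lowercase copy; the original mixed-case field is kept
            record.value[lowerKey] = record.value[key]
            #del record.value[key]
        output.write(record)
      except Exception as e:
        # Send record to error
        error.write(record, str(e))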
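The script above adds lowercase copies of the fields and leaves the original mixed-case fields in the record. For comparison, here is a minimal standalone sketch that returns a new dictionary containing only lowercase keys. `lowercase_keys` is a hypothetical helper name and the sample row is made up.

```python
# Standalone sketch; lowercase_keys is a hypothetical helper name.
def lowercase_keys(row):
    """Return a copy of row with every key lowercased.

    If two keys differ only by case (for example 'Name' and 'name'),
    whichever is visited last wins, so such rows need separate handling.
    """
    return dict((key.lower(), value) for key, value in row.items())


# Made-up sample row for illustration only
sample = {'ID': 1, 'Name': 'tom', 'Table': 'mysqltest'}
print(lowercase_keys(sample))
```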
    

    3.5 Convert time formats


    [Screenshot: image.png]
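The settings used in this step are only visible in the screenshot, so the following is an illustration of the general idea rather than the configuration used here. It assumes the source value arrives as a `YYYY-MM-DD HH:MM:SS` string in UTC and that the target expects microseconds since the Unix epoch, which is how Kudu stores its `unixtime_micros` type. `datetime_str_to_epoch_micros` is a hypothetical helper name.

```python
import calendar
from datetime import datetime


# Hypothetical helper; the input format and the UTC assumption are illustrative.
def datetime_str_to_epoch_micros(text, fmt='%Y-%m-%d %H:%M:%S'):
    """Parse a datetime string as UTC and return microseconds since the epoch."""
    parsed = datetime.strptime(text, fmt)
    # timegm treats the time tuple as UTC, unlike time.mktime (local time)
    seconds = calendar.timegm(parsed.timetuple())
    return seconds * 1000000 + parsed.microsecond


print(datetime_str_to_epoch_micros('2020-12-01 15:02:00'))
```

If the MySQL server stores local times instead of UTC, the offset would have to be applied before the conversion.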

    3.6 Kudu destination configuration


    [Screenshot: image.png]

    4. Final result (this screenshot was taken from a production environment):


    [Screenshot: image.png]
