sqoop增量数据迁移

作者: MichaelFly | 来源:发表于2016-12-19 10:35 被阅读615次

    背景

    业务系统库数据包含了大量历史数据,核心的表超过千万级甚至亿级后,传统在业务库上做数据分析已不合时宜,需要迁移至大数据平台(hive/spark sql/impala)做数据分析,如果按天全量导入至平台不仅消耗大量服务器资源并且全量读取业务库全表速度也会超慢,这时需要增量导入的功能,因为业务系统的表会用自增ID的标志,可以按天截取新增数据导入平台。

    sqoop增量迁移数据方式对比

    一种是 append,即通过指定一个递增的列,比如:
    --incremental append --check-column num_iid --last-value 0
    另种是可以根据时间戳,比如:
    --incremental lastmodified --check-column created --last-value '2012-02-01 11:0:00'
    就是只导入created 比'2012-02-01 11:0:00'更大的数据。

    第一种适合业务系统库,一般业务系统表会通过自增ID作为主键标识唯一性。
    第二种适合ETL的数据

    sqoop append模式使用

    1.使用 sqoop create-hive-table 生成 hive表结构
    2.定义 sqoop job,实际上是一个通道,通道的始发站为mysql对应的表,终点站为hive对应的表
    3.使用 sqoop job执行增量导入

    注:自己写个shell定时跑批或者放到调度系统定时执行

    下面为整个迁移的脚本示例:

    #!/bin/bash
    ##############################################
    ##  $1:日期   $2:表名
    ##  第一个参数为日期,第二个参数为mysql表名
    ##############################################
    
    #配置所在数据库地址
    conf_dbhost=xxx
    #配置所在数据库用户名
    conf_username=xxx
    #配置所在数据库密码
    conf_password=xxx
    #配置所在数据库名
    conf_dbname=etl
    var_etl_date=`mysql -h $conf_dbhost  -u$conf_username -p$conf_password -D $conf_dbname -e "SELECT var_value FROM para_etl_var WHERE var_name='{ETL_DATE}';"`
    echo $var_etl_date
    sys_date=`date -d'-1 day' +%Y-%m-%d`
    if [ ${1} == "-" ]
    then
    #    cur_date='2016-09-23'
        cur_date=${var_etl_date:10:10}
        echo $cur_date
    else
        #echo "$1"
        cur_date=`date --date="${1}" +%Y-%m-%d`
        echo $cur_date
    fi
    echo "$cur_date"
    #exit
    year=`date --date=$cur_date +%Y`
    month=`date --date=$cur_date +%m`
    day=`date --date=$cur_date +%d`
    echo "cur_date:"${cur_date}
    #hive库名
    hdb=rmdb
    #hive表名
    hive_table=crm_intopieces_dk
    #mysql表名
    mysql_table=crm_intopieces_dk
    
    #数据仓库基础路径
    basedir=/rmdb
    #mysql服务器地址
    server=xxx
    
    #mysql端口号
    port=3306
    #mysql数据库名
    mysql_database=test
    #用户名
    username=xxx
    #密码
    password=xxx
    
    #判断Hive是否存在,不存在执行下面创建语句,否则跳过
    #hive -e "use $hdb;select * from $hive_table limit 1;"
    if [ $? -ne 0 ]
    then
        echo "表不存在,执行创建表结构"
        sqoop create-hive-table 
        --connect jdbc:mysql://$server:$port/$mysql_database?tinyInt1isBit=false 
        --username $username 
        --password $password 
        --table $mysql_table
    else
        echo "表已存在,执行增量导入。。。"
    fi
    #exit
    #
    #一种是 append,即通过指定一个递增的列,比如:
    #--incremental append  --check-column num_iid --last-value 0 
    #另种是可以根据时间戳,比如:
    #--incremental lastmodified --check-column created --last-value '2012-02-01 11:0:00' 
    #就是只导入created 比'2012-02-01 11:0:00'更大的数据。 
    
    echo "创建job"
    #append
        sqoop job 
        --create crm_intopieces_dk 
        -- import --connect jdbc:mysql://$server:$port/$mysql_database?tinyInt1isBit=false 
        --username $username 
        --password $password  
        --table $mysql_table 
        --hive-import --hive-table $hive_table 
        --incremental append 
        --check-column id 
        --last-value 0
    
    echo "append增量导入模式启动。。。"
    
        sqoop job --exec crm_intopieces_dk
    exit
    

    相关文章

      网友评论

        本文标题:sqoop增量数据迁移

        本文链接:https://www.haomeiwen.com/subject/cjzzmttx.html