美文网首页大数据学习
Sqoop:Hive / Impala导出数据到MySQL Sh

Sqoop:Hive / Impala导出数据到MySQL Sh

作者: xiaogp | 来源:发表于2021-06-11 15:54 被阅读0次

    摘要:SqoopMySQLHiveImpala

    在Spark跑批到Hive的任务后面加入Sqoop任务,将数据从Hive导入MySQL提供在线查询服务,记录一下Shell脚本,主要是Shell常用语法,Impala命令参数,Sqoop命令参数


    导数需求

    有一张Parquet格式的Hive分区表test.sqoop_test,用Impala查看表结构,其中dt是分区字段

    [cloudera01:21000] > desc sqoop_test;
    Query: describe sqoop_test
    +---------------+--------+---------+
    | name          | type   | comment |
    +---------------+--------+---------+
    | industry_code | string |         |
    | rank          | string |         |
    | inc           | string |         |
    | dt            | string |         |
    +---------------+--------+---------+
    

    需要导入MySQL库中,每天导入HIve表中最新dt分区的数据,根据Industry_code覆盖更新导入,其他两个字段是MySQL JSON类型,MySQL使用的是8.0.25版本,默认区分大小写,所以字段大小写要一致

    mysql> desc sqoop_test;
    +---------------+-------------+------+-----+---------+-------+
    | Field         | Type        | Null | Key | Default | Extra |
    +---------------+-------------+------+-----+---------+-------+
    | industry_code | varchar(20) | NO   | PRI | NULL    |       |
    | rank          | json        | YES  |     | NULL    |       |
    | inc           | json        | YES  |     | NULL    |       |
    +---------------+-------------+------+-----+---------+-------+
    3 rows in set (0.64 sec)
    

    导出方式

    先将数据从Hive表导出到HDFS,或者导入到一张新的Hive表,再从HDFS将数据导入到MySQL。虽然比直接导出多了一步操作,但是可以实现对数据的更精准的操作,主要体现在

    • 在从Hive表导出到HDFS时,可以进一步对数据进行字段筛选字段加工数据过滤操作,从而使得HDFS上的数据更“接近”或等于将来实际要导入MySQL表的数据
    • 在从HDFS导入MySQL时,也是将一个“小数据集”与目标表中的数据做对比,会提高导出速度

    导数流程如下



    导出到Hive中间表的方式

    将原始Hive表sqoop_test进列筛选,行和分区过滤之后导入中间表sqoop_test_push,再导入MySQL,先建立一张Hive中间表,作为存储一个指定dt版本的中间数据

    hive> create table sqoop_test_push (
        > `industry_code` string,
        > `rank` string,
        > `inc` string)
        > stored as PARQUET;
    

    下一步使用Impala同步元数据,并且使用Impala shell传入本地脚本文件获得Hive表的最大分区dt,shell脚本如下

    sql="invalidate metadata test.sqoop_test; select max(dt) as cnt from test.sqoop_test"
    max_partition=$(impala-shell -l --auth_creds_ok_in_clear -u cdh_dev --ldap_password_cmd="sh /home/etl/impala.sh" -i cloudera03 -d test -B -q "$sql")
    

    max_partition变量被impala-shell执行的返回值赋值,impala-shell的参数包括

    • -l:使用LDAP向Impala进行身份验证。必须将Impala配置为允许LDAP身份验证
    • auth_creds_ok_in_clear
    • --ldap_password_cmd:启动命令带有检索到的密码,传入一个获得密码的shell命令,比如--ldap_password_cmd="echo -n 123456"
    • -i:要连接的impala host和port,默认port是21000
    • -d:设置数据库
    • -B:去除格式化,查询大数据量时可以提高性能
    • -q:执行一个query语句

    执行完毕得到最大分区

    [root@ubuntu ~]# echo $max_partition
    20210317
    

    下一步操作impala-shell将Hive表过滤分区和字段,写入中间表,先指定一个INSERT OVERWRITE导入Hive表SQL语句文件,位置在/home/push_test.sql

    use ${var:impala_database};
    
    invalidate metadata ${var:impala_table_push};
    insert overwrite table ${var:impala_table_push}
    select industry_code,
    rank,
    inc
    from ${var:impala_table} where dt='${var:max_partition}';
    

    以上SQL语句先对创建的中间表做impala元数据同步,然后使用insert overwrite以过滤后的原表数据直接覆盖中间表,接下来使用impala-shell执行以上SQL语句文件

    impala-shell -l --auth_creds_ok_in_clear \
    -u cdh_dev \
    --ldap_password_cmd="sh /home/etl/impala.sh" \
    -i cloudera03 \
    -d test \
    -f "/home/push_test.sql" \
    --var=max_partition="$max_partition" \
    --var=impala_table="sqoop_test" \
    --var=impala_table_push="sqoop_test_push" \
    --var=impala_database="test"
    

    其中

    • -f:表示执行一个query文件,其中以分号;作为语句分隔条件
    • --var:自定义impala上下文变量,可以多次使用,必须指定key=value的格式,在定义的时候${var:KEY},赋值的时候--var=KEY=VALUE

    执行成功后中间表sqoop_test_push就有了需要导入MySQL的数据,并且和要求的最终数据是一致的
    下一步使用sqoop进行导数,由于目标MySQL版本是8.0.025的,sqoop需要高版本的mysql-connect驱动,否则报错无法建立链接,先在maven仓库上下载高版本的驱动mysql-connector-java-8.0.25.jar,放在sqoop的lib目录下,然后启动sqoop开始导数
    导入方式为allowinsert覆盖插入,即无则插入,有则根据主键更新

    sudo -u hdfs sqoop export \
    --connect "jdbc:mysql://192.168.67.72:3306/test" \
    --username "root" \
    --password "123456" \
    --table "sqoop_test" \
    --update-mode allowinsert \
    --update-key "industry_code" \
    --hcatalog-database "test" \
    --hcatalog-table "sqoop_test_push" \
    --null-string '\\N'  \
    --null-non-string '\\N' \
    -m 1
    

    相关参数如下

    • export:从hdfs导出数据到关系型数据库
    • --connect:指定jdbc连接字符串
    • --username:数据库用户
    • --password:数据库密码
    • --table:导出的数据库表名称
    • --update-mode:指定更新策略,包括updateonlyallowinsert,updateonly是默认模式,仅仅更新已存在的数据记录,不会插入新纪录,allowinsert有则更新,无则插入
    • --update-key:更新参考的列名称,多个列用逗号,隔开
    • --hcatalog-database:hive数据库,parquet格式的hive表使用hcatalog
    • --hcatalog-table:hive数据库表名
    • --null-string:针对string类型的字段,当Value是NULL,替换成指定的字符
    • --null-non-string:针对非string类型的字段,当Value是NULL,替换成指定字符
    • -m:并行化,使用n个map任务并行导出

    再看MySQL已经成功导入20条数据

    mysql> select count(*) from sqoop_test;
    +----------+
    | count(*) |
    +----------+
    |       20 |
    +----------+
    1 row in set (0.19 sec)
    

    完整Shell脚本

    完成的shell脚本如下,挂在Spark入库Hive的作业后面执行

    [root@ubuntu ~]# vim push_rank.sh
    
    #!/bin/bash
    mysql_url="jdbc:mysql://192.168.67.72/test"
    mysql_username="root"
    mysql_password="123456"
    mysql_table="sqoop_test"
    impala_username="cdh_dev"
    impalad_host="cloudera03"
    impala_database='test'
    impala_table='sqoop_test'
    impala_table_push='sqoop_test_push'
    push_sql_path='/home/push_rank.sql'
    
    sql="invalidate metadata $impala_database.$impala_table; select max(dt) as cnt from $impala_database.$impala_table"
    max_partition=$(impala-shell -l --auth_creds_ok_in_clear -u $impala_username --ldap_password_cmd="sh /home/etl/impala.sh" -i $impalad_host -d test -B -q "$sql")
    
    if [ $? -eq 0 ]; then
      echo "Impala SQL执行成功!"
      echo "最大分区为${max_partition}"
    else
      echo "Impala SQL执行失败!"
      exit 1
    fi
    
    
    impala-shell -l --auth_creds_ok_in_clear -u $impala_username --ldap_password_cmd="sh /home/etl/impala.sh" -i $impalad_host -d $impala_database -f $push_sql_path --var=max_partition="$max_partition" --var=impala_table=$impala_table --var=impala_table_push=$impala_table_push --var=impala_database=$impala_database
    if [ $? -eq 0 ]; then
      echo "impala执行成功,开始向mysql推数"
      sudo -u hdfs sqoop export --connect ${mysql_url} --username ${mysql_username} --password ${mysql_password} --table ${mysql_table} --update-mode allowinsert --update-key "industry_code" --hcatalog-database ${impala_database} --hcatalog-table ${impala_table_push} --null-string '\\N'  --null-non-string '\\N' -m 1;
      if [ $? -eq 0 ]
       then
        echo "sqoop export success !"
      else
        echo "sqoop export failed !"
        exit 1
      fi
    else
      echo "impala执行失败"
      exit 1
    fi
    

    其中/home/push_rank.sql如下

    use ${var:impala_database};
    
    invalidate metadata ${var:impala_table_push};
    insert overwrite table ${var:impala_table_push}
    select industry_code,
    rank,
    inc
    from ${var:impala_table} where dt='${var:max_partition}';
    

    相关文章

      网友评论

        本文标题:Sqoop:Hive / Impala导出数据到MySQL Sh

        本文链接:https://www.haomeiwen.com/subject/lgtteltx.html