Sqoop最佳实践

作者: data之道 | 来源:发表于2018-02-26 12:51 被阅读396次

    一、什么是Sqoop

    Sqoop是一个在结构化数据和Hadoop之间进行批量数据迁移的工具,结构化数据可以是Mysql、Oracle等RDBMS。Sqoop底层用MapReduce程序实现抽取、转换、加载,MapReduce天生的特性保证了并行化和高容错率,而且相比Kettle等传统ETL工具,任务跑在Hadoop集群上,减少了ETL服务器资源的使用情况。在特定场景下,抽取过程会有很大的性能提升。

        如果要用Sqoop,必须正确安装并配置Hadoop,因依赖于本地的hadoop环境启动MR程序;mysql、oracle等数据库的JDBC驱动也要放到Sqoop的lib目录下。本文针对的是Sqoop1,不涉及到Sqoop2,两者有大区别,感兴趣的读者可以看下官网说明。

    二、import

        import是数据从RDBMS导入到Hadoop的工具

        2.1、split

        Sqoop并行化是启多个map task实现的,-m(或--num-mappers)参数指定map task数,默认是四个。并行度不是设置的越大越好,map task的启动和销毁都会消耗资源,而且过多的数据库连接对数据库本身也会造成压力。在并行操作里,首先要解决输入数据是以什么方式负债均衡到多个map的,即怎么保证每个map处理的数据量大致相同且数据不重复。--split-by指定了split column,在执行并行操作时(多个map task),Sqoop需要知道以什么列split数据,其思想是:

        1、先查出split column的最小值和最大值

        2、然后根据map task数对(max-min)之间的数据进行均匀的范围切分

    例如id作为split column,其最小值是0、最大值1000,如果设置4个map数,每个map task执行的查询语句类似于:SELECT * FROM sometable WHERE id >= lo AND id < hi,每个task里(lo,hi)的值分别是 (0, 250), (250, 500), (500, 750), and (750, 1001)。

        Sqoop不能在多列字段上进行拆分,如果没有索引或者有组合键,必须显示设置splitting column;默认的主键作为split column,如果表里没有主键或者没有指定--split-by,就要设置num-mappers 1或者--autoreset-to-one-mapper,这样就只会启动一个task。

        从上面的分析过程可以看到Sqoop以理想化方式根据split column将数据切分成多个范围,如果split键的值不是均匀分布,每个任务分配的数据量可能相差很大、导致数据倾斜。

        2.2、参数

    --driver:指定JDBC驱动,默认Mysql

    --table:指定查询的表

    --columns:指定从源数据库导入的列。当没有设置--table参数,就默认查询表中所有字段,实现方式是在数据库执行一个查询语句,就可得到每个字段及其对应的类型.

    --where:查询条件.如果设置了table参数,就以table、columns、where三个参数拼接成的SQL查询数据

    --query:自定义查询SQL,语句要有$CONDITIONS关键字,作用是动态替换,当获取默认boundary query时,$CONDITIONS会替换成(1=1);获取查询的数据列和其对应的字段类型时$CONDITIONS会替换成(1=0)。table和query不能同时设置

    --boundary-query:指定split的sql,如果没有设置,且有--table参数,生成的split sql是根据table、where条件拼出来的。

    如果设置了--query参数,split sql是基于query sql的子查询:

     需要特别注意的是,有的数据库对子查询没有进行优化(如Mysql),查询性能会很低,这就要自定义boundary-query,提高查询效率。

        2.3、HDFS

    数据直接导入到HDFS,按行读取并写入到HDFS文件,源表里的每一行数据在HDFS里作为单独记录,记录可以是文本格式(每行一个记录)或Avro、SequenceFile二进制格式。导入过程可以并行,因此可能生成多文件。

    --append:生成的文件追加到目标目录里

    --delete-target-dir:如果目标目录已经存在,会先把目录删掉,类似overwrite

      执行上面的命令后,可以看到详细的日志:输入数据是怎么split的、mapreduce执行进度、mapreduce的URL等

        2.4、Hive

    --hive-import参数指定数据导入到hive表:

    --target-dir:需要指定该参数,数据首先写入到该目录,过程和直接导入HDFS是一样

    --hive-drop-import-delims:删除string字段内的特殊字符,如果Hive使用这些字符作为分隔符,hive的字段会解析错误、出现错位的情况。它的内部是用正则表达式替换的方式把\n, \r, \01替换成""

    --null-string/--null-non-string:指定空字段的值。Sqoop默认空数据存的是“NULL"字符串,但hive把空解析成\N,因此当文件存储的空是默认的"NULL"字符串,hive就不能正常读取文件中的空值了

    数据import到hive表的过程:完成源数据写入到hdfs后,就执行LOAD DATA INPATH命令把target-dir里的文件LOAD到hive表:

        2.5、Hbase

      --hbase-table指定数据直接导入到Hbase表而不是HDFS,对于每个输入行都会转换成HBase的put操作,每行的key取自输入的列,值转换成string、UTF-8格式存储在单元格里;所有的列都在同一列簇下,不能指定多个个列簇。

    然后通过Hbase shell查看表数据量、数据,

    参数详细说明:

    --hbase-create-table:当HTable不存在或列簇不存在,Sqoop根据HBase的默认配置自动新建表;如果没有指定该参数,就会报异常

    --hbase-row-key:指定row key使用的列。默认是split-by作为row key,如果没有指定,会把源表的主键作为row key;如果row key是多个列组成的,多个列必须用逗号隔开,生成的row key是由用下划线隔开(`ID`_`RUN_ID`)的字段组合

    --column-family:指定列簇名,所有的输出列都在同一列簇下   

    三、export

       export是HDFS里的文件导出到RDBMS的工具,不能从hive、hbase导出数据,且HDFS文件只能是文本格式。如果要把hive表数据导出到RDBMS,可以先把hive表通过查询写入到一个临时表,临时用文本格式,然后再从该临时表目录里export数据。

       3.1、task数

    Sqoop从HDFS目录里读取文件,所以启动的map task数依赖于-m参数、文件大小、文件数量、块大小等,可以参考这篇文章

       3.2、插入/更新

       默认情况下Sqoop在数据导出到RDBMS时,每行记录都转换成数据库的INSERT语句,但也支持插入/更新模式,即根据一定规则判断,如果是新记录用INSERT语句,否则就UPDATE,设置--update-mode allowinsert参数启用该功能,插入/更新操作依赖于目标数据库。

        对于Mysql,使用INSERT INTO … ON DUPLICATE KEY UPDATE语法,用户不能指定列来判断是插入还是更新,而是依赖于表的唯一约束,Mysql在插入数据时,如果是因唯一约束引起的错误,就更新数据行。Sqoop会忽略--update-key参数,但要至少指定一个有效列,才能启用更新模式

        3.3、参数

       --columns:指定插入目标数据库的字段,sqoop直接读取hdfs文件并把记录解析成多个字段,此时解析后的记录是没有字段名的,是通过位置和columns列表对应的,数据库插入的sql类似于:insert into _table (c1,c2...) value(v1,v2...)

     --export-dir:指定HDFS输入文件的目录

     --input-fields-terminated-by:字段之间分隔符

     --input-lines-terminated-by:行分隔符

    四、问题及优化

        4.1、Hive不支持的数据类型

     关系型数据库的字段类型和hive的字段类型还是有差别的,所以Sqoop有一个映射关系,把RDBMS中的类型映射到Hive类型。在create hive表时,会根据RDBMS类型和hive类型进行映射、进而设置hive表字段类型,如果没有匹配到,就会报异常,如VARBINARY:

        其解决方案有三种:

            1、在--query参数内显示对字段进行转换,如VARBINARY转换成VARCHAR,而Sqoop会默认的把VARCHAR转换成Hive的STRING.

            2、增加--map-column-hive参数,显示把字段映射到Hive指定的类型

            3、修改HiveTypes类,使Sqoop支持对特定类型(如:VARBINARY)的映射,这种方案相对以上两种可以一劳永逸,但要重编译sqoop源码

    类型映射逻辑如下:

    4.2、Java不支持的类型

        Sqoop创建ORM对象,数据库中的字段映射到Java属性,用于读取数据库ResultSet对象并解析字段,需要把数据库的类型映射到java类型,如果没有映射到,就会报错。解决方案也有三种:

            1、在--query参数内显示对字段进行转换

            2、设置--map-column-java参数

            3、修改ConnManager类

    映射逻辑代码:

    4.3、特殊字符

     当\t特殊字符导入到hive后,hive字段可能解析出错。解决方法是修改FieldFormatter类,使Sqoop可以删除或替换掉字段数据中包含\t的特殊字符:

    4.4、字段错位

        使用--query和--columns参数时,如果columns设置的列顺序和query列顺序不同,会有个疑惑是import后的字段和实际字段的值不一样,这是因为从数据库查询的ResultSet对象序列化到实体对象时,column的值是根据索引取的。

    例如readInteger的代码:

    如果要改为columns的字段值是根据字段名取而不是根据索引位置取,可以更改一下几个地方的代码:ClassWriter、JdbcWritableBridge

    本文首发于公众号:data之道

    相关文章

      网友评论

        本文标题:Sqoop最佳实践

        本文链接:https://www.haomeiwen.com/subject/xifyxftx.html