美文网首页
sqoop导入数据到hbase,在phoenix创建视图和索引流

sqoop导入数据到hbase,在phoenix创建视图和索引流

作者: cyclone_29 | 来源:发表于2019-04-18 16:37 被阅读0次

    使用sqoop向hbase导入数据的时候,一般不要让sqoop自动创建表,那样不能控制表的属性信息,比如分区等。在导入数据之前,手动在hbase中创建相应的表。
    建表注意点:
    1.观察源表,查看主键的长度,如果主键的字符数量比每个列的值的字符数量都要多很多,那么可以使用数据块编码,设置DATA_BLOCK_ENCODING=>'FAST_DIFF',可以减少rowkey存储所使用的空间,如果有某一列或者几列的值的字符数量特别多,远超主键字符数量,可以设置使用数据压缩,COMPRESSION=>'SNAPPY'
    2.设置表的分区方式,查看源表主键的生成方式,如果是数值自增型,可以在主键的前面增加UUID,如果是16进制字符串,但不是随机值,可以考虑在主键的前面增加随机数,随机数的取值范围根据需要设置的分区的数量确定,如果是前面增加UUID,可以参考以下的建表语句:

       create 'TBL_ROAD',{NAME=>'F', DATA_BLOCK_ENCODING=>'FAST_DIFF'或者COMPRESSION=>'snappy'},{NUMREGIONS => 估算的region的数量, SPLITALGO => 'HexStringSplit'}(如果预期数据总量小于100W,不需要使用预分区)
    

    预分区注意点:
    Administrators can pre-split tables during table creation based on the target number of
    regions per RegionServer to avoid costly dynamic splitting as the table starts to fill up. In
    addition, it ensures that the regions in the pre-split table are distributed across many host
    machines. Pre-splitting a table avoids the cost of compactions required to rewrite the data
    into separate physical files during automatic splitting. If a table is expected to grow very
    large, administrators should create at least one region per RegionServer. However, do
    not immediately split the table into the total number of desired regions. Rather, choose
    a low to intermediate value. For multiple tables, do not create more than one region per
    RegionServer, especially if you are uncertain how large the table will grow. Creating too
    many regions for a table that will never exceed 100 MB in size isn't useful; a single region
    can adequately services a table of this size
    大意是:在创建表时,管理员可以根据每个RegionServer上region目标数量预先拆分表,以避免在表开始填满时进行代价高昂的动态分割。此外,它还确保预分割表中的region分布在多个主机上。预分割表避免了在自动拆分期间将数据重写入多个物理文件的compact消耗。如果一个表预期增长非常快时,管理员应该为每个RegionServer为该表创建至少一个region。然而,不要立即将表分割为所需region的总数。相反,选择一个低到中间的值。对于数据较多的表,不要为表在单个regionServer创建多于一个region,特别是当您不确定表会增长到多大时。对于一个永远不会超过100mb大小的表来说,创建许多region是没有用的;一个region就可以提供足够的服务。
    如果是前面增加随机数,可以参考以下的建表语句:

        create 'TBL_ROAD',{NAME=>'F', DATA_BLOCK_ENCODING=>'FAST_DIFF'或者COMPRESSION=>'snappy'},SPLITS=>['0|','1|','2|','3|','4|','5|','6|','7|','8|']
    

    上面的建表语句是为源表的非随机16进制字符串主键增加0-9的随机数,预先创建10个分区。
    此外,不要创建过多的列族,将数据存储在一个列族上可以加快数据的查询速度。
    创建完成之后,可以考虑加上AggregateImplementation协处理器,可以用来通过Java代码的方式查询表的总行数,不过如果是特别大的表查询速度仍然很慢,根据经验100W-200W条数据需要1s
    参考命令如下:

        disable 'TBL_ROAD'
        alter 'TBL_ROAD', METHOD =>'table_att','coprocessor'=>'|org.apache.hadoop.hbase.coprocessor.AggregateImplementation||'
        enable 'TBL_ROAD'
    

    代码参考:

    def getRowNumOfSpecifiedTable(tableName: String): Long= {
        import org.apache.hadoop.hbase.client.coprocessor.AggregationClient
        import org.apache.hadoop.hbase.HBaseConfiguration
        import org.apache.hadoop.hbase.TableName
        import org.apache.hadoop.hbase.client.Scan
        import org.apache.hadoop.hbase.client.coprocessor.LongColumnInterpreter
            val configuration = HBaseConfiguration.create()
            configuration.addResource("core-site.xml")
            configuration.addResource("hbase-site.xml")
            configuration.addResource("hdfs-site.xml")
            val scan = new Scan()
            val table = TableName.valueOf(tableName.trim)
            val aggregationClient = new AggregationClient(configuration)
            try {
               val result = aggregationClient.rowCount(table, new LongColumnInterpreter(), scan)
               result
            } catch {
              case _: Exception => throw new RuntimeExceprion("数据表过大,查询超时")
            }finally {
                if(aggregationClient!=null) aggregationClient .close()
             }
       }
    

    数据导入命令参考:

        sqoop import -Dmapred.child.java.opts='-Djava.security.egd=file:/dev/../dev/urandom' -Dmapred.job.name='TBL_ROAD_1' --connect jdbc:oracle:thin:@localhost:1521:orcl --username xxx --password 123456 --query 'SELECT ID, NAME, LOCATION FROM TBL_ROAD WHERE  $CONDITIONS' --hbase-table TBL_ROAD  --hbase-row-key ID --column-family F -m 1
    

    命令参数说明:
    -Dmapred.child.java.opts='-Djava.security.egd=file:/dev/../dev/urandom':
    如果不加上这个参数,导入数据耗时时间较长的时候可能会出现以下异常:

      ERROR manager.SqlManager: Error executing statement: java.sql.SQLRecoverableException: IO Error: Connection reset
      java.sql.SQLRecoverableException: IO Error: Connection reset
      at oracle.jdbc.driver.T4CConnection.logon(T4CConnection.java:752)
      at oracle.jdbc.driver.PhysicalConnection.connect(PhysicalConnection.java:662)
      at oracle.jdbc.driver.T4CDriverExtension.getConnection(T4CDriverExtension.java:32)
      at oracle.jdbc.driver.OracleDriver.connect(OracleDriver.java:560)
      at java.sql.DriverManager.getConnection(DriverManager.java:571)
      at java.sql.DriverManager.getConnection(DriverManager.java:233)
      at org.apache.sqoop.manager.OracleManager.makeConnection(OracleManager.java:325)
      at org.apache.sqoop.manager.GenericJdbcManager.getConnection(GenericJdbcManager.java:52)
    

    -Dmapred.job.name='TBL_ROAD_1':
    该参数指定了在yarn上该任务的任务名称,方便查询任务的进度信息。
    当源表存在联合主键的时候,可使用以下的方式来指定rowkey,如下使用COL1,COL2作为联合主键

    sqoop import -Dmapred.child.java.opts='-Djava.security.egd=file:/dev/../dev/urandom' -Dmapred.job.name='TBL_ROAD_1' --connect jdbc:oracle:thin:@localhost:1521:orcl --username xxx --password 123456 --query 'SELECT ID, NAME, LOCATION,COL1,COL2,COL3  FROM TBL_ROAD WHERE  $CONDITIONS' --hbase-table TBL_ROAD  --hbase-row-key COL1,COL2 --column-family F -m 1
    

    rowkey的值为下划线拼接的各列的值
    因为作为联合主键的任意列的值不能为null,当源表存在这样的联合主键的时候不会出现导入异常,但是如果源表不存在任何主键,但是导入到hbase需要指定rowkey,可以选择若干列或者一列作为主键,这样可能会出现重复数据覆盖的现象,指定为rowkey的任意列不能出现null,否则导入报错
    数据导入完成之后,由于直接查询hbase较为复杂,可以通过phoenix进行数据查询,在phoenix中创建相应的视图或者表以映射hbase表,Apache Phoenix接受SQL查询,将其编译为一系列HBase扫描,并协调这些扫描的运行以生成常规JDBC结果集。
    如果只做查询,强烈建议使用 phoenix 视图方式映射,删除视图不影响 hbase 源数据,语法如下:

        create view TBL_ROAD(PK VARCHAR PRIMARY KEY,F.NAME VARCHAR,F.LOCATION VARCHAR);
    

    如果必须要表映射,需要禁用列映射规则(会降低查询性能),如下:

      CREATE TABLE TBL_ROAD(PK VARCHAR PRIMARY KEY,F.NAME VARCHAR,F.LOCATION VARCHAR)COLUMN_ENCODED_BYTES=0;
    

    还有一些创建表或者视图时的可选项,官网http://phoenix.apache.org/language/index.html有详细的介绍,我就不搬砖了。
    索引类型:http://phoenix.apache.org/secondary_indexing.html
    索引主要有global index和local index,上面网址介绍了这两者的区别,下面主要介绍索引的创建和可能出现的问题。
    1.索引创建的必要性,如果表的数量较小,无需进行索引创建,查询速度也令人满意。
    2.数据导入完成之后,为了加快查询,需要加上索引,如果数据量较大,这时不能在phoenix命令行直接对表或者视图创建索引,会出现超时异常
    这时需要启动mapreduce任务批量创建索引
    异步创建local index:

    create local index TBL_ROAD_LOCAL_INDEX_1 ON TBL_ROAD(NAME,LOCATION)BLOOMFILTER='ROW' async;
    

    启动mapreduce任务填充索引:

    HADOOP_CLASSPATH=/opt/cloudera/parcels/CDH/lib/hbase/lib/hbase-protocol-1.2.0-cdh5.14.2.jar:/etc/hbase/conf:/etc/spark/conf.cloudera.spark_on_yarn/yarn-conf hadoop jar /opt/cloudera/parcels/APACHE_PHOENIX/lib/phoenix/phoenix-4.14.0-cdh5.14.2-client.jar org.apache.phoenix.mapreduce.index.IndexTool --data-table src-tablename --index-table index-tablename --output-path /indexesPath(hdfs path)
    

    mapreduce任务完成之后,会自动进行索引的激活,之后该索引可以参与相关的查询
    说明:增加hbase和yarn的配置信息,保证mapreduce任务运行在yarn上,否则任务运行在本地,当hbase表数据量稍大的时候,会引起内存溢出的异常
    --data-table 需要创建索引的表或者视图的名称
    --index-table 创建的索引的名称
    --output-path mapreduce任务创建的索引文件的临时存储位置,任务会在参数指定的目录下创建子目录,名称为需要创建索引的表的名称,任务完成之后会将临时数据移动到索引表目录下,删除创建的子目录,并将创建的索引激活以用于查询
    注意点:
    在向单分区表load数据或者在多分区表上创建单分区索引的时候,可能会出现以下异常:

        ERROR mapreduce.LoadIncrementalHFiles: Trying to load more than 32 hfiles to family d of region  with start key
        Exception in thread "main" java.io.IOException: Trying to load more than 32 hfiles to one family of one region
        at org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles.doBulkLoad(LoadIncrementalHFiles.java:288)
        at org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles.run(LoadIncrementalHFiles.java:842)
        at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:70)
        at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:84)
        at org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles.main(LoadIncrementalHFiles.java:847)
    

    该问题的出现属于HBase本身的限制,HBase在Bulk Load时默认一个region的hfile个数是32,当hfile文件个数超过32个时则会报上述错误
    可以通过以下方式解决:
    修改HBase配置,调大hbase.mapreduce.bulkload.max.hfiles.perRegion.perFamily值,需要重启HBase才生效

        <property>
                <name>hbase.mapreduce.bulkload.max.hfiles.perRegion.perFamily</name>
                <value>100</value>
        </property>
    

    但是hbase表原本存在hbase.hstore.blockingStoreFiles的限制,当移动的hfile的数量超过这个限制的时候,可能会引起store的写入异常,这个没有具体测试过,但是尽量避免,这种情况可以通过创建多分区索引表避免.
    数据直接导入到hbase:
    在hbase中建表,并进行预分区并接收导入的数据,然后在phoenix中建表进行映射,那么不能使用SALT_BUCKETS属性,否则出现主键列查询异常(实际hbase中是有数据的),之后在phoenix中创建索引辅助查询,global index索引表不是分区的,默认只是一个分区,如果创建的是local index,由于只是在hbase表中增加一个新的列族,保证索引数据和源数据在一个region上.
    数据直接导入phoenix:
    在phoenix中建表,指定SALT_BUCKETS,并直接接收数据,创建的global index表也是自动分区的,分区数量和源表相同,这样流程稍微复杂一些,需要分成两步进行,首先使用sqoop将数据导入到hdfs上,然后使用CsvBulkLoadTool读取hdfs文件导入到phoenix表中,如果之前已经创建了索引,索引数据同步更新,优点是可以保持原本字段的数据类型,而不需要像直接导入hbase那样,设置phoenix所有字段类型为VARCHAR.
    如果命令执行出现namespaces mapping异常,关于如何配置namespace mapping,可以查看http://phoenix.apache.org/namspace_mapping.html,配置这个的主要目的是将在phoenix中的schema和hbase的namespace进行映射,不配置也没有关系,如果出现了异常,可以使用以下命令进行phoenix-4.14.0-cdh5.14.2-client.jar的更新
    说明:我搭建的cdh环境为5.14.2,不要直接照搬命令,参考实际的情况操作。

    jar uf /opt/cloudera/parcels/APACHE_PHOENIX/lib/phoenix/phoenix-4.14.0-cdh5.14.2-client.jar /etc/hbase/conf/hbase-site.xml
    

    相关文章

      网友评论

          本文标题:sqoop导入数据到hbase,在phoenix创建视图和索引流

          本文链接:https://www.haomeiwen.com/subject/vlervqtx.html