1 使用 BulkLoad 向 HBase 中批量导入数据
2 背景介绍
2.1 概述
我们经常面临向 HBase 中导入大量数据的情景。
往 HBase 中批量加载数据的方式有很多种,最直接方式是调用 HBase 的 API 用 put 方法插入数据;另外一种是用 MapReduce 的方式从 hdfs 上加载数据,调用 TableOutputFormat 类在 reduce 中直接生成 put 对象写入 HBase(这种方式可以看作多线程的调用 hbase API 方式);但是这两种方式效率都不是很高,因为 HBase 会频繁的进行 flush、compact、split 操作,需要消耗较大的 CPU 和网络资源,并且 region Server 压力也比较大。
BulkLoad 方式调用 MapReduce 的 job 直接将数据输出成 HBase table 内部的存储格式的文件 HFile,然后将生成的 StoreFiles 加载到集群的相应节点。这种方式无需进行 flush、compact、split 等过程,不占用 region 资源,不会产生巨量的写入 I/O,所以需要较少的 CPU 和网络资源。在首次数据加载时,能极大的提高写入效率,并降低对 Region Server 节点的写入压力。
2.2 Hbase API 方式数据导入流程
了解调用 Hbase API**方式导入流程之前,我们先来看一张 hbase**的架构图。
220225w7jjubcffppjek9w.png• Region Server 管理一系列的 region,每个 region 又由多个 Store 组成;
• 每个 Store 对应 hbase 的一个列族(Column Family),每个 Store 又由 MemStore 和 StoreFile 两部分组成;
• MemStore 是 sorted Memory Buffer,MemStore 满之后会 flush 到 StoreFile 中,每个 StoreFile 对应一个实际的 HFile 文件。
• HLog 记录每个 RegionServer 的数据操作日志,防止 RegionServer 突然宕机导致 MemStore 中的数据丢失,如果 regionServer 突然宕机,HLog 将对丢失的数据进行恢复;HLog 中的数据会定期滚动更新,删除已经持久化到 StoreFile 的旧文件。
下面看调用 HBase API**往 Hbase**中插入数据的流程。
220225kp3iuxjf13gdqpss.png• client 端写入操作实际上都是 RPC 请求,数据传到 Region Server 中,默认首先会写入到 WAL(Write Ahead Log)中,也就是 HLog 中,然后才将数据写入到对应 region 的 memStore 中,memStore 满了之后,flush 到 HFile 中,这种情况的 flush 操作会引起瞬间堵塞用户的写操作。
• 当 StoreFile 数量达到一定的阈值,会触发 compact 合并操作,将多个 storeFile 合并成一个大的 StoreFile,这一过程包含大量的硬盘 I/O 操作以及网络数据通信。单个 StoreFile 过大超过阈值之后会触发 region 的 split 操作,并将大的 StoreFile 一分为二。
该方式在大数据量写入时效率低下(频繁进行 flush,split,compact 耗费磁盘 I/O),还会对影响 HBase 节点的稳定性造(GC 时间过长,响应变慢,导致节点超时退出,并引起一系列连锁反应)。
2.3 BulkLoad 导入流程
BulkLoad 涉及两个过程:
-
Transform 阶段:使用 MapReduce 将 HDFS 上的数据生成成 HBase 的底层 Hfile 数据。
-
Load 阶段:根据生成的目标 HFile,利用 HBase 提供的 BulkLoad 工具将 HFile Load 到 HBase 的 region 中。
在这里需要注意,在 bulkLoading 执行之前要提前把数据导入到 hdfs 上,因为 mapreduce 只能读取 HDFS 上的数据;如果原始数据在 hdfs 上占用 100G 大小的空间,那么 hdfs 上的预留的空间大小要大于 200G,因为数据要首先生成 hfile 也是放在 hdfs 临时目录下。
2.4 bulkload 和 put 适合的场景:
• bulkload 适合的场景:
– 大量数据一次性加载到 HBase。
– 对数据加载到 HBase 可靠性要求不高,不需要生成 WAL 文件。
– 使用 put 加载大量数据到 HBase 速度变慢,且查询速度变慢时。
– 加载到 HBase 新生成的单个 HFile 文件大小接近 HDFS block 大小。
• put 适合的场景:
– 每次加载到单个 Region 的数据大小小于 HDFS block 大小的一半。
– 数据需要实时加载。
– 加载数据过程不会造成用户查询速度急剧下降。
2.5 Bulkload 批量导入数据 shell 操作步骤:
1.将数据导入到 HDFS 中
2.建表并创建导入模板文件
3.执行命令,生成 HFile 文件
4.执行命令将 HFile 导入 HBase
3 示例
3.1 场景描述:
陕西省西安市 2018 年全市户籍总人口 905.68 万人,公安系统现在需要把这些人口信息从原来的数据存储库迁移至 HBase 中。
3.2 原始数据:
原始数据 person_information.txt 文件中人口信息按行分别为:
身份证号,姓名,出生年月,性别,户籍所在区县
610122000000001001,陈精业,19940524,男,蓝田县
610102000000001002,王军林,19870402,男,新城区
610111000000001003,田心亚,19681103,女,灞桥区
610125000000001004,王朝辉,19970608,男,鄠邑区
610112000000001005,冯诗雨,19900801,女,未央区
610112000000001006,黄秋野,19990505,男,未央区
610122000000001007,彭云超,20011205,男,蓝田县
610116000000001008,尤丽雯,19981123,女,长安区
610104000000001009,龚小刚,20050817,男,莲湖区
610124000000001010,兰春元,19870609,女,周至县
610115000000001011,王苏涛,19881107,男,临潼区
610114000000001012,周东来,19761028,男,阎良区
610103000000001013,邱维坚,19770929,男,碑林区
610116000000001014,卓鹏宇,19730726,男,长安区
610104000000001015,尤丽雯,19690317,女,莲湖区
610102000000001016,张雪红,19820109,女,新城区
610104000000001017,赵静静,19660527,女,莲湖区
610124000000001018,曹笑天,19980616,女,周至县
610112000000001019,李晓萍,19851114,女,未央区
610115000000001020,牛红艺,19930520,女,临潼区
3.3 shell 操作步骤 :
3.3.1 1.将数据导入到 HDFS 中
HBase 不管理数据提取这部分过程。
通常需要导入的外部数据都是存储在其它的关系型数据库或一些文本文件中,我们需要将数据提取出来并放置于 HDFS 中。也借助 ETL 工具可以解决大多数关系型数据库向 HDFS 迁移数据的问题。
例如,我们可以在一个表上运行 mysqldump(mysql 数据库中备份工具,用于将 MySQL 服务器中的数据库以标准的 sql 语言的方式导出保存到文件中。)并将结果文件上传到 HDFS。
执行命令:
- 在 HDFS 上创建一个目录/testBulkload
hdfs dfs -mkdir /testBulkload
- 把 linux 本地/opt 目录下 person_information.txt 文件传到 HDFS 上/testBulkload 目录下
hdfs dfs -put /opt/person_information.txt /testBulkload
- 可使用下面命令查看一下
hdfs dfs -cat /testBulkload/person_information.txt
结果展示
上传源数据文件:
220226kll4er5hdqim4qzn.png3.3.2 2.建表并自定义导入模板文件
建表:
需要根据导入数据,设计好 HBase 数据表的表名、rowkey、列族、列,考虑好 row key 分配在创建表时进行预分割。
根据源数据和业务设计表,在 hbase shell 下执行命令:
- person_information_table:表名
- NAME => 'base':列族名称。
- COMPRESSION:压缩方式
- DATA_BLOCK_ENCODING:编码算法
- SPLITS:预分 region
create 'person_information_table', {NAME => 'base',COMPRESSION => 'SNAPPY', DATA_BLOCK_ENCODING => 'FAST_DIFF'},SPLITS => \['1','2','3','4','5','6','7','8'\]
建表结果展示:
建表成功:
220227ywmr5m92z9wrm292.png模板文件示例说明:
模板文件可以参考“${client path}/HBase/hbase/conf/index_import.xml.template”文件进行编辑。
模板文件示例说明:
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
<!-- column_nmu: 要和源数据文件中的列的数量对应 -->
<!-- id: 无关紧要,只是一个ID标识,可不修改 -->
<import id="first" column_num="4">
<!-- index: 原始数据中的列坐标 SMS_ADDRESS -->
<!-- type: 列数据类型 -->
<!-- SMS_ADDRESS: 原始数据列临时字段名 -->
<columns>
<column index="1" type="int">SMS_ID</column>
<column index="2" type="string">SMS_NAME</column>
<column index="3" type="boolean">SMS_ADDRESS</column>
<column index="4" type="long">SMS_SERIAL</column>
</columns>
<!--根据自己数据和业务设计rowkey -->
<rowkey>
SMS_ID+'_'+substring(SMS_NAME,1,4)+'_'+reverse(SMS_SERIAL)
</rowkey>
<!-- 定义HTable列 -->
<!-- family: 列族columns family -->
<!-- column: 对应于原始数据中的列 -->
<qualifiers>
<!-- column: 定义普通列 -->
<normal family="NF">
<!-- H_ID: HBase表列的字段名 -->
<qualifier column="SMS_ID">H_ID</qualifier>
<qualifier column="SMS_NAME">H_NAME</qualifier>
<qualifier column="SMS_ADDRESS">H_ADDRESS</qualifier>
<qualifier column="SMS_SERIAL">H_SERIAL</qualifier>
</normal>
<!-- 定义组合列 (可选) -->
<composite family="CF1">
<!-- H_COMBINE_1:组合列名 -->
<qualifier class="com.huawei.H_COMBINE_1">H_COMBINE_1</qualifier>
<columns>
<!-- 用如下三个列去组合 -->
<column>SMS_SERIAL</column>
<column>SMS_ADDRESS</column>
<column>SMS_NAME</column>
</columns>
</composite>
</qualifiers>
<!-- 定义二级索引 (可选) -->
<indices>
<index name="IDX1">
<index_column family="NF">
<!-- 支持的类型只有String,Int,Float,Long,Double,Short,Byte,Char -->
<!-- 不支持将复合列定义为索引 -->
<!-- 索引类型的首字母需要大写,例如“type”=“String” -->
<qualifier type="String" length="10">H_NAME</qualifier>
<qualifier type="String" length="20">H_ADDRESS</qualifier>
</index_column>
</index>
<index name="IDX2">
<index_column family="CF2">
<!-- “length="30"”指索引列“H_SERIAL”的列值不能超过30个字符。-->
<qualifier type="String" length="30">H_SERIAL</qualifier>
</index_column>
</index>
</indices>
<!-- 定义要过滤的数据行规则 (可选) -->
<badlines>SMS_ID < 7000 && SMS_NAME == 'HBase'</badlines>
</import>
</configuration>
注意:
因 bulkload 本身的限制,自定义时关注如下几点:
• 列的名称不能包含特殊字符,只能由字母、数字和下划线组成。
• 当将列的类型设置为 string 时,不能设置其长度。例如“<column index="1" type="string" length="1" >COLOUMN_1”,此类型不支持。
• 当将列的类型设置为 date 时,不能设置其日期格式。例如“<column index="13" type="date" format="yyyy-MM-dd hh:mm:ss">COLOUMN_13”,此类型不支持。
• 不能针对组合列建立二级索引。
根据业务编辑模板文件:
编辑好后放到/opt 目录下
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
<!--column_num要和数据文件(person_information.txt)中的列的数量对应:5列-->
<import id="first" column_num="5">
<columns>
<column index="1" type="string">P_ID</column>
<column index="2" type="string">P_NAME</column>
<column index="3" type="string">P_BIRTH</column>
<column index="4" type="string">P_GENDER</column>
<column index="5" type="string">P_DISTRICT</column>
</columns>
<!--reverse(P_BIRTH):反转出生年月避免热点-->
<!--substring(P_NAME,0,1):截取姓 -->
<!--substring(P_ID,0,6):截身份证前六位 -->
<rowkey>
reverse(P_BIRTH)+'_'+substring(P_NAME,0,1)+'_'+substring(P_ID,0,6)
</rowkey>
<qualifiers>
<!--family的指定要和表的列族名称对应。-->
<normal family="base">
<qualifier column="P_ID">H_ID</qualifier>
<qualifier column="P_NAME">H_NAME</qualifier>
<qualifier column="P_BIRTH">H_BIRTH</qualifier>
<qualifier column="P_GENDER">H_GENDER</qualifier>
<qualifier column="P_DISTRICT">H_DISTRICT</qualifier>
</normal>
</qualifiers>
</import>
</configuration>
3.3.3 3.执行如下命令,生成 HFile 文件。
命令示例:
hbase com.huawei.hadoop.hbase.tools.bulkload.IndexImportData -Dimport.skip.bad.lines=true -Dimport.separator=<separator> -Dimport.bad.lines.output=</path/badlines/output> -Dimport.hfile.output=</path/for/output> <configuration xmlfile> <tablename> <inputdir>
• -Dimport.separator:分隔符。例如,-Dimport.separator=','
• -Dimport.skip.bad.lines:指定值为 false,表示遇到不适用的行则停止执行。指定值为 true,表示遇到不适用的数据行则跳过该行继续执行,如果没有在 configuration.xml 中定义不适用行,该参数不需要添加。
• -Dimport.bad.lines.output=</path/badlines/output>:指的是不适用的数据行输出路径,如果没有在 configuration.xml 中定义不适用行,该参数不需要添加。
• -Dimport.hfile.output=< /path/for/output>:指的是执行结果输出路径。
• < configuration xmlfile>:指向 configuration 配置文件。
• < tablename>:指的是要操作的表名。
• < inputdir>:指的是要批量上传的数据目录。
根据场景业务执行命令:
hbase com.huawei.hadoop.hbase.tools.bulkload.IndexImportData -Dimport.separator=',' -Dimport.hfile.output=/testBulkload/hfile /opt/configuration_index.xml person_information_table /testBulkload/person_information.txt
结果展示:
base 列族数据生成 hfile
220227okw33f65kuozi6uf.png3.3.4 4.执行如下命令将 HFile 导入 HBase。
执行命令:
- /testBulkload/hfile :hfile 在 HDFS 上的位置
- person_information_table : hbase 表名
hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles /testBulkload/hfile person_information_table
结果展示:
表中部分数据:
220228b4h89xxrv19hxq8x.png4 异常处理
4.1 1、使用 HBase bulkload 导入数据成功,执行相同的查询时却可能返回不同的结果
问题
在使用 HBase bulkload 导入数据时,如果导入的数据存在相同的 rowkey 值,数据可以导入成功,但是执行相同的查询时可能返回不同的结果。
回答
正常情况下,相同 rowkey 值的数据加载到 HBase 是有先后顺序的,HBase 以最近的时间戳的数据为最新数据,一般的默认查询中,没有指定时间戳的,就会对相同 rowkey 值的数据仅返回最新数据。
使用 bulkload 加载数据,由于数据在内存中处理生成 HFile,速度是很快的,很可能出现相同 rowkey 值的数据具有相同时间戳,从而造成查询结果混乱的情况。
建议在建表和数据加载时,设计好 rowkey 值,尽量避免在同一个数据文件中存在相同 rowkey 值的情况。
网友评论