Hive是基于Hadoop的一个数据仓库工具,可以将结构化的数据文件映射为一张数据库表,并提供类SQL查询功能。
本质是将SQL转换为MapReduce程序。
主要用途:用来做离线数据分析,比直接用MapReduce开发效率更高。
好处-----------使用Hive :
操作接口采用类SQL语法,提供快速开发的能力
避免了去写MapReduce,减少开发人员的学习成本
功能扩展很方便。
hive组件
用户接口:包括 CLI、JDBC/ODBC、WebGUI。其中,CLI(command line interface)为shell命令行;JDBC/ODBC是Hive的JAVA实现,与传统数据库JDBC类似;WebGUI是通过浏览器访问Hive。
元数据存储:通常是存储在关系数据库如 mysql/derby中。Hive 将元数据存储在数据库中。Hive 中的元数据包括表的名字,表的列和分区及其属性,表的属性(是否为外部表等),表的数据所在目录等。
解释器、编译器、优化器、执行器:完成 HQL 查询语句从词法分析、语法分析、编译、优化以及查询计划的生成。生成的查询计划存储在 HDFS 中,并在随后有 MapReduce 调用执行。
Hive利用HDFS存储数据,利用MapReduce查询分析数据。
Hive 数据模型
Hive中所有的数据都存储在HDFS中,没有专门的数据存储格式
在创建表时指定数据中的分隔符,Hive 就可以映射成功,解析数据。
Hive中包含以下数据模型:
db:在hdfs中表现为hive.metastore.warehouse.dir目录下一个文件夹
table:在hdfs中表现所属db目录下一个文件夹
external table:数据存放位置可以在HDFS任意指定路径
partition:在hdfs中表现为table目录下的子目录
bucket:在hdfs中表现为同一个表目录下根据hash散列之后的多个文件
5.Hive 安装部署
Hive安装前需要安装好JDK和Hadoop。配置好环境变量。
根据元数据存储的介质不同,分为下面两个版本,其中derby属于内嵌模式。实际生产环境中则使用mysql来进行元数据的存储。
- 内置derby版:
解压hive安装包
bin/hive 启动即可使用
缺点:不同路径启动hive,每一个hive拥有一套自己的元数据,无法共享 - mysql版:
解压、修改配置文件
vi conf/hive-site.xml
配置Mysql元数据库信息
数据库相关操作
Hive配置单元包含一个名为 default 默认的数据库.
create database [if not exists] <database name>;---创建数据库
show databases; --显示所有数据库
drop database if exists <database name> [restrict|cascade]; --删除数据库,默认情况下,hive不允许删除含有表的数据库,要先将数据库中的表清空才能drop,否则会报错
--加入cascade关键字,可以强制删除一个数据库,默认是restrict,表示有限制的
eg. hive> drop database if exists users cascade;
use <database name>; --切换数据库
分区表(PARTITIONED BY)
分区建表分为2种,一种是单分区,也就是说在表文件夹目录下只有一级文件夹目录。另外一种是多分区,表文件夹下出现多文件夹嵌套模式。
单分区建表语句:create table day_table (id int, content string) partitioned by (dt string);单分区表,按天分区,在表结构中存在id,content,dt三列。
双分区建表语句:create table day_hour_table (id int, content string) partitioned by (dt string, hour string);双分区表,按天和小时分区,在表结构中新增加了dt和hour两列。
导入数据
LOAD DATA local INPATH '/root/hivedata/dat_table.txt' INTO TABLE day_table partition(dt='2017-07-07');
LOAD DATA local INPATH '/root/hivedata/dat_table.txt' INTO TABLE day_hour_table PARTITION(dt='2017-07-07', hour='08');
基于分区的查询:
SELECT day_table.* FROM day_table WHERE day_table.dt = '2017-07-07';
查看分区:
show partitions day_hour_table;
总的说来partition就是辅助查询,缩小查询范围,加快数据的检索速度和对数据按照一定的规格和条件进行管理。
ROW FORMAT DELIMITED(指定分隔符)
create table day_table (id int, content string) partitioned by (dt string) row format delimited fields terminated by ','; ---指定分隔符创建分区表
复杂类型的数据表指定分隔符
create table complex_array(name string,work_locations array<string>) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' COLLECTION ITEMS TERMINATED BY ',';
数据如下:
zhangsan beijing,shanghai,tianjin,hangzhou
wangwu shanghai,chengdu,wuhan,haerbin
create table t_map(id int,name string,hobby map<string,string>)
row format delimited
fields terminated by ','
collection items terminated by '-'
map keys terminated by ':' ;
数据:
1,zhangsan,唱歌:非常喜欢-跳舞:喜欢-游泳:一般般
2,lisi,打游戏:非常喜欢-篮球:不喜欢
内部表、外部表
建内部表
create table student(Sno int,Sname string,Sex string,Sage int,Sdept string) row format delimited fields terminated by ',';
建外部表
create external table student_ext(Sno int,Sname string,Sex string,Sage int,Sdept string) row format delimited fields terminated by ',' location '/stu';
内、外部表加载数据:
load data local inpath '/root/hivedata/students.txt' overwrite into table student;
load data inpath '/stu' into table student_ext;
本地模式
set hive.exec.mode.local.auto=true;
分桶表 cluster by 、sort by、distribute by
指定开启分桶
set hive.enforce.bucketing = true;
set mapreduce.job.reduces=4;
TRUNCATE TABLE stu_buck;
drop table stu_buck;
create table stu_buck(Sno int,Sname string,Sex string,Sage int,Sdept string)
clustered by(Sno)
sorted by(Sno DESC)
into 4 buckets
row format delimited
fields terminated by ',';
分桶表导入数据
insert overwrite table stu_buck
select * from student cluster by(Sno) sort by(Sage); 报错,cluster 和 sort 不能共存
insert overwrite table stu_buck
select * from student cluster by(Sno);
对某列进行分桶的同时,根据另一列进行排序
insert overwrite table stu_buck
select * from student distribute by(Sno) sort by(Sage asc);
总结:
cluster(分且排序,必须一样)==distribute(分) + sort(排序)(可以不一样)
增加/删除分区
drop table t_partition;
create table t_partition(id int,name string)
partitioned by (dt string)
row format delimited
fields terminated by ',';
增加分区
alter table t_partition add partition (dt='2008-08-08') location 'hdfs://node-21:9000/t_parti/';
执行添加分区时 /t_parti文件夹下的数据不会被移动。并且没有分区目录dt=2008-08-08
删除分区
alter table t_partition drop partition (dt='2008-08-08');
执行删除分区时/t_parti下的数据会被删除并且连同/t_parti文件夹也会被删除
注意区别于load data时候添加分区:会移动数据 会创建分区目录
Insert查询语句
多重插入:
create table source_table (id int, name string) row format delimited fields terminated by ',';
create table test_insert1 (id int) row format delimited fields terminated by ',';
create table test_insert2 (name string) row format delimited fields terminated by ',';
from source_table
insert overwrite table test_insert1
select id
insert overwrite table test_insert2
select name;
动态分区插入
set hive.exec.dynamic.partition=true; #是否开启动态分区功能,默认false关闭。
set hive.exec.dynamic.partition.mode=nonstrict; #动态分区的模式,默认strict,表示必须指定至少一个分区为静态分区,nonstrict模式表示允许所有的分区字段都可以使用动态分区。
需求:
将dynamic_partition_table中的数据按照时间(day),插入到目标表d_p_t的相应分区中。
原始表:
create table dynamic_partition_table(day string,ip string)row format delimited fields terminated by ",";
load data local inpath '/root/hivedata/dynamic_partition_table.txt' into table dynamic_partition_table;
2015-05-10,ip1
2015-05-10,ip2
2015-06-14,ip3
2015-06-14,ip4
2015-06-15,ip1
2015-06-15,ip2
目标表:
create table d_p_t(ip string) partitioned by (month string,day string);
动态插入:
insert overwrite table d_p_t partition (month,day)
select ip,substr(day,1,7) as month,day
from dynamic_partition_table;
查询结果导出到文件系统
3、将查询结果保存到指定的文件目录(可以是本地,也可以是hdfs)
insert overwrite local directory '/home/hadoop/test'
select * from t_p;
insert overwrite directory '/aaa/test'
select * from t_p;
关于hive中的各种join
准备数据
1,a
2,b
3,c
4,d
7,y
8,u
2,bb
3,cc
7,yy
9,pp
建表:
create table a(id int,name string)
row format delimited fields terminated by ',';
create table b(id int,name string)
row format delimited fields terminated by ',';
导入数据:
load data local inpath '/root/hivedata/a.txt' into table a;
load data local inpath '/root/hivedata/b.txt' into table b;
实验:
** inner join
select * from a inner join b on a.id=b.id;
+-------+---------+-------+---------+--+
| a.id | a.name | b.id | b.name |
+-------+---------+-------+---------+--+
| 2 | b | 2 | bb |
| 3 | c | 3 | cc |
| 7 | y | 7 | yy |
+-------+---------+-------+---------+--+
**left join
select * from a left join b on a.id=b.id;
+-------+---------+-------+---------+--+
| a.id | a.name | b.id | b.name |
+-------+---------+-------+---------+--+
| 1 | a | NULL | NULL |
| 2 | b | 2 | bb |
| 3 | c | 3 | cc |
| 4 | d | NULL | NULL |
| 7 | y | 7 | yy |
| 8 | u | NULL | NULL |
+-------+---------+-------+---------+--+
**right join
select * from a right join b on a.id=b.id;
+-------+---------+-------+---------+--+
| a.id | a.name | b.id | b.name |
+-------+---------+-------+---------+--+
| 2 | b | 2 | bb |
| 3 | c | 3 | cc |
| 7 | y | 7 | yy |
| NULL | NULL | 9 | pp |
+-------+---------+-------+---------+--+
**
select * from a full outer join b on a.id=b.id;
+-------+---------+-------+---------+--+
| a.id | a.name | b.id | b.name |
+-------+---------+-------+---------+--+
| 1 | a | NULL | NULL |
| 2 | b | 2 | bb |
| 3 | c | 3 | cc |
| 4 | d | NULL | NULL |
| 7 | y | 7 | yy |
| 8 | u | NULL | NULL |
| NULL | NULL | 9 | pp |
+-------+---------+-------+---------+--+
**hive中的特别join
select * from a left semi join b on a.id = b.id;
+-------+---------+--+
| a.id | a.name |
+-------+---------+--+
| 2 | b |
| 3 | c |
| 7 | y |
+-------+---------+--+
相当于
select a.id,a.name from a where a.id in (select b.id from b); 在hive中效率极低
select a.id,a.name from a join b on (a.id = b.id);
cross join(##慎用)
返回两个表的笛卡尔积结果,不需要指定关联键。
select a.,b. from a cross join b;
内置jason函数
select get_json_object(line,'.rate') as rate from rat_json limit 10;
transform案例:
1、先加载rating.json文件到hive的一个原始表 rat_json
create table rat_json(line string) row format delimited;
load data local inpath '/root/hivedata/rating.json' into table rat_json;
2、需要解析json数据成四个字段,插入一张新的表 t_rating
drop table if exists t_rating;
create table t_rating(movieid string,rate int,timestring string,uid string)
row format delimited fields terminated by '\t';
insert overwrite table t_rating
select get_json_object(line,'.rate') as rate,get_json_object(line,'.uid') as uid from rat_json limit 10;
3、使用transform+python的方式去转换unixtime为weekday
先编辑一个python脚本文件
########python######代码
vi weekday_mapper.py
!/bin/python
import sys
import datetime
for line in sys.stdin:
line = line.strip()
movieid, rating, unixtime,userid = line.split('\t')
weekday = datetime.datetime.fromtimestamp(float(unixtime)).isoweekday()
print '\t'.join([movieid, rating, str(weekday),userid])
保存文件
然后,将文件加入hive的classpath:
hive>add FILE /root/hivedata/weekday_mapper.py;
create table u_data_new as select
transform (movieid, rate, timestring,uid)
using 'python weekday_mapper.py'
as (movieid, rate, weekday,uid)
from t_rating;
select distinct(weekday) from u_data_new limit 10;
hive表关联查询,如何解决数据倾斜的问题?
倾斜原因:
map输出数据按key Hash的分配到reduce中,由于key分布不均匀、业务数据本身的特点、建表时考虑不周、等原因造成的reduce 上的数据量差异过大。
当数据量比较大,并且key分布不均匀,大量的key都shuffle到一个reduce上了,就出现了数据的倾斜。
1)、key分布不均匀;
2)、业务数据本身的特性;
3)、建表时考虑不周;
4)、某些SQL语句本身就有数据倾斜;
如何避免:对于key为空产生的数据倾斜,可以对其赋予一个随机值。
解决方案
1>.参数调节:
hive.map.aggr = true
hive.groupby.skewindata=true
有数据倾斜的时候进行负载均衡,当选项设定位true,生成的查询计划会有两个MR Job。第一个MR Job中,Map的输出结果集合会随机分布到Reduce中,每个Reduce做部分聚合操作,并输出结果,这样处理的结果是相同的Group By Key有可能被分发到不同的Reduce中,从而达到负载均衡的目的;第二个MR Job再根据预处理的数据结果按照Group By Key 分布到 Reduce 中(这个过程可以保证相同的 Group By Key 被分布到同一个Reduce中),最后完成最终的聚合操作。
2>.SQL 语句调节:
常见的数据倾斜出现在group by和join..on..语句中。
join(数据倾斜)
在进行两个表join的过程中,由于hive都是从左向右执行,要注意讲小表在前,大表在后(小表会先进行缓存)。
map/reduce程序执行时,reduce节点大部分执行完毕,但是有一个或者几个reduce节点运行很慢,导致整个程序的处理时间很长,这是因为某一个key的条数比其他key多很多(有时是百倍或者千倍之多),这条key所在的reduce节点所处理的数据量比其他节点就大很多,从而导致某几个节点迟迟运行不完,此称之为数据倾斜。hive在跑数据时经常会出现数据倾斜的情况,使的作业经常reduce完成在99%后一直卡住,最后的1%花了几个小时都没跑完,这种情况就很可能是数据倾斜的原因.
hive.groupby.skewindata=true;
如果是group by过程出现倾斜应将此项设置true。
<property>
<name>hive.groupby.skewindata</name>
<value>false</value>
<description>Whether there is skew in data to optimize group by queries</description>
</property>
hive.optimize.skewjoin.compiletime=true;
如果是join 过程中出现倾斜应将此项设置为true
不影响结果可以考虑过滤空值
<property>
<name>hive.optimize.skewjoin.compiletime</name>
<value>false</value>
</property>
hive.optimize.skewjoin.compiletime=true; 如果是join过程出现倾斜应该设置为true
此时会将join语句转化为两个mapreduce任务,第一个会给jion字段加随机散列
set hive.skewjoin.key=100000; 这个是join的键对应的记录条数超过这个值则会进行优化。
可以在空值前面加随机散列
1)、选用join key分布最均匀的表作为驱动表。做好列裁剪和filter操作,以达到两表做join 的时候,数据量相对变小的效果。
2)、大小表Join:
使用map join让小的维度表(1000 条以下的记录条数)先进内存。在map端完成reduce.
4)、大表Join大表:
把空值的key变成一个字符串加上随机数,把倾斜的数据分到不同的reduce上,由于null 值关联不上,处理后并不影响最终结果。
5)、count distinct大量相同特殊值:
count distinct 时,将值为空的情况单独处理,如果是计算count distinct,可以不用处理,直接过滤,在最后结果中加1。如果还有其他计算,需要进行group by,可以先将值为空的记录单独处理,再和其他计算结果进行union。
. 请谈一下hive的特点是什么?hive和RDBMS有什么异同?
hive是基于Hadoop的一个数据仓库工具,可以将结构化的数据文件映射为一张数据库表,并提供完整的sql查询功能,可以将sql语句转换为MapReduce任务进行运行。其优点是学习成本低,可以通过类SQL语句快速实现简单的MapReduce统计,不必开发专门的MapReduce应用,十分适合数据仓库的统计分析。
请说明hive中 Sort By,Order By,Cluster By,Distrbute By各代表什么意思。
order by:会对输入做全局排序,因此只有一个reducer(多个reducer无法保证全局有序)。只有一个reducer,会导致当输入规模较大时,需要较长的计算时间。
sort by:不是全局排序,其在数据进入reducer前完成排序。
distribute by:按照指定的字段对数据进行划分输出到不同的reduce中。
cluster by:除了具有 distribute by 的功能外还兼具 sort by 的功能。
简要描述数据库中的 null,说出null在hive底层如何存储,并解释select a.* from t1 a left outer join t2 b on a.id=b.id where b.id is null; 语句的含义
null与任何值运算的结果都是null, 可以使用is null、is not null函数指定在其值为null情况下的取值。
null在hive底层默认是用'\N'来存储的,可以通过alter table test SET SERDEPROPERTIES('serialization.null.format' = 'a');来修改。
查询出t1表中与t2表中id相等的所有信息。
写出hive中split、coalesce及collect_list函数的用法(可举例)。
Split将字符串转化为数组。
split('a,b,c,d' , ',') ==> ["a","b","c","d"]
COALESCE(T v1, T v2, …) 返回参数中的第一个非空值;如果所有值都为 NULL,那么返回NULL。
collect_list列出该字段所有的值,不去重 select collect_list(id) from table;
写出将 text.txt 文件放入 hive 中 test 表‘2016-10-10’ 分区的语句,test 的分区字段是 l_date。
LOAD DATA LOCAL INPATH '/your/path/test.txt' OVERWRITE INTO TABLE test PARTITION (l_date='2016-10-10')
Hive的两个重要参数是什么?
1.hive -e ‘’ 从命令行执行指定的HQL
2.hive -f *.hql 执行hive脚本命令
Q15:Hive中如何复制一张表的表结构(不带有被复制表数据)
create table a like b;
Q16:Hive中追加导入数据的4种方式是什么?请写出简要语法。
1.从本地导入: load data local inpath '/home/1.txt' (overwrite)into table student;
2.从Hdfs导入: load data inpath '/user/hive/warehouse/1.txt' (overwrite)into table student;
3.查询导入: create table student1 as select * from student;(也可以具体查询某项数据)
4.查询结果导入:insert (overwrite)into table staff select * from track_log;
Hive导出数据有几种方式?如何导出数据
1、用insert overwrite导出方式
导出到本地:
insert overwrite local directory ‘/home/robot/1/2’ rom format delimited fields terminated by ‘\t’ select * from staff;(递归创建目录)
2、导出到HDFS
insert overwrite directory ‘/user/hive/1/2’ rom format delimited fields terminated by ‘\t’ select * from staff;
3、Bash shell覆盖追加导出
例如:$ bin/hive -e “select * from staff;” > /home/z/backup.log
4、Sqoop把hive数据导出到外部
hive 内部表和外部表区别
创建表时:创建内部表时,会将数据移动到数据仓库指向的路径;若创建外部表,仅记录数据所在的路径, 不对数据的位置做任何改变。
删除表时:在删除表的时候,内部表的元数据和数据会被一起删除, 而外部表只删除元数据,不删除数据。这样外部表相对来说更加安全些,数据组织也更加灵活,方便共享源数据。
分区和分桶的区别
分区
是指按照数据表的某列或某些列分为多个区,区从形式上可以理解为文件夹,比如我们要收集某个大型网站的日志数据,一个网站每天的日志数据存在同一张表上,由于每天会生成大量的日志,导致数据表的内容巨大,在查询时进行全表扫描耗费的资源非常多。
那其实这个情况下,我们可以按照日期对数据表进行分区,不同日期的数据存放在不同的分区,在查询时只要指定分区字段的值就可以直接从该分区查找
分桶
分桶是相对分区进行更细粒度的划分。
分桶将整个数据内容按照某列属性值得hash值进行区分,如要按照name属性分为3个桶,就是对name属性值的hash值对3取摸,按照取模结果对数据分桶。
如取模结果为0的数据记录存放到一个文件,取模为1的数据存放到一个文件,取模为2的数据存放到一个文件
hive 优化
1.fetch task任务不走MapReduce,可以在hive配置文件中设置最大化和最小化fetch task任务;通常在使用hiveserver2时调整为more;
2.strict mode:严格模式设置,严格模式下将会限制一些查询操作
文件格式,ORC PARQUET 等
分区表
select 查询不加where过滤条件,不会执行
开启严格模式
hive提供的严格模式,禁止3种情况下的查询模式。
a:当表为分区表时,where字句后没有分区字段和限制时,不允许执行。
b:当使用order by语句时,必须使用limit字段,因为order by 只会产生一个reduce任务。
c:限制笛卡尔积的查询。sql语句不加where不会执行
- 优化sql语句,如先过滤再join,先分组再做distinct;
4.MapReduce过程的map、shuffle、reduce端的snappy压缩
需要先替换hadoop的native本地包开启压缩
在mapred-site.xml文件设置启用压缩及压缩编码
在执行SQL执行时设置启用压缩和指定压缩编码
set mapreduce.output.fileoutputformat.compress.codec=org apache.hadoop.io.compress.SnappyCodec;
5.大表拆分成子表,提取中间结果集,减少每次加载数据
多维度分析,多个分析模块
每个分析模块涉及字段不一样,而且并不是表的全部字段
6.分区表及外部表
设计二级分区表(一级字段为天,二级字段设置小时)
创建的的是外部表,创建表时直接指定数据所在目录即可,不用再用load加载数据
7.设置map和reduce个数:默认情况下一个块对应一个map任务,map数据我们一般不去调整,reduce个数根据reduce处理的数据量大小进行适当调整体现“分而治之”的思想
set mapred.reduce.tasks=3;
8.JVM重用:一个job可能有多个map reduce任务,每个任务会开启一个JVM虚拟机,默认情况下一个任务对应一个JVM,任务运行完JVM即销毁,我们可以设置JVM重用参数,一般不超过5个,这样一个JVM内可以连续运行多个任务
JVM重用是Hadoop调优参数的内容,对Hive的性能具有非常大的影响,特别是对于很难避免小文件的场景或者task特别多的场景,这类场景大多数执行时间都很短。hadoop默认配置是使用派生JVM来执行map和reduce任务的,这是jvm的启动过程可能会造成相当大的开销,尤其是执行的job包含有成千上万个task任务的情况。
JVM重用可以使得JVM实例在同一个JOB中重新使用N次,N的值可以在Hadoop的mapre-site.xml文件中进行设置(建议参考5~10)
mapred.job.reuse.jvm.num.tasks(旧版)
mapreduce.job.jvm.numtasks(新版)
hadoop.apache.org/docs/r2.5.2/hadoop-mapreduce-client/hadoop-mapreduce-client-core/mapred-default.xml
http://hadoop.apache.org/docs/r2.5.2/hadoop-mapreduce-client/hadoop-mapreduce-client-core/mapred-default.xml
也可在hive的执行设置:
***set mapred.job.reuse.jvm.num.tasks=10;
hive (default)> set mapred.job.reuse.jvm.num.tasks;
mapred.job.reuse.jvm.num.tasks=1
- 推测执行:例如一个Job应用有10个MapReduce任务(map 及reduce),其中9个任务已经完成,那么application Master会在另外启动一个相同的任务来运行未完成的那个,最后哪个先运行完成就把另一个kill掉
启用speculative最大的好处是,一个map执行的时候,系统会在其他空闲的服务器上启动相同的map来同时运行,哪个运行的快就使用哪个的结果,另一个运行慢的在有了结果之后就会被kill。
hive-site.xml
hive.mapred.reduce.tasks.speculative.execution=true;
<property>
<name>hive.mapred.reduce.tasks.speculative.execution</name>
<value>true</value>
<description>Whether speculative execution for reducers should be turned on. </description>
</property>
网友评论