美文网首页
初识Hive

初识Hive

作者: 谭英智 | 来源:发表于2020-08-15 16:40 被阅读0次

    Hive是Facebook开源的用于解决海量结构化日志的数据统计工具。它是基于Hadoop的一个数据仓库工具,可以将结构化数据文件映射成一张表,并提供类SQL查询功能。本质是将HQL转化成MapReduce程序

    优点

    • 操作接口类SQL,提高开发效率
    • 避免写MapReduce
    • 延迟高,常用于数据分析,对实时性要求不高的场合
    • 善于处理大数据,对处理小数据没有优势,因为延迟高
    • 支持自定义函数

    缺点

    • 表达能力有限,无法表达迭代式算法,不善于数据挖掘
    • 效率低,生成的MapReduce作业不够智能化
    • 调优困难,粒度较粗

    架构

    hive-overview

    默认的MetaStore是derby,它只支持单用户模式,不能支持多个客户端同时访问,所以一般把它替换成Msql之类的数据库,通过JDBC对接

    Hive只是Hadoop的一个客户端,它只做了SQL的分析和转黄成MR和表元数据的管理

    元数据和HDFS存储数据分离,通过表名来关联,所以即使metastore有两个,如果他们的表一致,那么他们也可以访问到同一个HDFS文件

    与数据库比较

    • 查询语言

      HQL与SQL类似

    • 存储位置

      Hive存于HDFS上,数据库存于本地

    • 数据更新

      Hive是一个数据仓库,数据仓库的内容是读多写少,因此Hive不建议对数据修改。数据库通常支持频繁修改

    • 索引

      Hive没有索引,所有查询都是整个数据扫描,因此延迟高,通过MapReduce来并发访问数据,对于大数据来说,即使没有索引,通过并发读,性能也不会太差。数据库一般会建立索引,实时性高

    • 执行

      Hive通过Hadoop的MapReduce来执行;数据库由自己的引擎执行

    • 延迟

      Hive延迟高,数据库延迟低

    • 可扩展性

      Hive是客户端,hadoop集群可以非常高,2009年最大的规模是4000台;数据库扩展性差

    • 数据规模

      Hive支持大数据;数据库的支持数据规模小

    交互命令

    hive -e "sql"
    hive -f fileName
    

    数据类型

    DDL

    创建数据库和表,实际是跟HDFS的目录做关联,如果目录不存在,则创建,如果存在,则关联

    create database dbName;
    create database dbName location 'pathInHdfs';
    create table dbName.tableName;
    create database if not exists dbName;
    show databases;
    show databases like 'hive*';
    desc database <extended> dbName;
    #只能修改额外属性
    alter database dbName set dbproperties(attrName=value)
    drop database dbName <cascade>;
    
    create [external] table [if not exists] tableName
    [like tableName]
    [(colName type [comment])]
    [comment table]
    [partitioned by (colName type [comment])] #分区表
    [clustered by (colName, colName...) ino num_buckets buckets] #分桶表
    [sorted by (colName [ASC|DESC], ...)] 
    [row format row_format]
    [stroed as file_format]
    [location hdfs_path]
    

    客户端

    开启hiveserver2可以使用JDBC连接hive,例如beeline

    内部表

    也称管理表,如果删除表,HDFS上的数据也会删除

    外部表

    删除外部表,只会删除元数据,不会删除HDFS上的数据

    内部表和内部表的转换

    alter table tableName set tblproperties('ETERNAL'='TRUE/FALSE');

    desc formatted tableName

    分区表

    创建分区表的新字段,会以文件夹名出现,并可以使用where来查询,可以提高查询效率,如果只需要查特定的分区

    可以使用多个字段来创建分区,建立二级分区表

    在插入数据时,需要指定分区字段的值

    load data local inpath 'path' into table databaseName.tableName partition(colName='value')
    

    如果手工上传文件到HDFS,则需要对齐元数据与分区或者手工添加分区元数据

    msck repair table tableName
    

    修改表

    alter table tableName rename to newTableName;
    alter table tableName change [column] colOldName colNewName colType [comment string] [first|after colName]
    alter table tableName add|replace colums (colName colTYpe [comment])
    

    DML

    数据载入

    load data [local] inpath 'path' overwrite|into table tableName [partition(col1=value1,...)]
    #这相当于把文件直接put到hdfs上,不经过mr,所以在插入分桶分区的数据时,要使用insert,而不是load,走mr把数据插入
    
    insert overwrite|into table tableName partition(col=v) values (...);
    
    from tableName
    insert overwirte|into tableName2 partition(..)
    select * where month='202001'
    insert overwirte|into tableName2 partition(..)
    select * where month='202002'
    
    create table tableName as select ... from tableName2
    
    import table tableName from 'hdfs dir'
    

    数据导出

    insert overwrite|into [local] directory 'path'
    row format delimited fields terminated  by '\t'
    select * from tableName 
    
    dfs -get 'hdfsPath' localPath
    
    bin/hive -e 'sql'> localFileName
    
    export table databaseName.tableName to 'hdfsPath'
    #会导出元数据和数据文件
    

    清空数据

    truncate table tableName;
    

    查询

    select  [all|distinct] ...
    from tableName
    [where condition]
    [group by ..] [having ...]
    [order by ...ASC|DESC]
    [limit number]
    

    MapReduce内部排序sort by

    set mapreduce.job.reduce=3
    insert overwrite local directory 'path' select from tableName sort by colName asc|desc
    #可以看到结果会分成3个文件,文件内部排序输出
    #实际是起了三个reducer,map通过shuffer到三个R中,然后R排序并分别输出
    

    分区排序(Distribute by)

    set mapreduce.job.reduce=3
    insert overwrite local directory 'path' select from tableName distribute by colName1 sort by colName2 asc|desc
    #流程与上面类似,只是在shuffer的时候,通过指定的col来hash
    

    Cluster By

    select * from tableName cluster by sameColName
    =
    select * from tableName distribute by sameColName sort by sameColName;
    

    分桶表

    create table tableName (cols)
    clustered by (colName)
    into 4 buckets
    row format delimited fields terminated by '\t';
    
    set hive.enforce.bucketing=true;
    set mapreduce.job.reduces=1
    
    分桶表和分区表的区别

    分桶表通过自身字段来hash分文件

    分区表通过新增额外字段来hash分目录

    都是用来解决文件太大的问题

    分桶表抽样查询
    select * from table tabesample(bucket x out of y on id)
    #table分y份,从第x个bucked开始取,一共取(x/y)*total个样本
    #x必须小于y
    

    NVL

    select nvl(colName, value|colName2) from table;
    

    case when

    select dept, 
    sum(case sex when 'male' then 1 else 0 end) maleCount,
    sum(case sex when 'female' then 1 else 0 end) femaleCount
    from table
    group by dept;
    

    行转列

    select dept, concat_ws(",", collect_set(name))
    from table
    group by dept
    

    列转行

    select user_id,order_value,order_id
    from tableName
    lateral view explode(split(order_value,',')) num as order_id
    

    窗口查询

    over: 指定窗口大小

    current row: 当前行

    n preceding: 往前n行

    n following: 往后n行

    unbounded preceding: 第一行

    unbounded following: 最后一行

    lag(col, n): 往前第n行的col的值

    lead(col,n): 往后第n行col的值

    select name,count(*) over()
    from tableName
    where orderDate='2020-05'
    group by name;
    count输出groupby后的行数
    
    select *, sum(cost) over(distribute|partition by month(orderDate)) from tableName
    distribute相当于groupby,同一个月的cost的总数
    
    select *,sum(cost) over (sort by orderdate rows between unbounded preceding and current row)
    from tableName;
    #排序后,输出第一行到当前行的总数
    
    select *,lag(orderdate,1) over(distribute by name sort by orderdate) from tableName;
    #先groupby再排序,取上一条的orderdate
    
    select * from (
    select *,ntile(5) over(sort by orderdate) gid
    from table
    ) t
    where gid=1
    把数据先排序,再增加一列gid,从1到5设置
    

    rank

    select name,subject,
    rank() over(partition by subject order by score desc),#1,1,3,4
    dense_rank() over(partition by subject order by score desc),#1,1,2,3
    row_number() over(partition by subject order by score desc) #1,2,3,4
    from table
    

    snappy压缩

    编译hadoop,把snappy加上

    开启Map输出阶段压缩
    set hive.exec.compress.intermediate=true;
    set mapreduce.map.output.compress=true;
    set mapreduce.map.output.compress.codec=org.apache.hadoop.io.compress.SnappyCodec;
    
    开启reduce输出阶段压缩
    set mapreduce.output.fileoutputformat.compress=true;
    set mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.SnappyCodec
    set mapreduct.output.fileoutputformat.compress.type=BLOCK
    

    文件存储格式

    支持textfile/sequencefile/orc/parguet

    hive-store
    • 行存储:数据按一行行存储,一行的数据连续存储,方便查询的时候,多列同时查
    • 列存储:数据先按行做切分成多块,块内按列存储,同一列的数据连续存储,方便查询的时候,单列查询

    textfile/sequencefile是行存储

    orc/parguet是列存储

    textfile格式

    默认存储格式,数据不压缩,磁盘开销大,可以结合Gzip来使用

    ORC格式

    hive-orc

    数据默认压缩,块的大小为256m,块内按列存储。压缩率好,读取速度没差别

    create table name (...)
    row format delimited fields terminated by '\t'
    stored as orc tblproperties ("orc.compress"="SNAPPY")
    #默认的orc压缩率(zlib)会更高,但压缩速率没snappy好
    

    parquet格式

    与orc类似,但是压缩率没有orc好

    优化

    Fetch

    set hive.fetch.task.conversion=none/minmal/more
    none: 所有sql都走mr
    minimal:select */filter on partition/limit不走mr,其他都走mr
    more:select/filter/limit不走mr,其他都走mr
    

    本地模式

    hadoop的Job一般是跑在集群上的,但是对于数据量小的任务,触发执行的时间可能比执行时间多更多,此时,可以启动本地模式,把所有操作都在本地运行,加快执行速度

    set hive.exec.mode.local.auto=true #开启本地模式
    #当输入小于此字节数时,采用本地mr
    set hive.exec.mode.local.auto.inputbytes.max=500000;
    #当输入文件个数小于此数时,采用本地mr
    set hive.exec.mode.local.auto.input.files.max=10;
    

    小表join大表

    一般小表在左边,大表在右边。但是hive的高版本已经做了优化。

    set hive.auto.convert.join=true
    #开启时,会先缓存小表,并在map端join
    #不开启时,不缓存,所有的join会先shuffer到reduce上,再join
    
    hive-mapjoin

    空Key过滤

    有时join时,某些key对应的数据大多,而相同的key会hash到同一个reducer上,导致集群中的某个reducer承受过多的数据,而很多这样的key是null

    select * from (select * from t1 where id is not null) t left join t2 on t.id=t2.id;
    select * from t left join t2 on case when t.id is null then (rand()) else t.id end)=t2.id;
    #随机生成虽然解决了reducer的数据倾斜,但有可能有些业务无法处理
    

    group by

    hive.map.aggr=true #开启本地combinar
    hive.groupby.mapaggr.checkinterval=10000 #combinar的条数
    #默认为false,true用来解决数据倾斜,所以会导致某些业务无法处理
    hive.groupby.skewindata=true
    

    count(distinct)

    #会只能用一个reduce来排重,得出最后的结果
    select count(distinct id) from bigtable;
    #可以分散到多个reducer来计算
    select count(1) from (select id from bigtable group by id) t;
    

    笛卡儿积

    当join不加on时或者无效的on条件时,hive只能用1个reducer来完成笛卡儿积

    行列过滤

    在join表前,先把表用where来过滤,以减少join的数据量

    分区

    分桶

    动态分区

    当插入分区表时,一般需要在sql静态指明分区字段的值,否者插入会失败

    可以通过设置动态设置分区字段通过select的方法

    hive.exec.dynamic.partition=true#开启动态分区
    hive.exec.dynamic.partition.mode=nonstrict#开启非严格模式,严格模式为至少指定一个分区为非静态分区
    hive.exec.max.dynamic.partitions=1000#在所有mr节点最多可以创建多少个动态分区
    hive.exec.max.dynamic.partitions.pernode=100#在每个mr节点,最多可以创建多少个动态分区
    hive.exec.max.created.file=10000#在mr job中最大可以创建多少个hdfs文件
    hive.error.on.empty.partition=false#当分区为空时,是否报错
    
    insert overwrite table tableName partition(p_time)
    select id,name,p_time from tableName2;
    

    Map数量

    作业一般通过input目录产生一个或多个Map,主要决定因素有input文件总个数,文件大小,集群设置的文件块大小

    是不是map越多越好

    不是,如果一个任务处理很多小文件,启动job的时间都大于处理文件的时间,则会造成很大的浪费

    是不是保证每个map处理128m的文件块就好

    不是,如果文件每行需要的计算量非常大,如果还是按照128m来分的话,则有可能job的运行时间会超长

    mapreduce.input.fileinputformat.split.maxsize=100;
    mapreduce.input.fileinputformat.split.minsize=10;
    #通过设置maxsize小于blocksize就可以把文件拆开
    

    reduce数

    reduce处理的数据量默认时256m

    hive.exec.reducers.bytes.per.reducer=256000000;
    

    每个任务最大的reduce数默认1009

    hive.exec.reducers.max=1009
    

    设置reducer数

    mapreduce.job.reducer=10;
    

    有多少个reducer就会生成多少个结果文件,太多的reducer会消耗时间和资源,过多的结果文件会造成小文件

    小文件合并

    通过设置,可以在map前先把小文件合并,以减少map的数量

    hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat
    

    并行执行

    hive的一个查询有可能有很多个阶段,而某些阶段有可能并没有依赖关系,此时可以开启并行模式来执行任务

    hive.exec.parallel=true;
    hive.exec.parallel.thread.number=16;
    

    严格模式

    hive.mapred.mode=strict/nonstrict
    #不能查询笛卡儿积
    #不能查询分区表而没有过滤分区条件
    #不能比较整形和字符串
    #不能比较整形和浮点
    #不能使用orderby而没有limit
    

    JVM重用

    Hadoop可以用一个JVM在同一个Job内重用N次,JVM启动的开销比较大,所以重用可以很好的提高性能

    mapreduce.job.jvm.numtasks
    10
    

    推测执行

    由于数据倾斜的问题,会导致hadoop的负载不均衡,某些job要很久也完成不了,造成任务都阻塞在最后一个job那里,hadoop通过推测机制,为这样的任务启动一个备份任务,让备份任务执行同一个数据,两个任务谁先完成计算,谁的结果就会成为最终结果

    mapreduce.map.speculative
    true/false
    

    相关文章

      网友评论

          本文标题:初识Hive

          本文链接:https://www.haomeiwen.com/subject/eagwcktx.html