美文网首页数据与未来hive
Hive的性能优化以及数据倾斜

Hive的性能优化以及数据倾斜

作者: Mervyn_2014 | 来源:发表于2018-01-27 23:17 被阅读262次

    hive性能优化

    一、Map阶段的优化:

    (控制hive任务中的map数,确定合适的map数,以及每个map处理合适的数据量)

    map个数影响因子:

    1. input目录中文件总个数;
    2. input目录中每个文件大小;
    3. 集群设置的文件块大小(默认为128M, 可在hive中通过set dfs.block.size;命令查看,不能在hive中自定义修改);
    举例:
    input目录中有1个文件(300M),会产生3个块(2个128M,1个44M)即3个Map数。
    input目录中有3个文件(5M,10M,200M),会产生4个块(5M,10M,128M,72M)即4个Map数。
    
    适当减少Map数:

    当一个任务有很多小文件(远远小于块大小128m),会产生很多Map,而一个Map任务启动和初始化的时间远远大于逻辑处理的时间,就会造成很大的资源浪费,而且同时可执行的map数是受限的。

    set mapred.max.split.size=100000000;//(100M)
    set mapred.min.split.size.per.node=100000000;
    set mapred.min.split.size.per.rack=100000000;
    set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;//表示执行前进行小文件合并。
    //大于128:按照128M分割;100~128按照100分;小于100的进行合并。
    
    适当增加Map数:

    当有一个小于128M的文件(其中有上千万的数据,字段少并且数据单位小),如果map处理的逻辑比较复杂,用一个map任务去做,耗时比较大。

    set mapred.reduce.tasks=10;
    create table a_1 as 
    select * from a distribute by rand();
    //表示通过设置Map任务数来中加Map,把a表中的数据均匀的放到a_1目录下10个文件中。
    
    Map端聚合:
    set  hive.map.aggr=true ;(默认为true)
    

    二、Reduce阶段的优化:

    2.1 指定reduce数量
     set mapred.reduce.tasks=10
    
    2.2未指定reduce数量
    param1:hive.exec.reducers.bytes.per.reducer(默认为1000^3)
    param2:hive.exec.reducers.max(默认为999)
    reduceNum = min(param2,总输入数据量/param1(reduceNum = InputFileSize / bytes per reducer))
    

    通常情况下,有必要手动指定reducer个数。考虑到map阶段的输出数据量通常会比输入有大幅减少,因此即使不设定reducer个数,重设参数2还是必要的。依据Hadoop的经验,可以将参数2设定为0.95*(集群中TaskTracker个数)。

    三、其他优化:

    Multi-insert & multi-group by

    从一份基础表中按照不同的维度,一次组合出不同的数据

    FROM from_statement
    INSERT OVERWRITE TABLE tablename1 [PARTITION (partcol1=val1)] select_statement1 group by key1
    INSERT OVERWRITE TABLE tablename2 [PARTITION(partcol2=val2 )] select_statement2 group by key2
    #具体实例
    FROM pv_users 
    INSERT OVERWRITE TABLE pv_gender_sum
    SELECT pv_users.gender, count(DISTINCT pv_users.userid) 
    GROUP BY pv_users.gender 
    INSERT OVERWRITE DIRECTORY '/opt/data/users/pv_age_sum'
    SELECT pv_users.age, count(DISTINCT pv_users.userid) 
    GROUP BY pv_users.age; 
    
    生成MR Job 个数
    生成一个MR Job

    多表连接,如果多个表中每个表都使用同一个列进行连接(出现在JOIN子句中),则只会生成一个MR Job。

    SELECT a.val, b.val, c.val FROM a 
    JOIN b ON (a.key = b.key1) 
    JOIN c ON (c.key = b.key1)
    

    三个表a、b、c都分别使用了同一个字段进行连接,亦即同一个字段同时出现在两个JOIN子句中,从而只生成一个MR Job。

    生成多个MR Job

    多表连接,如果多表中,其中存在一个表使用了至少2个字段进行连接(同一个表的至少2个列出现在JOIN子句中),则会至少生成2个MR Job。

    SELECT a.val, b.val, c.val FROM a 
    JOIN b ON (a.key = b.key1) 
    JOIN c ON (c.key = b.key2)
    

    三个表基于2个字段进行连接,这两个字段b.key1和b.key2同时出现在b表中。连接的过程是这样的:首先a和b表基于a.key和b.key1进行连接,对应着第一个MR Job;表a和b连接的结果,再和c进行连接,对应着第二个MR Job。

    数据倾斜:

    倾斜原因:

    map输出数据按key Hash的分配到reduce中,由于key分布不均匀业务数据本身的特性建表时考虑不周某些SQL语句本身就有数据倾斜等原因造成的reduce上的数据量差异过大,所以如何将数据均匀的分配到各个reduce中,就是解决数据倾斜的根本所在。

    解决方案:

    1. 空值数据倾斜

    join的key值发生倾斜,key值包含很多空值或是异常值,这种情况可以对异常值赋一个随机值来分散key。
    案例:在日志中,常会有信息丢失的问题,比如日志中的 user_id,如果取其中的 user_id 和 用户表中的user_id 关联,会碰到数据倾斜的问题。

    select * from log l
    left outer join user u on 
    case when (l.user_id is null or I.user_id='-' or I.user_id='0') 
    then concat(‘sql_hive’,rand() ) else l.user_id end = u.user_id;
    
    2. Join操作产生数据倾斜
    2.1 大表和小表Join

    产生原因:Hive在进行join时,按照join的key进行分发,而在join左边的表的数据会首先读入内存,如果左边表的key相对分散,读入内存的数据会比较小,join任务执行会比较快;而如果左边的表key比较集中,而这张表的数据量很大,那么数据倾斜就会比较严重,而如果这张表是小表,则还是应该把这张表放在join左边。
    解决方式:使用map join让小的维度表先进内存。在map端完成reduce。
    在0.7.0版本之前:需要在sql中使用 /*+ MAPJOIN(smallTable) */ ;

    SELECT /*+ MAPJOIN(b) */ a.key, a.value
    FROM a
    JOIN b ON a.key = b.key;
    

    在0.7.0版本之后:可以配置hive.auto.convert.join

    配置项 缺省值 配置说明
    hive.auto.convert.join (0.7.0-0.10.0)false; (0.11.0-)true 注意:hive-default.xml模板中错误地将默认设置为false,在Hive 0.11.0到0.13.1
    hive.smalltable.filesize(0.7.0) or hive.mapjoin.smalltable.filesize(0.8.1) 25000000 默认值为2500000(25M),通过配置该属性来确定使用该优化的表的大小,如果表的大小小于此值就会被加载进内存中

    注意:使用默认启动该优化的方式如果出现默名奇妙的BUG(比如MAPJOIN并不起作用),就将以下两个属性置为fase手动使用MAPJOIN标记来启动该优化

    hive.auto.convert.join=false(关闭自动MAPJOIN转换操作)
    hive.ignore.mapjoin.hint=false(不忽略MAPJOIN标记)
    

    对于以下查询是不支持使用方法二(MAPJOIN标记)来启动该优化的

    select /*+MAPJOIN(smallTableTwo)*/ idOne, idTwo, value FROM
      ( select /*+MAPJOIN(smallTableOne)*/ idOne, idTwo, value FROM
        bigTable JOIN smallTableOne on (bigTable.idOne = smallTableOne.idOne)                                                  
      ) firstjoin                                                            
      JOIN                                                                 
      smallTableTwo ON (firstjoin.idTwo = smallTableTwo.idTwo)  
    

    但是,如果使用的是方法一即没有MAPJOIN标记则以上查询语句将会被作为两个MJ执行,进一步的,如果预先知道表大小是能够被加载进内存的,则可以通过以下属性来将两个MJ合并成一个MJ

    hive.auto.convert.join.noconditionaltask:Hive在基于输入文件大小的前提下将普通JOIN转换成MapJoin,
    并是否将多个MJ合并成一个
    hive.auto.convert.join.noconditionaltask.size:
    多个MJ合并成一个MJ时,其表的总的大小须小于该值,同时hive.auto.convert.join.noconditionaltask必须为true
    
    2.2 大表和大表Join

    产生原因:业务数据本身的特性,导致两个表都是大表。
    解决方式:业务削减。
    案例:user 表有 500w+ 的记录,把 user 分发到所有的 map 上也是个不小的开销,而且 map join 不支持这么大的小表。如果用普通的 join,又会碰到数据倾斜的问题。

    select * from log l left outer join user u
     on l.user_id = u.user_id;
    

    解决方法:当天登陆的用户其实很少,先只查询当天登录的用户,log里user_id有上百万个,这就又回到原来map join问题。所幸,每日的会员uv不会太多,有交易的会员不会太多,有点击的会员不会太多,有佣金的会员不会太多等等。所以这个方法能解决很多场景下的数据倾斜问题。

    select /*+mapjoin(u2)*/* from log l2
    left outer join
     (
    select  /*+mapjoin(l1)*/u1.*
    from ( select distinct user_id from log ) l1
    join user u1 on l1.user_id = u1.user_id
    ) u2
    on l2.user_id = u2.user_id;
    
    3. count distinct 聚 合 时 存 在 大 量 特 殊 值

    产生原因: 做count distinct时,该字段存在大量值为NULL或空的记录。
    解决方式: 做count distinct时,将值为空的情况单独处理,如果是计算count distinct,可以不用处理,直接过滤,在最后结果中加1。如果还有其他计算,需要进行group by,可以先将值为空的记录单独处理,再和其他计算结果进行union。
    案例
    1.只计算count distinct

    select cast(count(distinct user_id)+1 as bigint) as user_cnt
    from user
    where user_id is not null and user_id <> '';
    

    2.计算完count distinct 后面还有 group by。同一个reduce上进行distinct操作时压力很大,先将值为空的记录单独处理,再和其他计算结果进行union。
    在Hive中,经常遇到count(distinct)操作,这样会导致最终只有一个reduce,我们可以先group 再在外面包一层count,就可以了。

    select day,
    count(case when type='session' then 1 else null end) as session_cnt,
    count(case when type='user' then 1 else null end) as user_cnt
    from (
      select day,type
      from (
        select day,session_id,'session' as type from log
        union all
        select day user_id,'user' as type from log
      )
      group by day,type
    )t1 
    group by day;
    
    4. group by 产生倾斜的问题
    set hive.map.aggr=true
    

    开启map端combiner:在Map端做combine,若map各条数据基本上不一样, 聚合无意义,通过如下参数设置。

    hive.groupby.mapaggr.checkinterval = 100000 (默认)
    hive.map.aggr.hash.min.reduction=0.5(默认)
    

    解释:预先取100000条数据聚合,如果聚合后的条数小于100000*0.5,则不再聚合。

    set hive.groupby.skewindata=true;//决定  group by 操作是否支持倾斜数据。
    

    注意:只能对单个字段聚合。
    控制生成两个MR Job,第一个MR Job Map的输出结果随机分配到reduce中减少某些key值条数过多某些key条数过小造成的数据倾斜问题。
    在第一个 MapReduce 中,map 的输出结果集合会随机分布到 reduce 中, 每个reduce 做部分聚合操作,并输出结果。这样处理的结果是,相同的 Group By Key 有可能分发到不同的reduce中,从而达到负载均衡的目的;
    第二个 MapReduce 任务再根据预处理的数据结果按照 Group By Key 分布到 reduce 中(这个过程可以保证相同的 Group By Key 分布到同一个 reduce 中),最后完成最终的聚合操作。

    相关文章

      网友评论

        本文标题:Hive的性能优化以及数据倾斜

        本文链接:https://www.haomeiwen.com/subject/pkubaxtx.html