关键词 | 情形 |
---|---|
group by | 当某一个表分区重复数据较多,会导致数据倾斜,非常耗时 |
join | 当小表连接大表,处理大表的Map会慢且分配到的Reduce也会慢 |
count() | count会统计,交由一个Reduce来做,数据量大会很慢 |
group by解决方案
再 hive 里输入如下:
set hive.groupby.skewindata=true;
或在 hive-site.xml 里添加如下:
<property>
<name>hive.groupby.skewindata</name>
<value>true</value>
</property>
hive.groupby.skewindata=true:数据倾斜时负载均衡,当选项设定为true,生成的查询计划会有两个MRJob:
第一个MRJob中,Map的输出结果集合会随机分布到Reduce中,每个 Reduce做部分聚合操作,并输出结果,这样处理的结果是相同的 GroupBy Key 有可能被分到不同的Reduce中,从而达到负载均衡的目的;
第二个MRJob再根据预处理的数据结果按照GroupBy Key分布到Reduce中(这个过程可以保证相同的GroupBy Key被分到统一个Reduce中),最后完成最终的聚合操作。
由上面可以看出,起到至关重要的作用其实时第二个参数的设置,它使计算变成了两个 MapReduce,现在第一个中再 shuffle 过程 partition 时随鸡给 key 打标机,使每个 key 随机均匀分布到各个 reduce 上计算,但是这样只能完成部分计算,因为相同key 没有分配到相同 reduce 上,所以需要第二次的 MapReduce,这次就回归正常 shuffle,但是数据分布不均匀的问题再第一次有很大的改善,因此基本解决数据倾斜。
join解决方案
再 hive 里输入如下:
set hive.auto.convert.join=true;
# 其中一个表大小小于25mb时,自动启用mapjoin
set hive.mapjoin.smalltable.filesize=25mb
或在 hive-site.xml 里添加如下:
<property>
<name>hive.auto.convert.join</name>
<value>true</value>
</property>
<property>
<name>hive.mapjoin.smalltable.filesize</name>
<value>50mb</value>
</property>
当使用以上参数设置时,需要注意sql写的时候,小表一定要在前面(最左侧)。
mapjoin 的主要意思就是,当连接两个表是一个比较小的表和一个特大的表的时候,我们把比较小的table直接放到内存中去,然后再对比较大的表格进行map操作。join就发生在map操作的时候,每当扫描一个大的table中的数据,就要去查看小表的数据,哪条与之相符,继而进行连接。这里的join并不会设计reduce操作。map端join的优势就是在于没有shuffle。
还有种sql优化的方式,就是有条件的话,先对大表进行条件过滤,后对他join。
count distinct 优化
优化前
select count(distinct id) from table_name
优化后
select count(*) from (select distinct id from table_name) tmp;
优化前的sql 会启用一个 Reduce 任务,优化后的会启用两个 Reduce 任务。优化后的 Reduce 必须设置 mapred.reduce.tasks 数量才有效,否则默认还是1个。而count一定之启用1个Reduce。
日常统计场景中,我们经常会对一段十七内的字段进行消重并统计数量,SQL语句类似于 优化前 的语句,这条语句是从一个表的符合where条件的记录中统计不重复的id的总数。该语句转化为 MapReduce 任务后执行示意图如下:
由于引入了 distinct,因此在Map阶段无法利用 combine 对输出结果消重,必须将id作为key输出,在Reduce阶段再对来自于不同 Map 相同 key 的结果进行消重,计入最终统计值。
我们看到任务运行时Reduce个数为1,对于统计大量数据量时,这会导致最终Map的全部输出由单个Reuduce处理,这唯一的Reduce需要shuffle大量的数据,并且进行排序聚合等处理,这使得它称为整个任务的IO和运算瓶颈。
经过上述分析后,我们可以尝试显示的增大 Reduce 任务个数来提高Reduce阶段的并发,使每一个Reduce的数据量控制在我们预想的范围。
set mapred.reduce.tasks=100
但这个调正不会影响 count 这种"全聚合(full aggregates)"的任务,它会忽略用户指定的
我们利用hive对嵌套语句的支持,将原来一个 MapReduce 任务转为两个任务,在第一阶段选出全部的非重复id,在第二阶段再对这些已消重的id进行统计。这样在第一阶段我们可以同过增大Reduce的并发数,并发处理Map输出。在第二阶段,由于id已经消重,因此count(*) 操作Map阶段不需要输出原id数据,只输出一个合并后的计数即可。这样即使第二阶段Hive强制指定一个Reduce任务,极少量的Map输出数据也不会使单一的Reduce任务成为瓶颈,改进后的视图如下:
调整切片数(map任务数)
Hive底层自动对小文件做了优化,用了 CombineTextInputFormat,将多个小文件切片合成一片,对应也就只会启动一个 Map 任务。
合并完成的切片大小,如果 > mapred.max.split.size 的大小,就启动一个新的切片任务。默认 mapred.max.split.size=134217728(128MB)
JVM重用
# 默认是1个
set mapred.job.reuse.jvm.num.task=20
JVM 重用是hadoop调优参数内容,对hive的性能具有非常大的影响,特别是对于很难避免小文件的场景或者task特别多的场景,这类场景大多数执行事件都很短。这时jvm的启动过程可能会造成相当大的开销,尤其时执行的Job包含由成千上万个task任务的情况。
JVM重用可以使得一个JVM进程在同一个JOB中重新使用N次后才会销毁。
启用严格模式
在hive里面可以通过严格模式防止用户执行那些可能产生意想不到的不好的效果的查询,从而保护hive的集群。
用户可以通过以下来设置严格模式,改成 unstrict 则为非严格模式。
set hive.mapred.mode=strict
在严格模式下,用户在运行如下查询的时候会报错:
- 分区表的查询语句没有使用分区字段来限制
- 使用了 order by 但没有使用limit 语句。(如果不使用limit,会对查询结果进行全局排序,消耗时间长)
- 产生了笛卡尔积
关闭推测执行机制
因为在测试环境下我们都把应用程序跑通了,如果还加上推测执行,如果有一个数据分片本来就会发生数据倾斜,执行事时间就是比其他的时间长,那么hive就会把这个执行时间长的job当作运行失败,继而又产生一个相同的job去运行,后果可想而知,可以通过如下设置关闭推测执行机制。该机制默认时开启的。
set mapreduce.map.speculative=false;
set mapreduce.reduce.speculative=false;
set hive.mapred.reduce.tasks.speculative.execution=false;
网友评论