1.区分排序
cluster by:对同一字段分桶并排序,不能和 sort by 连用
distribute by + sort by:分桶,保证同一字段值只存在一个结果文件当中,结合 sort by 保证
每个 reduceTask 结果有序
sort by:单机排序,单个 reduce 结果有序
order by:全局排序,缺陷是只能使用一个 reduce
2.解决笛卡尔积
MapJoin 在子查询中可能出现未知 BUG。在大表和小表做笛卡尔积时,规避笛卡尔积的 方法是,
给 Join 添加一个 Join key,原理很简单:将小表扩充一列 join key,并将小表的条 目复制数倍,join key 各不相同;
将大表扩充一列 join key 为随机数。
精髓就在于复制几倍,最后就有几个 reduce 来做,而且大表的数据是前面小表扩张 key 值 范围里面随机出来的,
所以复制了几倍 n,就相当于这个随机范围就有多大 n,那么相应的, 大表的数据就被随机的分为了 n 份。
并且最后处理所用的 reduce 数量也是 n,而且也不会 出现数据倾斜。
3.解决in/exists
用left semi join
select a.id, a.name from a where a.id in (select b.id from b);
select a.id, a.name from a left semi join b on a.id = b.id;
4.设置合理的maptask
dfs.blocksize 数据块大小
mapreduce.input.fileinputformat.split.minsize 最小分片大小
mapreduce.input.fileinputformat.split.maxsize 最大分片大小
5.设置jvm数量
set mapred.job.reuse.jvm.num.tasks=5
因为 Hive 语句最终要转换为一系列的 MapReduce Job 的,而每一个 MapReduce Job 是由一 系列的 MapTask 和 ReduceTask 组成的,
默认情况下, MapReduce 中一个 MapTask 或者一个 ReduceTask 就会启动一个 JVM 进程,一个 Task 执行完毕后,
JVM 进程就退出。这样如果任 务花费时间很短,又要多次启动 JVM 的情况下,JVM 的启动时间会变成一个比较大的消耗,
6.小文件合并
set hive.merge.mapfiles = true ##在 map only 的任务结束时合并小文件
set hive.merge.mapredfiles = false ## true 时在 MapReduce 的任务结束时合并小文件
set hive.merge.size.per.task = 256*1000*1000 ##合并文件的大小
set mapred.max.split.size=256000000; ##每个 Map 最大分割大小
set mapred.min.split.size.per.node=1; ##一个节点上 split 的最少值
set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat; ##执行 Map 前进行小文件合并
7.设置reducetask数量
1、hive.exec.reducers.bytes.per.reducer(默认为 256000000)
2、hive.exec.reducers.max(默认为 1009)
3、mapreduce.job.reduces=-1(设置一个常量 reducetask 数量)
计算 reducer 数的公式很简单: N=min(参数 2,总输入数据量/参数 1) 通常情况下,有必要手动指定 reducer 个数。
考虑到 map 阶段的输出数据量通常会比输入有 大幅减少,因此即使不设定 reducer 个数,重设参数 2 还是必要的。
8.合理利用分桶和采样samping
9.合理利用分区
10.join优化
1、 优先过滤后再进行 Join 操作,最大限度的减少参与 join 的数据量
2、 小表 join 大表,最好启动 mapjoin
3、 Join on 的条件相同的话,最好放入同一个 job,并且 join 表的排列顺序从小到大
在编写 Join 查询语句时,如果确定是由于 join 出现的数据倾斜,那么请做如下设置:
set hive.skewjoin.key=100000; // 这个是 join 的键对应的记录条数超过这个值则会进行 分拆,值根据具体数据量设置
set hive.optimize.skewjoin=true; // 如果是 join 过程出现倾斜应该设置为 true
11.Group By 优化
1、Map 端部分聚合
set hive.map.aggr = true 是否在 Map 端进行聚合,默认为 True
set hive.groupby.mapaggr.checkinterval = 100000 在 Map 端进行聚合操作的条目数目
2、使用 Group By 有数据倾斜的时候进行负载均衡
set hive.groupby.skewindata = true
12.合理利用文件存储格式
行式存储和列式存储 多用列式orc或parquet
13.本地模式执行 MapReduce
hive.exec.mode.local.auto 默认false 让hive决定是否在本地模式自动运行
hive.exec.mode.local.auto.input.files.max 默认4 不启用本地模式的task最大个数
hive.exec.mode.local.auto.inputbytes.max 默认128M 不启动本地模式的最大输入文件大小
14.并行化处理
set hive.exec.parallel=true;
set hive.exec.parallel.thread.number=8; //同一个 sql 允许并行任务的最大线程数
15.设置压缩存储
多用snappy
Job 输出文件按照 block 以 GZip 的方式进行压缩:
set mapreduce.output.fileoutputformat.compress=true // 默认值是 false
set mapreduce.output.fileoutputformat.compress.type=BLOCK // 默认值是 Record
set mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.GzipCodec
// 默认值是 org.apache.hadoop.io.compress.DefaultCodec
Map 输出结果也以 Gzip 进行压缩:
set mapred.map.output.compress=true
set mapreduce.map.output.compress.codec=org.apache.hadoop.io.compress.GzipCodec
// 默认值是 org.apache.hadoop.io.compress.DefaultCodec
对 Hive 输出结果和中间都进行压缩:
set hive.exec.compress.output=true // 默认值是 false,不压缩
set hive.exec.compress.intermediate=true // 默认值是 false,为 true 时 MR 设置的压缩才启用
网友评论