还是如何将N个keys写到N个文件的需求。
这次的问题是单个key太大,引起的单个reduce任务执行时间过长,导致整个MR运行时间过长。数据大部分的key在千,万级别,而有几个key在亿,10亿级别。
解决数据倾斜问题的核心是将数据量很大的key,打散变小分配给多个reduce,最好能均匀分布,这样所有的reduce接收相同的数据量,大家执行时间相差不多,就解决了数据倾斜问题。
一个最好的算法或者说处理方式最好与业务无关。既然与业务无关,则需要有个地方统计各个key的数量,然后根据key的数量给其分配reduce的个数。
【尝试一】
规定一个key处理的个数为1w。通过combiner统计各个key的长度,然后将该长度与原key组成新key:
public class SplitTextCombiner extends Reducer<Text,Text,Text,Text> {
private Text outputKey = new Text();
private int counter = 0;
private final int MAX_RECOTD_PER_MAP = 10000;
//private OrcStruct pair = null;
protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
// if (pair == null){
// pair = (OrcStruct)OrcStruct.createValue(ISTool.getTypeDescription(context.getConfiguration()));
// }
for(Text oneValue:values){
outputKey.set(key.toString() + "&" + counter/MAX_RECOTD_PER_MAP);
context.write(outputKey, oneValue);
counter++;
}
}
}
这个尝试是失败的。因为MR框架是先经过Partition,再Combiner的。Combiner时数据已经分好reducer了。大key还是分给了一个reducer。我们这边的操作只是将一个大key分为多个小key,没啥作用的。
只能通过partition将大key分为多个小key,而partition的时候是无法知道key的数量。现在的需求是partition之前,需要知道key的数量级。
这个是对mr的处理流程不清晰,才会有这种错误的想法。。
【尝试二】
没办法只能通过指定key的方式分割数据。
在配置中指定大key的分割文件个数n,随机将大key分配到指定的n个文件中。
由于reduce个数的限制,一般一个key只会分配到几个文件中。这里采用随机生成大数,再求余的方式生成随机数:
package is.split;
import org.apache.commons.lang.math.RandomUtils;
public class CreateRandomValue {
private int begin;
private int end;
private final int max = 100000;
public CreateRandomValue(int begin, int end){
this.begin = begin;
this.end = end;
}
public int createRandom(){
int s = (RandomUtils.nextInt(max)%end)%(end-begin+1) + begin;
return s;
}
}
比如说我的配置项为(key-文件个数):10000:10,20000:20,3000:30。那么1000分配到第0-9reduce中,2000分配到第10-29reduce中,3000分配到30-59reduce中。避免大key分配到一个reduce,造成数据倾斜。
partition的时候对指定的key采用CreateRandomValue随机生成reduce序号即可。
网友评论